Add serializer to kafka common

This commit is contained in:
k.struzhkin 2019-04-19 14:45:50 +03:00
parent 99c9368d50
commit 88edc61d3e
9 changed files with 324 additions and 0 deletions

78
.gitignore vendored Normal file
View File

@ -0,0 +1,78 @@
# Created by .ignore support plugin (hsz.mobi)
.eunit
deps
*.o
*.beam
*.plt
erl_crash.dump
ebin/*.beam
rel/example_project
.concrete/DEV_MODE
.rebar
target/
pom.xml.tag
pom.xml.releaseBackup
pom.xml.versionsBackup
pom.xml.next
release.properties
dependency-reduced-pom.xml
buildNumber.properties
.mvn/timing.properties
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff:
.idea/
.idea/workspace.xml
.idea/tasks.xml
.idea/dictionaries
.idea/vcs.xml
.idea/jsLibraryMappings.xml
# Sensitive or high-churn files:
.idea/dataSources.ids
.idea/dataSources.xml
.idea/dataSources.local.xml
.idea/sqlDataSources.xml
.idea/dynamic.xml
.idea/uiDesigner.xml
# Gradle:
.idea/gradle.xml
.idea/libraries
# Mongo Explorer plugin:
.idea/mongoSettings.xml
*.iws
*.ipr
*.iml
# IntelliJ
/out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
*.class
# Mobile Tools for Java (J2ME)
.mtj.tmp/
# Package Files #
*.jar
*.war
*.ear
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
env.list

4
.gitmodules vendored Normal file
View File

@ -0,0 +1,4 @@
[submodule "build_utils"]
path = build_utils
url = git@github.com:rbkmoney/build_utils.git
branch = master

13
Jenkinsfile vendored Normal file
View File

@ -0,0 +1,13 @@
#!groovy
build('kafka-common-lib', 'docker-host') {
checkoutRepo()
loadBuildUtils()
def javaLibPipeline
runStage('load JavaLib pipeline') {
javaLibPipeline = load("build_utils/jenkins_lib/pipeJavaLib.groovy")
}
def buildImageTag = "269686d735abef363f9f40a1bf4e1b7c751f3722"
javaLibPipeline(buildImageTag)
}

1
build_utils Submodule

@ -0,0 +1 @@
Subproject commit ea4aa042f482551d624fd49a570d28488f479e93

97
pom.xml Normal file
View File

@ -0,0 +1,97 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.rbkmoney</groupId>
<artifactId>parent</artifactId>
<version>1.0.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<groupId>com.rbkmoney.kafka.common.lib</groupId>
<artifactId>kafka-common-lib</artifactId>
<version>0.0.1-SNAPSHOT</version>
<description></description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>8</java.version>
<server.port>8022</server.port>
<lombok.version>1.18.4</lombok.version>
<kafka.clients.version>2.1.0</kafka.clients.version>
<damsel.version>1.278-3abdcb4</damsel.version>
<sonar.jacoco.reportPath>${project.basedir}/../target/jacoco.exec</sonar.jacoco.reportPath>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>com.rbkmoney.woody</groupId>
<artifactId>woody-thrift</artifactId>
<version>1.1.15</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.clients.version}</version>
</dependency>
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>damsel</artifactId>
<version>${damsel.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.8.2</version>
<configuration>
<destFile>${sonar.jacoco.reportPath}</destFile>
<append>true</append>
</configuration>
<executions>
<execution>
<id>agent</id>
<goals>
<goal>prepare-agent</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,36 @@
package com.rbkmoney.deserializer;
import com.rbkmoney.exception.KafkaSerializationException;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import java.util.Map;
@Slf4j
public abstract class AbstractDeserializerAdapter<T extends TBase> implements Deserializer<T> {
protected final ThreadLocal<TDeserializer> thriftDeserializer = ThreadLocal.withInitial(TDeserializer::new);
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
log.warn("ThriftSerializer configure configs: {} isKey: {} is do nothing!", isKey);
}
@Override
public void close() {
thriftDeserializer.remove();
}
protected T deserialize(byte[] data, T t) {
try {
thriftDeserializer.get().deserialize(t, data);
} catch (Exception e) {
log.error("Error when deserialize data: {} ", data, e);
throw new KafkaSerializationException(e);
}
return t;
}
}

View File

@ -0,0 +1,24 @@
package com.rbkmoney.exception;
public class KafkaSerializationException extends RuntimeException {
public KafkaSerializationException() {
}
public KafkaSerializationException(String message, Throwable cause) {
super(message, cause);
}
public KafkaSerializationException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
public KafkaSerializationException(String message) {
super(message);
}
public KafkaSerializationException(Throwable cause) {
super(cause);
}
}

View File

@ -0,0 +1,36 @@
package com.rbkmoney.serializer;
import com.rbkmoney.exception.KafkaSerializationException;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import java.util.Map;
@Slf4j
public class ThriftSerializer<T extends TBase> implements Serializer<T> {
private final ThreadLocal<TSerializer> thriftSerializer = ThreadLocal.withInitial(TSerializer::new);
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
log.warn("ThriftSerializer configure configs: {} isKey: {} is do nothing!", isKey);
}
@Override
public byte[] serialize(String s, T event) {
try {
return thriftSerializer.get().serialize(event);
} catch (TException e) {
throw new KafkaSerializationException(e);
}
}
@Override
public void close() {
thriftSerializer.remove();
}
}

View File

@ -0,0 +1,35 @@
package com.rbkmoney.serializer;
import com.rbkmoney.damsel.base.Content;
import com.rbkmoney.deserializer.AbstractDeserializerAdapter;
import org.junit.Assert;
import org.junit.Test;
public class ThriftSerializerTest {
ThriftSerializer thriftSerializer = new ThriftSerializer();
private class AbstractDeserializerAdapterImpl extends AbstractDeserializerAdapter<Content> {
@Override
public Content deserialize(String s, byte[] bytes) {
Content content = new Content();
return super.deserialize(bytes, content);
}
}
@Test
public void serializeTest() {
Content content = new Content();
content.setType("type");
content.setData("data".getBytes());
byte[] bytes = thriftSerializer.serialize("poh", content);
AbstractDeserializerAdapterImpl abstractDeserializerAdapter = new AbstractDeserializerAdapterImpl();
Content pohContent = abstractDeserializerAdapter.deserialize("poh", bytes);
Assert.assertEquals(content, pohContent);
}
}