BJ-76: Added MsgPack Writer (#4)

This commit is contained in:
Vladimir Pankrashkin 2017-02-02 18:21:45 +03:00 committed by GitHub
parent a0a4fd0dba
commit 01b6b2bedb
4 changed files with 239 additions and 5 deletions

View File

@ -20,6 +20,12 @@
<artifactId>libthrift</artifactId>
<version>0.9.3-3</version>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack-core</artifactId>
<version>0.8.11</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View File

@ -2,8 +2,10 @@ package com.rbkmoney.kebab;
import com.rbkmoney.kebab.serializer.TBaseSerializer;
import com.rbkmoney.kebab.writer.JsonStructWriter;
import com.rbkmoney.kebab.writer.MsgPackWriter;
import org.apache.thrift.TBase;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.StringWriter;
@ -24,7 +26,18 @@ public class Kebab<T extends TBase> {
}
public byte[] toMsgPack(T src) {
throw new UnsupportedOperationException("under contruction");
try {
ByteArrayOutputStream os = new ByteArrayOutputStream();
MsgPackWriter writer = new MsgPackWriter(os, true);
TBaseSerializer serializer = new TBaseSerializer();
serializer.write(writer, src);
writer.close();
return os.toByteArray();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

View File

@ -0,0 +1,175 @@
package com.rbkmoney.kebab.writer;
import com.rbkmoney.kebab.StructWriter;
import com.rbkmoney.kebab.exception.BadFormatException;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessagePacker;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.Deflater;
/**
* Created by vpankrashkin on 31.01.17.
*/
public class MsgPackWriter implements StructWriter {
private static final byte nop = 0;
private static final byte startStruct = 1;
private static final byte endStruct = 2;
private static final byte startList = 3;
private static final byte endList = 4;
private static final byte startMap = 5;
private static final byte endMap= 6;
private static final byte startMapKey= 7;
private static final byte endMapKey = 8;
private static final byte startMapValue = 9;
private static final byte endMapValue = 10;
private static final byte pointName = 11;
private static final byte pointValue = 12;
private final boolean autoClose;
private final MessagePacker msgPacker;
public MsgPackWriter(OutputStream stream, boolean autoClose) {
this.autoClose = autoClose;
this.msgPacker = MessagePack.newDefaultPacker(stream);
}
@Override
public void beginStruct() throws IOException {
msgPacker.packExtensionTypeHeader(startStruct, 0);
}
@Override
public void endStruct() throws IOException {
msgPacker.packExtensionTypeHeader(endStruct, 0);
}
/**
* @param size if >= 0, expected to be defined list size; if < 0, means stream mode with not fixed size
* */
@Override
public void beginList(int size) throws IOException {
if (size >= 0) {
msgPacker.packArrayHeader(size);
} else {
msgPacker.packExtensionTypeHeader(startList, 0);
}
}
@Override
public void endList() throws IOException {
msgPacker.packExtensionTypeHeader(endList, 0);
}
@Override
public void beginMap(int size) throws IOException {
if (size >= 0) {
msgPacker.packMapHeader(size);
} else {
msgPacker.packExtensionTypeHeader(startMap, 0);
}
}
@Override
public void endMap() throws IOException {
msgPacker.packExtensionTypeHeader(endMap, 0);
}
@Override
public void beginKey() throws IOException {
msgPacker.packExtensionTypeHeader(startMapKey, 0);
}
@Override
public void endKey() throws IOException {
msgPacker.packExtensionTypeHeader(endMapKey, 0);
}
@Override
public void beginValue() throws IOException {
msgPacker.packExtensionTypeHeader(startMapValue, 0);
}
@Override
public void endValue() throws IOException {
msgPacker.packExtensionTypeHeader(endMapValue, 0);
}
/**
* @param name - only ASCII symbols expected
* */
@Override
public void name(String name) throws IOException {
int length = name.length();
byte[] data = name.getBytes();
if (length != data.length) {
throw new BadFormatException("Only ASCII symbols're expected");
}
msgPacker.packRawStringHeader(length);
msgPacker.writePayload(data);
// Compress the bytes
byte[] output = new byte[100];
Deflater compresser = new Deflater();
compresser.setInput(data);
compresser.finish();
int compressedDataLength = compresser.deflate(output);
compresser.end();
}
@Override
public void value(boolean value) throws IOException {
msgPacker.packBoolean(value);
}
@Override
public void value(String value) throws IOException {
msgPacker.packString(value);
}
@Override
public void value(byte value) throws IOException {
msgPacker.packByte(value);
}
@Override
public void value(short value) throws IOException {
msgPacker.packShort(value);
}
@Override
public void value(int value) throws IOException {
msgPacker.packInt(value);
}
@Override
public void value(double value) throws IOException {
msgPacker.packDouble(value);
}
@Override
public void value(long value) throws IOException {
msgPacker.packLong(value);
}
@Override
public void value(byte[] value) throws IOException {
msgPacker.packBinaryHeader(value.length);
msgPacker.writePayload(value);
}
@Override
public void nullValue() throws IOException {
msgPacker.packNil();
}
@Override
public void close() throws IOException {
if (autoClose) {
msgPacker.close();
} else {
msgPacker.flush();
}
}
}

View File

@ -4,17 +4,55 @@ import com.rbkmoney.kebab.test.Fail;
import com.rbkmoney.kebab.test.Ids;
import com.rbkmoney.kebab.test.Status;
import com.rbkmoney.kebab.test.TestObject;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.util.*;
import java.util.zip.GZIPOutputStream;
/**
* Created by tolkonepiu on 25/01/2017.
*/
public class KebabTest {
Kebab kebab = new Kebab();
@Test
public void kebabTesting() {
TestObject testObject = getTestObject();
kebab.toJson(testObject);
}
@Test
public void msgPackTest() throws Exception {
TestObject testObject = getTestObject();
byte[] msgPack = kebab.toMsgPack(testObject);
byte[] tCompact = new TSerializer(new TCompactProtocol.Factory()).serialize(testObject);
byte[] tBinary = new TSerializer(new TBinaryProtocol.Factory()).serialize(testObject);
System.out.println("MsgPack:"+msgPack.length);
System.out.println("Compact:"+tCompact.length);
System.out.println("Binary:"+tBinary.length);
ByteArrayOutputStream bos1 = new ByteArrayOutputStream(msgPack.length);
GZIPOutputStream gzip1 = new GZIPOutputStream(bos1);
gzip1.write(msgPack);
gzip1.close();
System.out.println("GZip:MsgPack:" + bos1.toByteArray().length);
ByteArrayOutputStream bos2 = new ByteArrayOutputStream(tBinary.length);
GZIPOutputStream gzip2 = new GZIPOutputStream(bos2);
gzip2.write(tBinary);
gzip2.close();
System.out.println("GZip:Binary:" + bos2.toByteArray().length);
}
private TestObject getTestObject() {
TestObject testObject = new TestObject();
testObject.setDescription("kek");
testObject.setValue(2.32);
@ -36,17 +74,19 @@ public class KebabTest {
testObject.setFuck(Arrays.asList(suk, suk, suk));
Fail fail = new Fail();
fail.setReasons(new HashSet<>(Arrays.asList("kek1", "kek2")));
testObject.setStatus(Status.fail(new Fail(fail)));
Map<String, Integer> map = new HashMap<>();
map.put("kek1", 455);
map.put("kek2", 564);
map.put("kek3", 565);
map.put(null, null);
//map.put(null, 666);
//map.put("null", null);
testObject.setMaps(map);
Kebab kebab = new Kebab();
System.out.println(kebab.toJson(testObject));
testObject.setStatus(Status.fail(new Fail(fail)));
return testObject;
}
}