From 3f03c53c2e6ddb83dbc9fe783f94fb64208b2ae9 Mon Sep 17 00:00:00 2001 From: Pavel Popov Date: Mon, 19 Nov 2018 18:42:10 +0300 Subject: [PATCH] Initial implementation (#1) * Initial implementation * Add Jenkinsfile, fix arg types in client * Review fixes (part 1) * Review fixes (part 2) * Trying to build lib like a service * Build fixes * Trying with resource mapping * Use machinegun mock instead machinegun in container --- .gitignore | 3 + .gitmodules | 3 + Jenkinsfile | 16 +++ build_utils | 1 + pom.xml | 57 ++++++++ .../machinarium/client/AutomatonClient.java | 19 +++ .../client/TBaseAutomatonClient.java | 80 +++++++++++ .../machinarium/domain/CallResultData.java | 41 ++++++ .../machinarium/domain/SignalResultData.java | 33 +++++ .../machinarium/domain/TMachineEvent.java | 39 ++++++ .../MachineAlreadyExistsException.java | 23 ++++ .../MachineAlreadyWorkingException.java | 23 ++++ .../exception/MachineFailedException.java | 24 ++++ .../exception/MachineNotFoundException.java | 23 ++++ .../exception/NamespaceNotFoundException.java | 23 ++++ .../handler/AbstractProcessorHandler.java | 86 ++++++++++++ .../machinarium/util/MachineUtil.java | 28 ++++ .../rbkmoney/machinarium/AbstractTest.java | 63 +++++++++ .../machinarium/AutomatonMockSrvImpl.java | 127 ++++++++++++++++++ .../machinarium/MachinegunComplexTest.java | 108 +++++++++++++++ src/test/resources/log4j.properties | 5 + src/test/resources/machinegun/config.yaml | 34 +++++ 22 files changed, 859 insertions(+) create mode 100644 .gitignore create mode 100644 .gitmodules create mode 100644 Jenkinsfile create mode 160000 build_utils create mode 100644 pom.xml create mode 100644 src/main/java/com/rbkmoney/machinarium/client/AutomatonClient.java create mode 100644 src/main/java/com/rbkmoney/machinarium/client/TBaseAutomatonClient.java create mode 100644 src/main/java/com/rbkmoney/machinarium/domain/CallResultData.java create mode 100644 src/main/java/com/rbkmoney/machinarium/domain/SignalResultData.java create mode 100644 src/main/java/com/rbkmoney/machinarium/domain/TMachineEvent.java create mode 100644 src/main/java/com/rbkmoney/machinarium/exception/MachineAlreadyExistsException.java create mode 100644 src/main/java/com/rbkmoney/machinarium/exception/MachineAlreadyWorkingException.java create mode 100644 src/main/java/com/rbkmoney/machinarium/exception/MachineFailedException.java create mode 100644 src/main/java/com/rbkmoney/machinarium/exception/MachineNotFoundException.java create mode 100644 src/main/java/com/rbkmoney/machinarium/exception/NamespaceNotFoundException.java create mode 100644 src/main/java/com/rbkmoney/machinarium/handler/AbstractProcessorHandler.java create mode 100644 src/main/java/com/rbkmoney/machinarium/util/MachineUtil.java create mode 100644 src/test/java/com/rbkmoney/machinarium/AbstractTest.java create mode 100644 src/test/java/com/rbkmoney/machinarium/AutomatonMockSrvImpl.java create mode 100644 src/test/java/com/rbkmoney/machinarium/MachinegunComplexTest.java create mode 100644 src/test/resources/log4j.properties create mode 100644 src/test/resources/machinegun/config.yaml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9a6d2d8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/.idea +/target +*.iml \ No newline at end of file diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..4a5266f --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "build_utils"] + path = build_utils + url = git@github.com:rbkmoney/build_utils.git diff --git a/Jenkinsfile b/Jenkinsfile new file mode 100644 index 0000000..f6b53cf --- /dev/null +++ b/Jenkinsfile @@ -0,0 +1,16 @@ +#!groovy +build('machinarium', 'java-maven') { + checkoutRepo() + loadBuildUtils() + + def mvnArgs = '-DjvmArgs="-Xmx256m"' + runStage('Maven package') { + withCredentials([[$class: 'FileBinding', credentialsId: 'java-maven-settings.xml', variable: 'SETTINGS_XML']]) { + if (env.BRANCH_NAME == 'master') { + sh 'mvn deploy --batch-mode --settings $SETTINGS_XML ' + "${mvnArgs}" + } else { + sh 'mvn package --batch-mode --settings $SETTINGS_XML ' + "${mvnArgs}" + } + } + } +} \ No newline at end of file diff --git a/build_utils b/build_utils new file mode 160000 index 0000000..9b66408 --- /dev/null +++ b/build_utils @@ -0,0 +1 @@ +Subproject commit 9b664082ddc8ec8cdfbe7513d54e433d24198cc2 diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..f514742 --- /dev/null +++ b/pom.xml @@ -0,0 +1,57 @@ + + + 4.0.0 + + + com.rbkmoney + parent + 1.0.0 + + + com.rbkmoney + machinarium + 0.1.0 + + Machinegun java client/processor handler lib + + + + com.rbkmoney + machinegun-proto + 1.9-b71bfca + + + com.rbkmoney.woody + woody-thrift + 1.1.15 + + + com.rbkmoney.geck + serializer + 0.6.9 + + + + org.slf4j + slf4j-log4j12 + 1.7.21 + test + + + org.eclipse.jetty + jetty-quickstart + 9.3.9.M1 + test + + + junit + junit + 4.11 + test + + + + + \ No newline at end of file diff --git a/src/main/java/com/rbkmoney/machinarium/client/AutomatonClient.java b/src/main/java/com/rbkmoney/machinarium/client/AutomatonClient.java new file mode 100644 index 0000000..05f7ff6 --- /dev/null +++ b/src/main/java/com/rbkmoney/machinarium/client/AutomatonClient.java @@ -0,0 +1,19 @@ +package com.rbkmoney.machinarium.client; + +import com.rbkmoney.machinarium.domain.TMachineEvent; +import com.rbkmoney.machinarium.exception.*; +import com.rbkmoney.machinegun.stateproc.Machine; + +import java.util.List; + +public interface AutomatonClient { + + void start(String machineId, A args) throws MachineAlreadyExistsException, MachineFailedException, NamespaceNotFoundException; + + V call(String machineId, A args) throws NamespaceNotFoundException, MachineFailedException, MachineNotFoundException, MachineAlreadyWorkingException; + + Machine getMachine(String machineId) throws MachineNotFoundException, NamespaceNotFoundException; + + List> getEvents(String machineId) throws MachineNotFoundException, NamespaceNotFoundException; + +} diff --git a/src/main/java/com/rbkmoney/machinarium/client/TBaseAutomatonClient.java b/src/main/java/com/rbkmoney/machinarium/client/TBaseAutomatonClient.java new file mode 100644 index 0000000..df60088 --- /dev/null +++ b/src/main/java/com/rbkmoney/machinarium/client/TBaseAutomatonClient.java @@ -0,0 +1,80 @@ +package com.rbkmoney.machinarium.client; + +import com.rbkmoney.geck.serializer.Geck; +import com.rbkmoney.machinarium.domain.TMachineEvent; +import com.rbkmoney.machinarium.exception.*; +import com.rbkmoney.machinarium.util.MachineUtil; +import com.rbkmoney.machinegun.msgpack.Value; +import com.rbkmoney.machinegun.stateproc.*; +import org.apache.thrift.TBase; +import org.apache.thrift.TException; + +import java.util.List; + +public class TBaseAutomatonClient implements AutomatonClient { + + private final AutomatonSrv.Iface client; + + private final String namespace; + + private final Class resultType; + + public TBaseAutomatonClient(AutomatonSrv.Iface client, String namespace, Class resultType) { + this.client = client; + this.namespace = namespace; + this.resultType = resultType; + } + + @Override + public void start(String machineId, A args) throws MachineAlreadyExistsException, MachineFailedException, NamespaceNotFoundException { + try { + client.start(namespace, machineId, Value.bin(Geck.toMsgPack(args))); + } catch (MachineFailed ex) { + throw new MachineFailedException(String.format("Machine failed, namespace='%s', machineId='%s', args='%s'", namespace, machineId, args), ex); + } catch (MachineAlreadyExists ex) { + throw new MachineAlreadyExistsException(String.format("Machine already exists, namespace='%s', machineId='%s'", namespace, machineId), ex); + } catch (NamespaceNotFound ex) { + throw new NamespaceNotFoundException(String.format("Namespace not found, namespace='%s'", namespace), ex); + } catch (TException ex) { + throw new RuntimeException(ex); + } + } + + @Override + public V call(String machineId, A args) throws NamespaceNotFoundException, MachineFailedException, MachineNotFoundException, MachineAlreadyWorkingException { + try { + Value value = client.call(new MachineDescriptor(namespace, Reference.id(machineId), new HistoryRange()), Value.bin(Geck.toMsgPack(args))); + return Geck.msgPackToTBase(value.getBin(), resultType); + } catch (MachineFailed ex) { + throw new MachineFailedException(String.format("Machine failed, namespace='%s', machineId='%s', args='%s'", namespace, machineId, args), ex); + } catch (NamespaceNotFound ex) { + throw new NamespaceNotFoundException(String.format("Namespace not found, namespace='%s'", namespace), ex); + } catch (MachineNotFound ex) { + throw new MachineNotFoundException(String.format("Machine not found, namespace='%s', machineId='%s'", namespace, machineId), ex); + } catch (MachineAlreadyWorking ex) { + throw new MachineAlreadyWorkingException(String.format("Machine already working, namespace='%s', machineId='%s'", namespace, machineId), ex); + } catch (TException ex) { + throw new RuntimeException(ex); + } + } + + @Override + public Machine getMachine(String machineId) throws MachineNotFoundException, NamespaceNotFoundException { + try { + return client.getMachine(new MachineDescriptor(namespace, Reference.id(machineId), new HistoryRange())); + } catch (MachineNotFound ex) { + throw new MachineNotFoundException(String.format("Machine not found, namespace='%s', machineId='%s'", namespace, machineId), ex); + } catch (NamespaceNotFound ex) { + throw new NamespaceNotFoundException(String.format("Namespace not found, namespace='%s'", namespace), ex); + } catch (TException ex) { + throw new RuntimeException(ex); + } + } + + @Override + public List> getEvents(String machineId) throws MachineNotFoundException, NamespaceNotFoundException { + Machine machine = getMachine(machineId); + return MachineUtil.getMachineEvents(machine, resultType); + } + +} diff --git a/src/main/java/com/rbkmoney/machinarium/domain/CallResultData.java b/src/main/java/com/rbkmoney/machinarium/domain/CallResultData.java new file mode 100644 index 0000000..dd155d8 --- /dev/null +++ b/src/main/java/com/rbkmoney/machinarium/domain/CallResultData.java @@ -0,0 +1,41 @@ +package com.rbkmoney.machinarium.domain; + +import com.rbkmoney.machinegun.stateproc.ComplexAction; + +import java.util.List; + +public class CallResultData { + + private final T callResult; + + private final List newEvents; + + private final ComplexAction complexAction; + + public CallResultData(T callResult, List newEvents, ComplexAction complexAction) { + this.callResult = callResult; + this.newEvents = newEvents; + this.complexAction = complexAction; + } + + public T getCallResult() { + return callResult; + } + + public List getNewEvents() { + return newEvents; + } + + public ComplexAction getComplexAction() { + return complexAction; + } + + @Override + public String toString() { + return "CallResultData{" + + "callResult=" + callResult + + ", newEvents=" + newEvents + + ", complexAction=" + complexAction + + '}'; + } +} diff --git a/src/main/java/com/rbkmoney/machinarium/domain/SignalResultData.java b/src/main/java/com/rbkmoney/machinarium/domain/SignalResultData.java new file mode 100644 index 0000000..b8e741c --- /dev/null +++ b/src/main/java/com/rbkmoney/machinarium/domain/SignalResultData.java @@ -0,0 +1,33 @@ +package com.rbkmoney.machinarium.domain; + +import com.rbkmoney.machinegun.stateproc.ComplexAction; + +import java.util.List; + +public class SignalResultData { + + private final List newEvents; + + private final ComplexAction complexAction; + + public SignalResultData(List newEvents, ComplexAction complexAction) { + this.newEvents = newEvents; + this.complexAction = complexAction; + } + + public List getNewEvents() { + return newEvents; + } + + public ComplexAction getComplexAction() { + return complexAction; + } + + @Override + public String toString() { + return "SignalResultData{" + + "newEvents=" + newEvents + + ", complexAction=" + complexAction + + '}'; + } +} diff --git a/src/main/java/com/rbkmoney/machinarium/domain/TMachineEvent.java b/src/main/java/com/rbkmoney/machinarium/domain/TMachineEvent.java new file mode 100644 index 0000000..a66aa9c --- /dev/null +++ b/src/main/java/com/rbkmoney/machinarium/domain/TMachineEvent.java @@ -0,0 +1,39 @@ +package com.rbkmoney.machinarium.domain; + +import java.time.Instant; + +public class TMachineEvent { + + private final long id; + + private final Instant createdAt; + + private final T data; + + public TMachineEvent(long id, Instant createdAt, T data) { + this.id = id; + this.createdAt = createdAt; + this.data = data; + } + + public long getId() { + return id; + } + + public Instant getCreatedAt() { + return createdAt; + } + + public T getData() { + return data; + } + + @Override + public String toString() { + return "TEvent{" + + "id=" + id + + ", createdAt=" + createdAt + + ", data=" + data + + '}'; + } +} diff --git a/src/main/java/com/rbkmoney/machinarium/exception/MachineAlreadyExistsException.java b/src/main/java/com/rbkmoney/machinarium/exception/MachineAlreadyExistsException.java new file mode 100644 index 0000000..4a1a4ec --- /dev/null +++ b/src/main/java/com/rbkmoney/machinarium/exception/MachineAlreadyExistsException.java @@ -0,0 +1,23 @@ +package com.rbkmoney.machinarium.exception; + +public class MachineAlreadyExistsException extends RuntimeException { + + public MachineAlreadyExistsException() { + } + + public MachineAlreadyExistsException(String message) { + super(message); + } + + public MachineAlreadyExistsException(String message, Throwable cause) { + super(message, cause); + } + + public MachineAlreadyExistsException(Throwable cause) { + super(cause); + } + + public MachineAlreadyExistsException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/src/main/java/com/rbkmoney/machinarium/exception/MachineAlreadyWorkingException.java b/src/main/java/com/rbkmoney/machinarium/exception/MachineAlreadyWorkingException.java new file mode 100644 index 0000000..670d8a1 --- /dev/null +++ b/src/main/java/com/rbkmoney/machinarium/exception/MachineAlreadyWorkingException.java @@ -0,0 +1,23 @@ +package com.rbkmoney.machinarium.exception; + +public class MachineAlreadyWorkingException extends RuntimeException { + + public MachineAlreadyWorkingException() { + } + + public MachineAlreadyWorkingException(String message) { + super(message); + } + + public MachineAlreadyWorkingException(String message, Throwable cause) { + super(message, cause); + } + + public MachineAlreadyWorkingException(Throwable cause) { + super(cause); + } + + public MachineAlreadyWorkingException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/src/main/java/com/rbkmoney/machinarium/exception/MachineFailedException.java b/src/main/java/com/rbkmoney/machinarium/exception/MachineFailedException.java new file mode 100644 index 0000000..ab067b5 --- /dev/null +++ b/src/main/java/com/rbkmoney/machinarium/exception/MachineFailedException.java @@ -0,0 +1,24 @@ +package com.rbkmoney.machinarium.exception; + +public class MachineFailedException extends RuntimeException { + + public MachineFailedException() { + } + + public MachineFailedException(String message) { + super(message); + } + + public MachineFailedException(String message, Throwable cause) { + super(message, cause); + } + + public MachineFailedException(Throwable cause) { + super(cause); + } + + public MachineFailedException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + +} diff --git a/src/main/java/com/rbkmoney/machinarium/exception/MachineNotFoundException.java b/src/main/java/com/rbkmoney/machinarium/exception/MachineNotFoundException.java new file mode 100644 index 0000000..347b37d --- /dev/null +++ b/src/main/java/com/rbkmoney/machinarium/exception/MachineNotFoundException.java @@ -0,0 +1,23 @@ +package com.rbkmoney.machinarium.exception; + +public class MachineNotFoundException extends RuntimeException { + + public MachineNotFoundException() { + } + + public MachineNotFoundException(String message) { + super(message); + } + + public MachineNotFoundException(String message, Throwable cause) { + super(message, cause); + } + + public MachineNotFoundException(Throwable cause) { + super(cause); + } + + public MachineNotFoundException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/src/main/java/com/rbkmoney/machinarium/exception/NamespaceNotFoundException.java b/src/main/java/com/rbkmoney/machinarium/exception/NamespaceNotFoundException.java new file mode 100644 index 0000000..bbf7668 --- /dev/null +++ b/src/main/java/com/rbkmoney/machinarium/exception/NamespaceNotFoundException.java @@ -0,0 +1,23 @@ +package com.rbkmoney.machinarium.exception; + +public class NamespaceNotFoundException extends RuntimeException { + + public NamespaceNotFoundException() { + } + + public NamespaceNotFoundException(String message) { + super(message); + } + + public NamespaceNotFoundException(String message, Throwable cause) { + super(message, cause); + } + + public NamespaceNotFoundException(Throwable cause) { + super(cause); + } + + public NamespaceNotFoundException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/src/main/java/com/rbkmoney/machinarium/handler/AbstractProcessorHandler.java b/src/main/java/com/rbkmoney/machinarium/handler/AbstractProcessorHandler.java new file mode 100644 index 0000000..77a3f40 --- /dev/null +++ b/src/main/java/com/rbkmoney/machinarium/handler/AbstractProcessorHandler.java @@ -0,0 +1,86 @@ +package com.rbkmoney.machinarium.handler; + +import com.rbkmoney.geck.serializer.Geck; +import com.rbkmoney.machinarium.domain.CallResultData; +import com.rbkmoney.machinarium.domain.SignalResultData; +import com.rbkmoney.machinarium.domain.TMachineEvent; +import com.rbkmoney.machinarium.util.MachineUtil; +import com.rbkmoney.machinegun.msgpack.Nil; +import com.rbkmoney.machinegun.msgpack.Value; +import com.rbkmoney.machinegun.stateproc.*; +import org.apache.thrift.TBase; +import org.apache.thrift.TException; + +import java.util.List; +import java.util.stream.Collectors; + +public abstract class AbstractProcessorHandler implements ProcessorSrv.Iface { + + private final Class argsType; + + private final Class resultType; + + public AbstractProcessorHandler(Class argsType, Class resultType) { + this.argsType = argsType; + this.resultType = resultType; + } + + @Override + public SignalResult processSignal(SignalArgs args) throws TException { + Signal._Fields signalType = args.getSignal().getSetField(); + Machine machine = args.getMachine(); + SignalResultData signalResult = processSignal(signalType, args, machine); + + return new SignalResult( + buildMachineStateChange(signalResult.getNewEvents()), + signalResult.getComplexAction() + ); + } + + @Override + public CallResult processCall(CallArgs args) throws TException { + Machine machine = args.getMachine(); + CallResultData callResult = processCall( + machine.getNs(), + machine.getId(), + Geck.msgPackToTBase(args.getArg().getBin(), argsType), + MachineUtil.getMachineEvents(machine, resultType) + ); + + return new CallResult( + Value.bin(Geck.toMsgPack(callResult.getCallResult())), + buildMachineStateChange(callResult.getNewEvents()), + callResult.getComplexAction() + ); + } + + private SignalResultData processSignal(Signal._Fields signalType, SignalArgs args, Machine machine) { + switch (signalType) { + case INIT: + InitSignal initSignal = args.getSignal().getInit(); + return processSignalInit(machine.getNs(), machine.getId(), Geck.msgPackToTBase(initSignal.getArg().getBin(), argsType)); + case TIMEOUT: + return processSignalTimeout(machine.getNs(), machine.getId(), MachineUtil.getMachineEvents(machine, resultType)); + default: + throw new UnsupportedOperationException(String.format("Unsupported signal type, signalType='%s'", signalType)); + } + } + + private MachineStateChange buildMachineStateChange(List newEvents) { + MachineStateChange machineStateChange = new MachineStateChange(); + machineStateChange.setAuxStateLegacy(Value.nl(new Nil())); //?? + machineStateChange.setEventsLegacy( + newEvents.stream() + .map(event -> Value.bin(Geck.toMsgPack(event))) + .collect(Collectors.toList()) + ); + return machineStateChange; + } + + protected abstract SignalResultData processSignalInit(String namespace, String machineId, A args); + + protected abstract SignalResultData processSignalTimeout(String namespace, String machineId, List> events); + + protected abstract CallResultData processCall(String namespace, String machineId, A args, List> events); + +} diff --git a/src/main/java/com/rbkmoney/machinarium/util/MachineUtil.java b/src/main/java/com/rbkmoney/machinarium/util/MachineUtil.java new file mode 100644 index 0000000..fb0fe9f --- /dev/null +++ b/src/main/java/com/rbkmoney/machinarium/util/MachineUtil.java @@ -0,0 +1,28 @@ +package com.rbkmoney.machinarium.util; + +import com.rbkmoney.geck.serializer.Geck; +import com.rbkmoney.machinarium.domain.TMachineEvent; +import com.rbkmoney.machinegun.stateproc.Event; +import com.rbkmoney.machinegun.stateproc.Machine; +import org.apache.thrift.TBase; + +import java.time.Instant; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; + +public class MachineUtil { + + public static List> getMachineEvents(Machine machine, Class eventType) { + return machine.getHistory().stream() + .sorted(Comparator.comparingLong(Event::getId)) + .map( + event -> new TMachineEvent<>( + event.getId(), + Instant.parse(event.getCreatedAt()), + Geck.msgPackToTBase(event.getData().getBin(), eventType) + ) + ).collect(Collectors.toList()); + } + +} diff --git a/src/test/java/com/rbkmoney/machinarium/AbstractTest.java b/src/test/java/com/rbkmoney/machinarium/AbstractTest.java new file mode 100644 index 0000000..3c8bc3e --- /dev/null +++ b/src/test/java/com/rbkmoney/machinarium/AbstractTest.java @@ -0,0 +1,63 @@ +package com.rbkmoney.machinarium; + +import com.rbkmoney.woody.thrift.impl.http.THServiceBuilder; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.HandlerCollection; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.junit.After; + +import javax.servlet.Servlet; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Map; + +public class AbstractTest { + + private HandlerCollection handlerCollection; + private Server server; + + protected int serverPort = 8080; + + protected void addServlets(Map servlets) { + try { + ServletContextHandler context = new ServletContextHandler(); + for (Map.Entry entry : servlets.entrySet()) { + ServletHolder defaultServ = new ServletHolder(entry.getKey(), entry.getValue()); + context.addServlet(defaultServ, entry.getKey()); + } + handlerCollection.addHandler(context); + context.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + protected Servlet createThriftRPCService(Class iface, T handler) { + THServiceBuilder serviceBuilder = new THServiceBuilder(); + return serviceBuilder.build(iface, handler); + } + + protected URI buildURI(String path) throws URISyntaxException { + return new URI("http://localhost:" + serverPort + path); + } + + protected void startServer() throws Exception { + server = new Server(serverPort); + HandlerCollection contextHandlerCollection = new HandlerCollection(true); + this.handlerCollection = contextHandlerCollection; + server.setHandler(contextHandlerCollection); + + server.start(); + } + + @After + public void stopServer() { + try { + server.stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + +} diff --git a/src/test/java/com/rbkmoney/machinarium/AutomatonMockSrvImpl.java b/src/test/java/com/rbkmoney/machinarium/AutomatonMockSrvImpl.java new file mode 100644 index 0000000..14fc351 --- /dev/null +++ b/src/test/java/com/rbkmoney/machinarium/AutomatonMockSrvImpl.java @@ -0,0 +1,127 @@ +package com.rbkmoney.machinarium; + +import com.rbkmoney.machinegun.msgpack.Value; +import com.rbkmoney.machinegun.stateproc.*; +import com.rbkmoney.woody.thrift.impl.http.THSpawnClientBuilder; +import org.apache.thrift.TException; + +import java.net.URI; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +public class AutomatonMockSrvImpl implements AutomatonSrv.Iface { + + private final String namespace = "machinarium"; + + private Map machines = new HashMap<>(); + + private AtomicLong eventIdCounter = new AtomicLong(1); + + private ProcessorSrv.Iface client; + + public AutomatonMockSrvImpl(URI uri) { + client = new THSpawnClientBuilder() + .withAddress(uri) + .build(ProcessorSrv.Iface.class); + } + + @Override + public void start(String ns, String id, Value args) throws NamespaceNotFound, MachineAlreadyExists, MachineFailed, TException { + if (!namespace.equals(ns)) { + throw new NamespaceNotFound(); + } + + if (machines.containsKey(id)) { + throw new MachineAlreadyExists(); + } + + Machine machine = new Machine(ns, id, new ArrayList<>(), new HistoryRange()); + SignalResult signalResult = client.processSignal(new SignalArgs(Signal.init(new InitSignal(args)), machine)); + machine.setHistory( + signalResult.getChange().getEventsLegacy() + .stream().map( + event -> new Event( + eventIdCounter.getAndIncrement(), + Instant.now().toString(), + event + ) + ).collect(Collectors.toList()) + ); + machines.put(id, machine); + } + + @Override + public void repair(MachineDescriptor desc, Value a) throws NamespaceNotFound, MachineNotFound, MachineFailed, MachineAlreadyWorking, TException { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public void simpleRepair(String ns, Reference ref) throws NamespaceNotFound, MachineNotFound, MachineFailed, MachineAlreadyWorking, TException { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public Value call(MachineDescriptor desc, Value args) throws NamespaceNotFound, MachineNotFound, MachineFailed, TException { + if (!namespace.equals(desc.getNs())) { + throw new NamespaceNotFound(); + } + + if (desc.getRef().isSetTag()) { + throw new UnsupportedOperationException("Not implemented"); + } + + if (!machines.containsKey(desc.getRef().getId())) { + throw new MachineNotFound(); + } + + Machine machine = machines.get(desc.getRef().getId()); + CallResult callResult = client.processCall(new CallArgs(args, machine)); + List events = machine.getHistory(); + events.addAll( + callResult.getChange().getEventsLegacy() + .stream().map( + event -> new Event( + eventIdCounter.getAndIncrement(), + Instant.now().toString(), + event + ) + ).collect(Collectors.toList()) + ); + machine.setHistory(events); + machines.put(desc.getRef().getId(), machine); + + return callResult.getResponse(); + } + + @Override + public Machine getMachine(MachineDescriptor desc) throws NamespaceNotFound, MachineNotFound, EventNotFound, TException { + if (!namespace.equals(desc.getNs())) { + throw new NamespaceNotFound(); + } + + if (desc.getRef().isSetTag()) { + throw new UnsupportedOperationException("Not implemented"); + } + + if (!machines.containsKey(desc.getRef().getId())) { + throw new MachineNotFound(); + } + + return machines.get(desc.getRef().getId()); + } + + @Override + public void remove(String ns, String id) throws NamespaceNotFound, MachineNotFound, TException { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public void modernize(MachineDescriptor desc) throws NamespaceNotFound, MachineNotFound, TException { + throw new UnsupportedOperationException("Not implemented"); + } +} diff --git a/src/test/java/com/rbkmoney/machinarium/MachinegunComplexTest.java b/src/test/java/com/rbkmoney/machinarium/MachinegunComplexTest.java new file mode 100644 index 0000000..87ee5bf --- /dev/null +++ b/src/test/java/com/rbkmoney/machinarium/MachinegunComplexTest.java @@ -0,0 +1,108 @@ +package com.rbkmoney.machinarium; + +import com.rbkmoney.machinarium.client.AutomatonClient; +import com.rbkmoney.machinarium.client.TBaseAutomatonClient; +import com.rbkmoney.machinarium.domain.CallResultData; +import com.rbkmoney.machinarium.domain.SignalResultData; +import com.rbkmoney.machinarium.domain.TMachineEvent; +import com.rbkmoney.machinarium.exception.MachineAlreadyExistsException; +import com.rbkmoney.machinarium.exception.NamespaceNotFoundException; +import com.rbkmoney.machinarium.handler.AbstractProcessorHandler; +import com.rbkmoney.machinegun.msgpack.Value; +import com.rbkmoney.machinegun.stateproc.AutomatonSrv; +import com.rbkmoney.machinegun.stateproc.ComplexAction; +import com.rbkmoney.machinegun.stateproc.ProcessorSrv; +import com.rbkmoney.woody.thrift.impl.http.THSpawnClientBuilder; +import org.junit.Before; +import org.junit.Test; + +import javax.servlet.Servlet; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.fail; + +public class MachinegunComplexTest extends AbstractTest { + + private final String automatonPath = "/v1/automaton"; + private final String processorPath = "/v1/processor"; + + private AutomatonClient aClient; + private AutomatonSrv.Iface thriftClient; + + + private Servlet processorServlet = createThriftRPCService(ProcessorSrv.Iface.class, new AbstractProcessorHandler(Value.class, Value.class) { + + @Override + protected SignalResultData processSignalInit(String namespace, String machineId, Value args) { + return new SignalResultData(Arrays.asList(args), new ComplexAction()); + } + + @Override + protected SignalResultData processSignalTimeout(String namespace, String machineId, List> tMachineEvents) { + return new SignalResultData(Arrays.asList(Value.str("timeout")), new ComplexAction()); + } + + @Override + protected CallResultData processCall(String namespace, String machineId, Value args, List> tMachineEvents) { + return new CallResultData(args, Arrays.asList(args), new ComplexAction()); + } + }); + + @Before + public void setup() throws Exception { + startServer(); + + Servlet automatonServlet = createThriftRPCService(AutomatonSrv.Iface.class, new AutomatonMockSrvImpl(buildURI(processorPath))); + Map servlets = new HashMap<>(); + servlets.put(automatonPath, automatonServlet); + servlets.put(processorPath, processorServlet); + addServlets(servlets); + + thriftClient = new THSpawnClientBuilder() + .withAddress(buildURI(automatonPath)) + .withNetworkTimeout(0) + .build(AutomatonSrv.Iface.class); + aClient = new TBaseAutomatonClient<>(thriftClient, "machinarium", Value.class); + } + + @Test(expected = NamespaceNotFoundException.class) + public void testNamespaceNotFound() { + new TBaseAutomatonClient<>(thriftClient, "not_found", Value.class) + .start("kek", Value.b(true)); + } + + @Test + public void testStartMachine() { + String machineId = "start_test"; + aClient.start(machineId, Value.b(true)); + try { + aClient.start(machineId, Value.b(false)); + fail(); + } catch (MachineAlreadyExistsException ex) { + + } + List> events = aClient.getEvents(machineId); + assertEquals(1, events.size()); + assertEquals(Value.b(true), events.get(0).getData()); + } + + @Test + public void testStartAndCallMachine() { + String machineId = "call_test"; + aClient.start(machineId, Value.b(true)); + Value value = aClient.call(machineId, Value.b(false)); + assertEquals(Value.b(false), value); + List> events = aClient.getEvents(machineId); + assertEquals(2, events.size()); + assertEquals(Value.b(true), events.get(0).getData()); + assertEquals(Value.b(false), events.get(1).getData()); + } + + +} diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties new file mode 100644 index 0000000..24c3787 --- /dev/null +++ b/src/test/resources/log4j.properties @@ -0,0 +1,5 @@ +log4j.rootLogger=INFO, stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%t] %d{yyyy-MM-dd HH:mm:ss.SSS} %-5p Span[%X{trace_id}-%X{span_id}-%X{parent_id}] - %m%n \ No newline at end of file diff --git a/src/test/resources/machinegun/config.yaml b/src/test/resources/machinegun/config.yaml new file mode 100644 index 0000000..8722434 --- /dev/null +++ b/src/test/resources/machinegun/config.yaml @@ -0,0 +1,34 @@ +service_name: machinegun +erlang: + cookie: "mg_cookie" + ipv6: false + disable_dns_cache: false +woody_server: + ip: "::" + port: 8022 + keep_alive_timeout: 60s +limits: + process_heap: 2M + disk: + path: "/" + value: 99% + memory: + type: cgroups + value: 90% +logging: + root: /var/log/machinegun + crash_log: crash.log + json_log: log.json + level: info +namespaces: + machinarium: + event_sink: machinarium + default_processing_timeout: 30s + timer_processing_timeout: 60s + reschedule_timeout: 60s + processor: + url: http://host.docker.internal:8080/v1/processor + pool_size: 50 +snowflake_machine_id: 1 +storage: + type: memory