mirror of
https://github.com/valitydev/machinarium.git
synced 2024-11-06 02:25:23 +00:00
Initial implementation (#1)
* Initial implementation * Add Jenkinsfile, fix arg types in client * Review fixes (part 1) * Review fixes (part 2) * Trying to build lib like a service * Build fixes * Trying with resource mapping * Use machinegun mock instead machinegun in container
This commit is contained in:
parent
ab7a186941
commit
3f03c53c2e
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
@ -0,0 +1,3 @@
|
||||
/.idea
|
||||
/target
|
||||
*.iml
|
3
.gitmodules
vendored
Normal file
3
.gitmodules
vendored
Normal file
@ -0,0 +1,3 @@
|
||||
[submodule "build_utils"]
|
||||
path = build_utils
|
||||
url = git@github.com:rbkmoney/build_utils.git
|
16
Jenkinsfile
vendored
Normal file
16
Jenkinsfile
vendored
Normal file
@ -0,0 +1,16 @@
|
||||
#!groovy
|
||||
build('machinarium', 'java-maven') {
|
||||
checkoutRepo()
|
||||
loadBuildUtils()
|
||||
|
||||
def mvnArgs = '-DjvmArgs="-Xmx256m"'
|
||||
runStage('Maven package') {
|
||||
withCredentials([[$class: 'FileBinding', credentialsId: 'java-maven-settings.xml', variable: 'SETTINGS_XML']]) {
|
||||
if (env.BRANCH_NAME == 'master') {
|
||||
sh 'mvn deploy --batch-mode --settings $SETTINGS_XML ' + "${mvnArgs}"
|
||||
} else {
|
||||
sh 'mvn package --batch-mode --settings $SETTINGS_XML ' + "${mvnArgs}"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
1
build_utils
Submodule
1
build_utils
Submodule
@ -0,0 +1 @@
|
||||
Subproject commit 9b664082ddc8ec8cdfbe7513d54e433d24198cc2
|
57
pom.xml
Normal file
57
pom.xml
Normal file
@ -0,0 +1,57 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>com.rbkmoney</groupId>
|
||||
<artifactId>parent</artifactId>
|
||||
<version>1.0.0</version>
|
||||
</parent>
|
||||
|
||||
<groupId>com.rbkmoney</groupId>
|
||||
<artifactId>machinarium</artifactId>
|
||||
<version>0.1.0</version>
|
||||
|
||||
<description>Machinegun java client/processor handler lib</description>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.rbkmoney</groupId>
|
||||
<artifactId>machinegun-proto</artifactId>
|
||||
<version>1.9-b71bfca</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.rbkmoney.woody</groupId>
|
||||
<artifactId>woody-thrift</artifactId>
|
||||
<version>1.1.15</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.rbkmoney.geck</groupId>
|
||||
<artifactId>serializer</artifactId>
|
||||
<version>0.6.9</version>
|
||||
</dependency>
|
||||
<!--Test libs-->
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
<version>1.7.21</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-quickstart</artifactId>
|
||||
<version>9.3.9.M1</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>4.11</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
|
||||
</project>
|
@ -0,0 +1,19 @@
|
||||
package com.rbkmoney.machinarium.client;
|
||||
|
||||
import com.rbkmoney.machinarium.domain.TMachineEvent;
|
||||
import com.rbkmoney.machinarium.exception.*;
|
||||
import com.rbkmoney.machinegun.stateproc.Machine;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface AutomatonClient<A, V> {
|
||||
|
||||
void start(String machineId, A args) throws MachineAlreadyExistsException, MachineFailedException, NamespaceNotFoundException;
|
||||
|
||||
V call(String machineId, A args) throws NamespaceNotFoundException, MachineFailedException, MachineNotFoundException, MachineAlreadyWorkingException;
|
||||
|
||||
Machine getMachine(String machineId) throws MachineNotFoundException, NamespaceNotFoundException;
|
||||
|
||||
List<TMachineEvent<V>> getEvents(String machineId) throws MachineNotFoundException, NamespaceNotFoundException;
|
||||
|
||||
}
|
@ -0,0 +1,80 @@
|
||||
package com.rbkmoney.machinarium.client;
|
||||
|
||||
import com.rbkmoney.geck.serializer.Geck;
|
||||
import com.rbkmoney.machinarium.domain.TMachineEvent;
|
||||
import com.rbkmoney.machinarium.exception.*;
|
||||
import com.rbkmoney.machinarium.util.MachineUtil;
|
||||
import com.rbkmoney.machinegun.msgpack.Value;
|
||||
import com.rbkmoney.machinegun.stateproc.*;
|
||||
import org.apache.thrift.TBase;
|
||||
import org.apache.thrift.TException;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class TBaseAutomatonClient<A extends TBase, V extends TBase> implements AutomatonClient<A, V> {
|
||||
|
||||
private final AutomatonSrv.Iface client;
|
||||
|
||||
private final String namespace;
|
||||
|
||||
private final Class<V> resultType;
|
||||
|
||||
public TBaseAutomatonClient(AutomatonSrv.Iface client, String namespace, Class<V> resultType) {
|
||||
this.client = client;
|
||||
this.namespace = namespace;
|
||||
this.resultType = resultType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(String machineId, A args) throws MachineAlreadyExistsException, MachineFailedException, NamespaceNotFoundException {
|
||||
try {
|
||||
client.start(namespace, machineId, Value.bin(Geck.toMsgPack(args)));
|
||||
} catch (MachineFailed ex) {
|
||||
throw new MachineFailedException(String.format("Machine failed, namespace='%s', machineId='%s', args='%s'", namespace, machineId, args), ex);
|
||||
} catch (MachineAlreadyExists ex) {
|
||||
throw new MachineAlreadyExistsException(String.format("Machine already exists, namespace='%s', machineId='%s'", namespace, machineId), ex);
|
||||
} catch (NamespaceNotFound ex) {
|
||||
throw new NamespaceNotFoundException(String.format("Namespace not found, namespace='%s'", namespace), ex);
|
||||
} catch (TException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public V call(String machineId, A args) throws NamespaceNotFoundException, MachineFailedException, MachineNotFoundException, MachineAlreadyWorkingException {
|
||||
try {
|
||||
Value value = client.call(new MachineDescriptor(namespace, Reference.id(machineId), new HistoryRange()), Value.bin(Geck.toMsgPack(args)));
|
||||
return Geck.msgPackToTBase(value.getBin(), resultType);
|
||||
} catch (MachineFailed ex) {
|
||||
throw new MachineFailedException(String.format("Machine failed, namespace='%s', machineId='%s', args='%s'", namespace, machineId, args), ex);
|
||||
} catch (NamespaceNotFound ex) {
|
||||
throw new NamespaceNotFoundException(String.format("Namespace not found, namespace='%s'", namespace), ex);
|
||||
} catch (MachineNotFound ex) {
|
||||
throw new MachineNotFoundException(String.format("Machine not found, namespace='%s', machineId='%s'", namespace, machineId), ex);
|
||||
} catch (MachineAlreadyWorking ex) {
|
||||
throw new MachineAlreadyWorkingException(String.format("Machine already working, namespace='%s', machineId='%s'", namespace, machineId), ex);
|
||||
} catch (TException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Machine getMachine(String machineId) throws MachineNotFoundException, NamespaceNotFoundException {
|
||||
try {
|
||||
return client.getMachine(new MachineDescriptor(namespace, Reference.id(machineId), new HistoryRange()));
|
||||
} catch (MachineNotFound ex) {
|
||||
throw new MachineNotFoundException(String.format("Machine not found, namespace='%s', machineId='%s'", namespace, machineId), ex);
|
||||
} catch (NamespaceNotFound ex) {
|
||||
throw new NamespaceNotFoundException(String.format("Namespace not found, namespace='%s'", namespace), ex);
|
||||
} catch (TException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TMachineEvent<V>> getEvents(String machineId) throws MachineNotFoundException, NamespaceNotFoundException {
|
||||
Machine machine = getMachine(machineId);
|
||||
return MachineUtil.getMachineEvents(machine, resultType);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,41 @@
|
||||
package com.rbkmoney.machinarium.domain;
|
||||
|
||||
import com.rbkmoney.machinegun.stateproc.ComplexAction;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class CallResultData<T> {
|
||||
|
||||
private final T callResult;
|
||||
|
||||
private final List<T> newEvents;
|
||||
|
||||
private final ComplexAction complexAction;
|
||||
|
||||
public CallResultData(T callResult, List<T> newEvents, ComplexAction complexAction) {
|
||||
this.callResult = callResult;
|
||||
this.newEvents = newEvents;
|
||||
this.complexAction = complexAction;
|
||||
}
|
||||
|
||||
public T getCallResult() {
|
||||
return callResult;
|
||||
}
|
||||
|
||||
public List<T> getNewEvents() {
|
||||
return newEvents;
|
||||
}
|
||||
|
||||
public ComplexAction getComplexAction() {
|
||||
return complexAction;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "CallResultData{" +
|
||||
"callResult=" + callResult +
|
||||
", newEvents=" + newEvents +
|
||||
", complexAction=" + complexAction +
|
||||
'}';
|
||||
}
|
||||
}
|
@ -0,0 +1,33 @@
|
||||
package com.rbkmoney.machinarium.domain;
|
||||
|
||||
import com.rbkmoney.machinegun.stateproc.ComplexAction;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class SignalResultData<T> {
|
||||
|
||||
private final List<T> newEvents;
|
||||
|
||||
private final ComplexAction complexAction;
|
||||
|
||||
public SignalResultData(List<T> newEvents, ComplexAction complexAction) {
|
||||
this.newEvents = newEvents;
|
||||
this.complexAction = complexAction;
|
||||
}
|
||||
|
||||
public List<T> getNewEvents() {
|
||||
return newEvents;
|
||||
}
|
||||
|
||||
public ComplexAction getComplexAction() {
|
||||
return complexAction;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "SignalResultData{" +
|
||||
"newEvents=" + newEvents +
|
||||
", complexAction=" + complexAction +
|
||||
'}';
|
||||
}
|
||||
}
|
@ -0,0 +1,39 @@
|
||||
package com.rbkmoney.machinarium.domain;
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
public class TMachineEvent<T> {
|
||||
|
||||
private final long id;
|
||||
|
||||
private final Instant createdAt;
|
||||
|
||||
private final T data;
|
||||
|
||||
public TMachineEvent(long id, Instant createdAt, T data) {
|
||||
this.id = id;
|
||||
this.createdAt = createdAt;
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
public long getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public Instant getCreatedAt() {
|
||||
return createdAt;
|
||||
}
|
||||
|
||||
public T getData() {
|
||||
return data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TEvent{" +
|
||||
"id=" + id +
|
||||
", createdAt=" + createdAt +
|
||||
", data=" + data +
|
||||
'}';
|
||||
}
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
package com.rbkmoney.machinarium.exception;
|
||||
|
||||
public class MachineAlreadyExistsException extends RuntimeException {
|
||||
|
||||
public MachineAlreadyExistsException() {
|
||||
}
|
||||
|
||||
public MachineAlreadyExistsException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public MachineAlreadyExistsException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public MachineAlreadyExistsException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
public MachineAlreadyExistsException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
|
||||
super(message, cause, enableSuppression, writableStackTrace);
|
||||
}
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
package com.rbkmoney.machinarium.exception;
|
||||
|
||||
public class MachineAlreadyWorkingException extends RuntimeException {
|
||||
|
||||
public MachineAlreadyWorkingException() {
|
||||
}
|
||||
|
||||
public MachineAlreadyWorkingException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public MachineAlreadyWorkingException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public MachineAlreadyWorkingException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
public MachineAlreadyWorkingException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
|
||||
super(message, cause, enableSuppression, writableStackTrace);
|
||||
}
|
||||
}
|
@ -0,0 +1,24 @@
|
||||
package com.rbkmoney.machinarium.exception;
|
||||
|
||||
public class MachineFailedException extends RuntimeException {
|
||||
|
||||
public MachineFailedException() {
|
||||
}
|
||||
|
||||
public MachineFailedException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public MachineFailedException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public MachineFailedException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
public MachineFailedException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
|
||||
super(message, cause, enableSuppression, writableStackTrace);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
package com.rbkmoney.machinarium.exception;
|
||||
|
||||
public class MachineNotFoundException extends RuntimeException {
|
||||
|
||||
public MachineNotFoundException() {
|
||||
}
|
||||
|
||||
public MachineNotFoundException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public MachineNotFoundException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public MachineNotFoundException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
public MachineNotFoundException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
|
||||
super(message, cause, enableSuppression, writableStackTrace);
|
||||
}
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
package com.rbkmoney.machinarium.exception;
|
||||
|
||||
public class NamespaceNotFoundException extends RuntimeException {
|
||||
|
||||
public NamespaceNotFoundException() {
|
||||
}
|
||||
|
||||
public NamespaceNotFoundException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public NamespaceNotFoundException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public NamespaceNotFoundException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
public NamespaceNotFoundException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
|
||||
super(message, cause, enableSuppression, writableStackTrace);
|
||||
}
|
||||
}
|
@ -0,0 +1,86 @@
|
||||
package com.rbkmoney.machinarium.handler;
|
||||
|
||||
import com.rbkmoney.geck.serializer.Geck;
|
||||
import com.rbkmoney.machinarium.domain.CallResultData;
|
||||
import com.rbkmoney.machinarium.domain.SignalResultData;
|
||||
import com.rbkmoney.machinarium.domain.TMachineEvent;
|
||||
import com.rbkmoney.machinarium.util.MachineUtil;
|
||||
import com.rbkmoney.machinegun.msgpack.Nil;
|
||||
import com.rbkmoney.machinegun.msgpack.Value;
|
||||
import com.rbkmoney.machinegun.stateproc.*;
|
||||
import org.apache.thrift.TBase;
|
||||
import org.apache.thrift.TException;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public abstract class AbstractProcessorHandler<A extends TBase, V extends TBase> implements ProcessorSrv.Iface {
|
||||
|
||||
private final Class<A> argsType;
|
||||
|
||||
private final Class<V> resultType;
|
||||
|
||||
public AbstractProcessorHandler(Class<A> argsType, Class<V> resultType) {
|
||||
this.argsType = argsType;
|
||||
this.resultType = resultType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SignalResult processSignal(SignalArgs args) throws TException {
|
||||
Signal._Fields signalType = args.getSignal().getSetField();
|
||||
Machine machine = args.getMachine();
|
||||
SignalResultData signalResult = processSignal(signalType, args, machine);
|
||||
|
||||
return new SignalResult(
|
||||
buildMachineStateChange(signalResult.getNewEvents()),
|
||||
signalResult.getComplexAction()
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CallResult processCall(CallArgs args) throws TException {
|
||||
Machine machine = args.getMachine();
|
||||
CallResultData<V> callResult = processCall(
|
||||
machine.getNs(),
|
||||
machine.getId(),
|
||||
Geck.msgPackToTBase(args.getArg().getBin(), argsType),
|
||||
MachineUtil.getMachineEvents(machine, resultType)
|
||||
);
|
||||
|
||||
return new CallResult(
|
||||
Value.bin(Geck.toMsgPack(callResult.getCallResult())),
|
||||
buildMachineStateChange(callResult.getNewEvents()),
|
||||
callResult.getComplexAction()
|
||||
);
|
||||
}
|
||||
|
||||
private SignalResultData processSignal(Signal._Fields signalType, SignalArgs args, Machine machine) {
|
||||
switch (signalType) {
|
||||
case INIT:
|
||||
InitSignal initSignal = args.getSignal().getInit();
|
||||
return processSignalInit(machine.getNs(), machine.getId(), Geck.msgPackToTBase(initSignal.getArg().getBin(), argsType));
|
||||
case TIMEOUT:
|
||||
return processSignalTimeout(machine.getNs(), machine.getId(), MachineUtil.getMachineEvents(machine, resultType));
|
||||
default:
|
||||
throw new UnsupportedOperationException(String.format("Unsupported signal type, signalType='%s'", signalType));
|
||||
}
|
||||
}
|
||||
|
||||
private MachineStateChange buildMachineStateChange(List<V> newEvents) {
|
||||
MachineStateChange machineStateChange = new MachineStateChange();
|
||||
machineStateChange.setAuxStateLegacy(Value.nl(new Nil())); //??
|
||||
machineStateChange.setEventsLegacy(
|
||||
newEvents.stream()
|
||||
.map(event -> Value.bin(Geck.toMsgPack(event)))
|
||||
.collect(Collectors.toList())
|
||||
);
|
||||
return machineStateChange;
|
||||
}
|
||||
|
||||
protected abstract SignalResultData<V> processSignalInit(String namespace, String machineId, A args);
|
||||
|
||||
protected abstract SignalResultData<V> processSignalTimeout(String namespace, String machineId, List<TMachineEvent<V>> events);
|
||||
|
||||
protected abstract CallResultData<V> processCall(String namespace, String machineId, A args, List<TMachineEvent<V>> events);
|
||||
|
||||
}
|
28
src/main/java/com/rbkmoney/machinarium/util/MachineUtil.java
Normal file
28
src/main/java/com/rbkmoney/machinarium/util/MachineUtil.java
Normal file
@ -0,0 +1,28 @@
|
||||
package com.rbkmoney.machinarium.util;
|
||||
|
||||
import com.rbkmoney.geck.serializer.Geck;
|
||||
import com.rbkmoney.machinarium.domain.TMachineEvent;
|
||||
import com.rbkmoney.machinegun.stateproc.Event;
|
||||
import com.rbkmoney.machinegun.stateproc.Machine;
|
||||
import org.apache.thrift.TBase;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class MachineUtil {
|
||||
|
||||
public static <T extends TBase> List<TMachineEvent<T>> getMachineEvents(Machine machine, Class<T> eventType) {
|
||||
return machine.getHistory().stream()
|
||||
.sorted(Comparator.comparingLong(Event::getId))
|
||||
.map(
|
||||
event -> new TMachineEvent<>(
|
||||
event.getId(),
|
||||
Instant.parse(event.getCreatedAt()),
|
||||
Geck.msgPackToTBase(event.getData().getBin(), eventType)
|
||||
)
|
||||
).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
}
|
63
src/test/java/com/rbkmoney/machinarium/AbstractTest.java
Normal file
63
src/test/java/com/rbkmoney/machinarium/AbstractTest.java
Normal file
@ -0,0 +1,63 @@
|
||||
package com.rbkmoney.machinarium;
|
||||
|
||||
import com.rbkmoney.woody.thrift.impl.http.THServiceBuilder;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.handler.HandlerCollection;
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.servlet.ServletHolder;
|
||||
import org.junit.After;
|
||||
|
||||
import javax.servlet.Servlet;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Map;
|
||||
|
||||
public class AbstractTest {
|
||||
|
||||
private HandlerCollection handlerCollection;
|
||||
private Server server;
|
||||
|
||||
protected int serverPort = 8080;
|
||||
|
||||
protected void addServlets(Map<String, Servlet> servlets) {
|
||||
try {
|
||||
ServletContextHandler context = new ServletContextHandler();
|
||||
for (Map.Entry<String, Servlet> entry : servlets.entrySet()) {
|
||||
ServletHolder defaultServ = new ServletHolder(entry.getKey(), entry.getValue());
|
||||
context.addServlet(defaultServ, entry.getKey());
|
||||
}
|
||||
handlerCollection.addHandler(context);
|
||||
context.start();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
protected <T> Servlet createThriftRPCService(Class<T> iface, T handler) {
|
||||
THServiceBuilder serviceBuilder = new THServiceBuilder();
|
||||
return serviceBuilder.build(iface, handler);
|
||||
}
|
||||
|
||||
protected URI buildURI(String path) throws URISyntaxException {
|
||||
return new URI("http://localhost:" + serverPort + path);
|
||||
}
|
||||
|
||||
protected void startServer() throws Exception {
|
||||
server = new Server(serverPort);
|
||||
HandlerCollection contextHandlerCollection = new HandlerCollection(true);
|
||||
this.handlerCollection = contextHandlerCollection;
|
||||
server.setHandler(contextHandlerCollection);
|
||||
|
||||
server.start();
|
||||
}
|
||||
|
||||
@After
|
||||
public void stopServer() {
|
||||
try {
|
||||
server.stop();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
127
src/test/java/com/rbkmoney/machinarium/AutomatonMockSrvImpl.java
Normal file
127
src/test/java/com/rbkmoney/machinarium/AutomatonMockSrvImpl.java
Normal file
@ -0,0 +1,127 @@
|
||||
package com.rbkmoney.machinarium;
|
||||
|
||||
import com.rbkmoney.machinegun.msgpack.Value;
|
||||
import com.rbkmoney.machinegun.stateproc.*;
|
||||
import com.rbkmoney.woody.thrift.impl.http.THSpawnClientBuilder;
|
||||
import org.apache.thrift.TException;
|
||||
|
||||
import java.net.URI;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class AutomatonMockSrvImpl implements AutomatonSrv.Iface {
|
||||
|
||||
private final String namespace = "machinarium";
|
||||
|
||||
private Map<String, Machine> machines = new HashMap<>();
|
||||
|
||||
private AtomicLong eventIdCounter = new AtomicLong(1);
|
||||
|
||||
private ProcessorSrv.Iface client;
|
||||
|
||||
public AutomatonMockSrvImpl(URI uri) {
|
||||
client = new THSpawnClientBuilder()
|
||||
.withAddress(uri)
|
||||
.build(ProcessorSrv.Iface.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(String ns, String id, Value args) throws NamespaceNotFound, MachineAlreadyExists, MachineFailed, TException {
|
||||
if (!namespace.equals(ns)) {
|
||||
throw new NamespaceNotFound();
|
||||
}
|
||||
|
||||
if (machines.containsKey(id)) {
|
||||
throw new MachineAlreadyExists();
|
||||
}
|
||||
|
||||
Machine machine = new Machine(ns, id, new ArrayList<>(), new HistoryRange());
|
||||
SignalResult signalResult = client.processSignal(new SignalArgs(Signal.init(new InitSignal(args)), machine));
|
||||
machine.setHistory(
|
||||
signalResult.getChange().getEventsLegacy()
|
||||
.stream().map(
|
||||
event -> new Event(
|
||||
eventIdCounter.getAndIncrement(),
|
||||
Instant.now().toString(),
|
||||
event
|
||||
)
|
||||
).collect(Collectors.toList())
|
||||
);
|
||||
machines.put(id, machine);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void repair(MachineDescriptor desc, Value a) throws NamespaceNotFound, MachineNotFound, MachineFailed, MachineAlreadyWorking, TException {
|
||||
throw new UnsupportedOperationException("Not implemented");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void simpleRepair(String ns, Reference ref) throws NamespaceNotFound, MachineNotFound, MachineFailed, MachineAlreadyWorking, TException {
|
||||
throw new UnsupportedOperationException("Not implemented");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Value call(MachineDescriptor desc, Value args) throws NamespaceNotFound, MachineNotFound, MachineFailed, TException {
|
||||
if (!namespace.equals(desc.getNs())) {
|
||||
throw new NamespaceNotFound();
|
||||
}
|
||||
|
||||
if (desc.getRef().isSetTag()) {
|
||||
throw new UnsupportedOperationException("Not implemented");
|
||||
}
|
||||
|
||||
if (!machines.containsKey(desc.getRef().getId())) {
|
||||
throw new MachineNotFound();
|
||||
}
|
||||
|
||||
Machine machine = machines.get(desc.getRef().getId());
|
||||
CallResult callResult = client.processCall(new CallArgs(args, machine));
|
||||
List<Event> events = machine.getHistory();
|
||||
events.addAll(
|
||||
callResult.getChange().getEventsLegacy()
|
||||
.stream().map(
|
||||
event -> new Event(
|
||||
eventIdCounter.getAndIncrement(),
|
||||
Instant.now().toString(),
|
||||
event
|
||||
)
|
||||
).collect(Collectors.toList())
|
||||
);
|
||||
machine.setHistory(events);
|
||||
machines.put(desc.getRef().getId(), machine);
|
||||
|
||||
return callResult.getResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Machine getMachine(MachineDescriptor desc) throws NamespaceNotFound, MachineNotFound, EventNotFound, TException {
|
||||
if (!namespace.equals(desc.getNs())) {
|
||||
throw new NamespaceNotFound();
|
||||
}
|
||||
|
||||
if (desc.getRef().isSetTag()) {
|
||||
throw new UnsupportedOperationException("Not implemented");
|
||||
}
|
||||
|
||||
if (!machines.containsKey(desc.getRef().getId())) {
|
||||
throw new MachineNotFound();
|
||||
}
|
||||
|
||||
return machines.get(desc.getRef().getId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove(String ns, String id) throws NamespaceNotFound, MachineNotFound, TException {
|
||||
throw new UnsupportedOperationException("Not implemented");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void modernize(MachineDescriptor desc) throws NamespaceNotFound, MachineNotFound, TException {
|
||||
throw new UnsupportedOperationException("Not implemented");
|
||||
}
|
||||
}
|
@ -0,0 +1,108 @@
|
||||
package com.rbkmoney.machinarium;
|
||||
|
||||
import com.rbkmoney.machinarium.client.AutomatonClient;
|
||||
import com.rbkmoney.machinarium.client.TBaseAutomatonClient;
|
||||
import com.rbkmoney.machinarium.domain.CallResultData;
|
||||
import com.rbkmoney.machinarium.domain.SignalResultData;
|
||||
import com.rbkmoney.machinarium.domain.TMachineEvent;
|
||||
import com.rbkmoney.machinarium.exception.MachineAlreadyExistsException;
|
||||
import com.rbkmoney.machinarium.exception.NamespaceNotFoundException;
|
||||
import com.rbkmoney.machinarium.handler.AbstractProcessorHandler;
|
||||
import com.rbkmoney.machinegun.msgpack.Value;
|
||||
import com.rbkmoney.machinegun.stateproc.AutomatonSrv;
|
||||
import com.rbkmoney.machinegun.stateproc.ComplexAction;
|
||||
import com.rbkmoney.machinegun.stateproc.ProcessorSrv;
|
||||
import com.rbkmoney.woody.thrift.impl.http.THSpawnClientBuilder;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.servlet.Servlet;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static junit.framework.TestCase.assertEquals;
|
||||
import static junit.framework.TestCase.fail;
|
||||
|
||||
public class MachinegunComplexTest extends AbstractTest {
|
||||
|
||||
private final String automatonPath = "/v1/automaton";
|
||||
private final String processorPath = "/v1/processor";
|
||||
|
||||
private AutomatonClient<Value, Value> aClient;
|
||||
private AutomatonSrv.Iface thriftClient;
|
||||
|
||||
|
||||
private Servlet processorServlet = createThriftRPCService(ProcessorSrv.Iface.class, new AbstractProcessorHandler<Value, Value>(Value.class, Value.class) {
|
||||
|
||||
@Override
|
||||
protected SignalResultData<Value> processSignalInit(String namespace, String machineId, Value args) {
|
||||
return new SignalResultData(Arrays.asList(args), new ComplexAction());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SignalResultData<Value> processSignalTimeout(String namespace, String machineId, List<TMachineEvent<Value>> tMachineEvents) {
|
||||
return new SignalResultData(Arrays.asList(Value.str("timeout")), new ComplexAction());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CallResultData<Value> processCall(String namespace, String machineId, Value args, List<TMachineEvent<Value>> tMachineEvents) {
|
||||
return new CallResultData(args, Arrays.asList(args), new ComplexAction());
|
||||
}
|
||||
});
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
startServer();
|
||||
|
||||
Servlet automatonServlet = createThriftRPCService(AutomatonSrv.Iface.class, new AutomatonMockSrvImpl(buildURI(processorPath)));
|
||||
Map<String, Servlet> servlets = new HashMap<>();
|
||||
servlets.put(automatonPath, automatonServlet);
|
||||
servlets.put(processorPath, processorServlet);
|
||||
addServlets(servlets);
|
||||
|
||||
thriftClient = new THSpawnClientBuilder()
|
||||
.withAddress(buildURI(automatonPath))
|
||||
.withNetworkTimeout(0)
|
||||
.build(AutomatonSrv.Iface.class);
|
||||
aClient = new TBaseAutomatonClient<>(thriftClient, "machinarium", Value.class);
|
||||
}
|
||||
|
||||
@Test(expected = NamespaceNotFoundException.class)
|
||||
public void testNamespaceNotFound() {
|
||||
new TBaseAutomatonClient<>(thriftClient, "not_found", Value.class)
|
||||
.start("kek", Value.b(true));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStartMachine() {
|
||||
String machineId = "start_test";
|
||||
aClient.start(machineId, Value.b(true));
|
||||
try {
|
||||
aClient.start(machineId, Value.b(false));
|
||||
fail();
|
||||
} catch (MachineAlreadyExistsException ex) {
|
||||
|
||||
}
|
||||
List<TMachineEvent<Value>> events = aClient.getEvents(machineId);
|
||||
assertEquals(1, events.size());
|
||||
assertEquals(Value.b(true), events.get(0).getData());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStartAndCallMachine() {
|
||||
String machineId = "call_test";
|
||||
aClient.start(machineId, Value.b(true));
|
||||
Value value = aClient.call(machineId, Value.b(false));
|
||||
assertEquals(Value.b(false), value);
|
||||
List<TMachineEvent<Value>> events = aClient.getEvents(machineId);
|
||||
assertEquals(2, events.size());
|
||||
assertEquals(Value.b(true), events.get(0).getData());
|
||||
assertEquals(Value.b(false), events.get(1).getData());
|
||||
}
|
||||
|
||||
|
||||
}
|
5
src/test/resources/log4j.properties
Normal file
5
src/test/resources/log4j.properties
Normal file
@ -0,0 +1,5 @@
|
||||
log4j.rootLogger=INFO, stdout
|
||||
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
|
||||
log4j.appender.stdout.Target=System.out
|
||||
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.stdout.layout.ConversionPattern=[%t] %d{yyyy-MM-dd HH:mm:ss.SSS} %-5p Span[%X{trace_id}-%X{span_id}-%X{parent_id}] - %m%n
|
34
src/test/resources/machinegun/config.yaml
Normal file
34
src/test/resources/machinegun/config.yaml
Normal file
@ -0,0 +1,34 @@
|
||||
service_name: machinegun
|
||||
erlang:
|
||||
cookie: "mg_cookie"
|
||||
ipv6: false
|
||||
disable_dns_cache: false
|
||||
woody_server:
|
||||
ip: "::"
|
||||
port: 8022
|
||||
keep_alive_timeout: 60s
|
||||
limits:
|
||||
process_heap: 2M
|
||||
disk:
|
||||
path: "/"
|
||||
value: 99%
|
||||
memory:
|
||||
type: cgroups
|
||||
value: 90%
|
||||
logging:
|
||||
root: /var/log/machinegun
|
||||
crash_log: crash.log
|
||||
json_log: log.json
|
||||
level: info
|
||||
namespaces:
|
||||
machinarium:
|
||||
event_sink: machinarium
|
||||
default_processing_timeout: 30s
|
||||
timer_processing_timeout: 60s
|
||||
reschedule_timeout: 60s
|
||||
processor:
|
||||
url: http://host.docker.internal:8080/v1/processor
|
||||
pool_size: 50
|
||||
snowflake_machine_id: 1
|
||||
storage:
|
||||
type: memory
|
Loading…
Reference in New Issue
Block a user