Add source, destination and deposit event sink support (#29)

This commit is contained in:
Pavel Popov 2018-11-13 13:13:43 +03:00 committed by GitHub
parent 77cb1b4cf2
commit f9334a4600
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 352 additions and 6 deletions

View File

@ -7,7 +7,7 @@
<parent>
<groupId>com.rbkmoney</groupId>
<artifactId>eventstock-client</artifactId>
<version>1.2.3</version>
<version>1.2.4</version>
</parent>
<groupId>com.rbkmoney</groupId>

View File

@ -7,7 +7,7 @@
<parent>
<groupId>com.rbkmoney</groupId>
<artifactId>eventstock-client</artifactId>
<version>1.2.3</version>
<version>1.2.4</version>
</parent>
<groupId>com.rbkmoney</groupId>

View File

@ -7,7 +7,7 @@
<parent>
<groupId>com.rbkmoney</groupId>
<artifactId>eventstock-client</artifactId>
<version>1.2.3</version>
<version>1.2.4</version>
</parent>
<groupId>com.rbkmoney</groupId>
@ -22,7 +22,7 @@
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>fistful-proto</artifactId>
<version>1.4-08df5e9</version>
<version>1.5-cd93749</version>
<scope>provided</scope>
</dependency>
<!--Test-->

View File

@ -74,6 +74,21 @@ public class FistfulPollingEventPublisherBuilder extends DefaultPollingEventPubl
return this;
}
public FistfulPollingEventPublisherBuilder withDepositServiceAdapter() {
this.serviceAdapterType = ServiceAdapterType.DEPOSIT;
return this;
}
public FistfulPollingEventPublisherBuilder withSourceServiceAdapter() {
this.serviceAdapterType = ServiceAdapterType.SOURCE;
return this;
}
public FistfulPollingEventPublisherBuilder withDestinationServiceAdapter() {
this.serviceAdapterType = ServiceAdapterType.DESTINATION;
return this;
}
protected ClientBuilder getClientBuilder() {
if (clientBuilder == null) {
clientBuilder = new THSpawnClientBuilder().withAddress(uri);
@ -102,6 +117,12 @@ public class FistfulPollingEventPublisherBuilder extends DefaultPollingEventPubl
return FistfulServiceAdapter.buildWalletAdapter(clientBuilder);
case IDENTITY:
return FistfulServiceAdapter.buildIdentityAdapter(clientBuilder);
case DEPOSIT:
return FistfulServiceAdapter.buildDepositAdapter(clientBuilder);
case SOURCE:
return FistfulServiceAdapter.buildSourceAdapter(clientBuilder);
case DESTINATION:
return FistfulServiceAdapter.buildDestinationAdapter(clientBuilder);
default:
throw new IllegalArgumentException("Unknown service adapter type");
}
@ -110,7 +131,10 @@ public class FistfulPollingEventPublisherBuilder extends DefaultPollingEventPubl
public enum ServiceAdapterType {
IDENTITY,
WITHDRAWAL,
WALLET
WALLET,
DEPOSIT,
SOURCE,
DESTINATION
}
}

View File

@ -94,6 +94,81 @@ public class FistfulServiceAdapter<TEvent> implements ServiceAdapter<TEvent, Eve
});
}
public static FistfulServiceAdapter<com.rbkmoney.fistful.deposit.SinkEvent> buildDepositAdapter(ClientBuilder clientBuilder) {
com.rbkmoney.fistful.deposit.EventSinkSrv.Iface client = clientBuilder.build(com.rbkmoney.fistful.deposit.EventSinkSrv.Iface.class);
return new FistfulServiceAdapter<>(new FistfulRepository<com.rbkmoney.fistful.deposit.SinkEvent>() {
@Override
public List<com.rbkmoney.fistful.deposit.SinkEvent> getEvents(EventRange eventRange) throws TException {
return client.getEvents(eventRange);
}
@Override
public long getLastEventID() throws NoLastEvent, TException {
return client.getLastEventID();
}
@Override
public Long getEventId(com.rbkmoney.fistful.deposit.SinkEvent sinkEvent) {
return sinkEvent.getId();
}
@Override
public TemporalAccessor getEventCreatedAt(com.rbkmoney.fistful.deposit.SinkEvent sinkEvent) {
return TypeUtil.stringToTemporal(sinkEvent.getCreatedAt());
}
});
}
public static FistfulServiceAdapter<com.rbkmoney.fistful.destination.SinkEvent> buildDestinationAdapter(ClientBuilder clientBuilder) {
com.rbkmoney.fistful.destination.EventSinkSrv.Iface client = clientBuilder.build(com.rbkmoney.fistful.destination.EventSinkSrv.Iface.class);
return new FistfulServiceAdapter<>(new FistfulRepository<com.rbkmoney.fistful.destination.SinkEvent>() {
@Override
public List<com.rbkmoney.fistful.destination.SinkEvent> getEvents(EventRange eventRange) throws TException {
return client.getEvents(eventRange);
}
@Override
public long getLastEventID() throws NoLastEvent, TException {
return client.getLastEventID();
}
@Override
public Long getEventId(com.rbkmoney.fistful.destination.SinkEvent sinkEvent) {
return sinkEvent.getId();
}
@Override
public TemporalAccessor getEventCreatedAt(com.rbkmoney.fistful.destination.SinkEvent sinkEvent) {
return TypeUtil.stringToTemporal(sinkEvent.getCreatedAt());
}
});
}
public static FistfulServiceAdapter<com.rbkmoney.fistful.source.SinkEvent> buildSourceAdapter(ClientBuilder clientBuilder) {
com.rbkmoney.fistful.source.EventSinkSrv.Iface client = clientBuilder.build(com.rbkmoney.fistful.source.EventSinkSrv.Iface.class);
return new FistfulServiceAdapter<>(new FistfulRepository<com.rbkmoney.fistful.source.SinkEvent>() {
@Override
public List<com.rbkmoney.fistful.source.SinkEvent> getEvents(EventRange eventRange) throws TException {
return client.getEvents(eventRange);
}
@Override
public long getLastEventID() throws NoLastEvent, TException {
return client.getLastEventID();
}
@Override
public Long getEventId(com.rbkmoney.fistful.source.SinkEvent sinkEvent) {
return sinkEvent.getId();
}
@Override
public TemporalAccessor getEventCreatedAt(com.rbkmoney.fistful.source.SinkEvent sinkEvent) {
return TypeUtil.stringToTemporal(sinkEvent.getCreatedAt());
}
});
}
private FistfulServiceAdapter(FistfulRepository<TEvent> repository) {
this.repository = repository;
}

View File

@ -205,4 +205,184 @@ public class FistfulClientTest extends AbstractTest {
publisher.destroy();
}
@Test
public void testDepositServiceAdapter() throws URISyntaxException, InterruptedException {
Semaphore semaphore = new Semaphore(-1);
AtomicLong lastId = new AtomicLong(-1);
Servlet srv = createThrftRPCService(com.rbkmoney.fistful.deposit.EventSinkSrv.Iface.class, new com.rbkmoney.fistful.deposit.EventSinkSrv.Iface() {
@Override
public List<com.rbkmoney.fistful.deposit.SinkEvent> getEvents(EventRange range) throws TException {
if (range.getAfter() == -1) {
return IntStream.range(0, 3).mapToObj(i -> FistfulEventGenerator.createDepositEvent(i)).collect(Collectors.toList());
} else {
semaphore.release(1);
return Collections.emptyList();
}
}
@Override
public long getLastEventID() throws NoLastEvent, TException {
return 0;
}
}, null);
addServlet(srv, "/deposit");
FistfulPollingEventPublisherBuilder builder = new FistfulPollingEventPublisherBuilder().withDepositServiceAdapter();
builder.withURI(new URI(getUrlString("/deposit")));
PollingEventPublisher publisher = builder.build();
DefaultSubscriberConfig config = new DefaultSubscriberConfig<>(new EventFilter<com.rbkmoney.fistful.deposit.SinkEvent>() {
@Override
public EventConstraint getEventConstraint() {
EventConstraint.EventIDRange range = new EventConstraint.EventIDRange();
range.setFromNow();
return new EventConstraint(range);
}
@Override
public Filter getFilter() {
return null;
}
@Override
public int getLimit() {
return 1;
}
@Override
public boolean accept(Long eventId, TemporalAccessor createdAt, com.rbkmoney.fistful.deposit.SinkEvent o) {
return true;
}
}, (e, k) -> {
lastId.set(e.getId());
return EventAction.CONTINUE;
});
publisher.subscribe(config);
semaphore.acquire(1);
Assert.assertEquals(2, lastId.get());
publisher.destroy();
}
@Test
public void testSourceServiceAdapter() throws URISyntaxException, InterruptedException {
Semaphore semaphore = new Semaphore(-1);
AtomicLong lastId = new AtomicLong(-1);
Servlet srv = createThrftRPCService(com.rbkmoney.fistful.source.EventSinkSrv.Iface.class, new com.rbkmoney.fistful.source.EventSinkSrv.Iface() {
@Override
public List<com.rbkmoney.fistful.source.SinkEvent> getEvents(EventRange range) throws TException {
if (range.getAfter() == -1) {
return IntStream.range(0, 3).mapToObj(i -> FistfulEventGenerator.createSourceEvent(i)).collect(Collectors.toList());
} else {
semaphore.release(1);
return Collections.emptyList();
}
}
@Override
public long getLastEventID() throws NoLastEvent, TException {
return 0;
}
}, null);
addServlet(srv, "/source");
FistfulPollingEventPublisherBuilder builder = new FistfulPollingEventPublisherBuilder().withSourceServiceAdapter();
builder.withURI(new URI(getUrlString("/source")));
PollingEventPublisher publisher = builder.build();
DefaultSubscriberConfig config = new DefaultSubscriberConfig<>(new EventFilter<com.rbkmoney.fistful.source.SinkEvent>() {
@Override
public EventConstraint getEventConstraint() {
EventConstraint.EventIDRange range = new EventConstraint.EventIDRange();
range.setFromNow();
return new EventConstraint(range);
}
@Override
public Filter getFilter() {
return null;
}
@Override
public int getLimit() {
return 1;
}
@Override
public boolean accept(Long eventId, TemporalAccessor createdAt, com.rbkmoney.fistful.source.SinkEvent o) {
return true;
}
}, (e, k) -> {
lastId.set(e.getId());
return EventAction.CONTINUE;
});
publisher.subscribe(config);
semaphore.acquire(1);
Assert.assertEquals(2, lastId.get());
publisher.destroy();
}
@Test
public void testDestinationServiceAdapter() throws URISyntaxException, InterruptedException {
Semaphore semaphore = new Semaphore(-1);
AtomicLong lastId = new AtomicLong(-1);
Servlet srv = createThrftRPCService(com.rbkmoney.fistful.destination.EventSinkSrv.Iface.class, new com.rbkmoney.fistful.destination.EventSinkSrv.Iface() {
@Override
public List<com.rbkmoney.fistful.destination.SinkEvent> getEvents(EventRange range) throws TException {
if (range.getAfter() == -1) {
return IntStream.range(0, 3).mapToObj(i -> FistfulEventGenerator.createDestinationEvent(i)).collect(Collectors.toList());
} else {
semaphore.release(1);
return Collections.emptyList();
}
}
@Override
public long getLastEventID() throws NoLastEvent, TException {
return 0;
}
}, null);
addServlet(srv, "/destination");
FistfulPollingEventPublisherBuilder builder = new FistfulPollingEventPublisherBuilder().withDestinationServiceAdapter();
builder.withURI(new URI(getUrlString("/destination")));
PollingEventPublisher publisher = builder.build();
DefaultSubscriberConfig config = new DefaultSubscriberConfig<>(new EventFilter<com.rbkmoney.fistful.destination.SinkEvent>() {
@Override
public EventConstraint getEventConstraint() {
EventConstraint.EventIDRange range = new EventConstraint.EventIDRange();
range.setFromNow();
return new EventConstraint(range);
}
@Override
public Filter getFilter() {
return null;
}
@Override
public int getLimit() {
return 1;
}
@Override
public boolean accept(Long eventId, TemporalAccessor createdAt, com.rbkmoney.fistful.destination.SinkEvent o) {
return true;
}
}, (e, k) -> {
lastId.set(e.getId());
return EventAction.CONTINUE;
});
publisher.subscribe(config);
semaphore.acquire(1);
Assert.assertEquals(2, lastId.get());
publisher.destroy();
}
}

View File

@ -77,4 +77,71 @@ public class FistfulEventGenerator {
return sinkEvent;
}
public static com.rbkmoney.fistful.deposit.SinkEvent createDepositEvent(long id) {
String timeString = TypeUtil.temporalToString(Instant.now());
com.rbkmoney.fistful.deposit.SinkEvent sinkEvent = new com.rbkmoney.fistful.deposit.SinkEvent();
sinkEvent.setId(id);
sinkEvent.setCreatedAt(timeString);
sinkEvent.setPayload(
new com.rbkmoney.fistful.deposit.Event(
1,
timeString,
Arrays.asList(
com.rbkmoney.fistful.deposit.Change.created(new com.rbkmoney.fistful.deposit.Deposit())
)
)
);
try {
sinkEvent = new MockTBaseProcessor(MockMode.REQUIRED_ONLY).process(sinkEvent, new TBaseHandler<>(com.rbkmoney.fistful.deposit.SinkEvent.class));
} catch (IOException e) {
throw new RuntimeException(e);
}
return sinkEvent;
}
public static com.rbkmoney.fistful.source.SinkEvent createSourceEvent(long id) {
String timeString = TypeUtil.temporalToString(Instant.now());
com.rbkmoney.fistful.source.SinkEvent sinkEvent = new com.rbkmoney.fistful.source.SinkEvent();
sinkEvent.setId(id);
sinkEvent.setCreatedAt(timeString);
sinkEvent.setPayload(
new com.rbkmoney.fistful.source.Event(
1,
timeString,
Arrays.asList(
com.rbkmoney.fistful.source.Change.created(new com.rbkmoney.fistful.source.Source())
)
)
);
try {
sinkEvent = new MockTBaseProcessor(MockMode.REQUIRED_ONLY).process(sinkEvent, new TBaseHandler<>(com.rbkmoney.fistful.source.SinkEvent.class));
} catch (IOException e) {
throw new RuntimeException(e);
}
return sinkEvent;
}
public static com.rbkmoney.fistful.destination.SinkEvent createDestinationEvent(long id) {
String timeString = TypeUtil.temporalToString(Instant.now());
com.rbkmoney.fistful.destination.SinkEvent sinkEvent = new com.rbkmoney.fistful.destination.SinkEvent();
sinkEvent.setId(id);
sinkEvent.setCreatedAt(timeString);
sinkEvent.setPayload(
new com.rbkmoney.fistful.destination.Event(
1,
timeString,
Arrays.asList(
com.rbkmoney.fistful.destination.Change.destination(new com.rbkmoney.fistful.destination.Destination())
)
)
);
try {
sinkEvent = new MockTBaseProcessor(MockMode.REQUIRED_ONLY).process(sinkEvent, new TBaseHandler<>(com.rbkmoney.fistful.destination.SinkEvent.class));
} catch (IOException e) {
throw new RuntimeException(e);
}
return sinkEvent;
}
}

View File

@ -12,7 +12,7 @@
<groupId>com.rbkmoney</groupId>
<artifactId>eventstock-client</artifactId>
<version>1.2.3</version>
<version>1.2.4</version>
<packaging>pom</packaging>
<properties>