From f9334a4600781b000c6eca8602e19130ad514d4c Mon Sep 17 00:00:00 2001 From: Pavel Popov Date: Tue, 13 Nov 2018 13:13:43 +0300 Subject: [PATCH] Add source, destination and deposit event sink support (#29) --- eventstock-client-core/pom.xml | 2 +- eventstock-client-damsel/pom.xml | 2 +- eventstock-client-fistful/pom.xml | 4 +- .../FistfulPollingEventPublisherBuilder.java | 26 ++- .../client/poll/FistfulServiceAdapter.java | 75 ++++++++ .../client/poll/FistfulClientTest.java | 180 ++++++++++++++++++ .../client/poll/FistfulEventGenerator.java | 67 +++++++ pom.xml | 2 +- 8 files changed, 352 insertions(+), 6 deletions(-) diff --git a/eventstock-client-core/pom.xml b/eventstock-client-core/pom.xml index bdb73ac..6d147c5 100644 --- a/eventstock-client-core/pom.xml +++ b/eventstock-client-core/pom.xml @@ -7,7 +7,7 @@ com.rbkmoney eventstock-client - 1.2.3 + 1.2.4 com.rbkmoney diff --git a/eventstock-client-damsel/pom.xml b/eventstock-client-damsel/pom.xml index ea15c65..742cf5d 100644 --- a/eventstock-client-damsel/pom.xml +++ b/eventstock-client-damsel/pom.xml @@ -7,7 +7,7 @@ com.rbkmoney eventstock-client - 1.2.3 + 1.2.4 com.rbkmoney diff --git a/eventstock-client-fistful/pom.xml b/eventstock-client-fistful/pom.xml index 2242cf7..f2643f7 100644 --- a/eventstock-client-fistful/pom.xml +++ b/eventstock-client-fistful/pom.xml @@ -7,7 +7,7 @@ com.rbkmoney eventstock-client - 1.2.3 + 1.2.4 com.rbkmoney @@ -22,7 +22,7 @@ com.rbkmoney fistful-proto - 1.4-08df5e9 + 1.5-cd93749 provided diff --git a/eventstock-client-fistful/src/main/java/com/rbkmoney/eventstock/client/poll/FistfulPollingEventPublisherBuilder.java b/eventstock-client-fistful/src/main/java/com/rbkmoney/eventstock/client/poll/FistfulPollingEventPublisherBuilder.java index 9e2db64..8f62d20 100644 --- a/eventstock-client-fistful/src/main/java/com/rbkmoney/eventstock/client/poll/FistfulPollingEventPublisherBuilder.java +++ b/eventstock-client-fistful/src/main/java/com/rbkmoney/eventstock/client/poll/FistfulPollingEventPublisherBuilder.java @@ -74,6 +74,21 @@ public class FistfulPollingEventPublisherBuilder extends DefaultPollingEventPubl return this; } + public FistfulPollingEventPublisherBuilder withDepositServiceAdapter() { + this.serviceAdapterType = ServiceAdapterType.DEPOSIT; + return this; + } + + public FistfulPollingEventPublisherBuilder withSourceServiceAdapter() { + this.serviceAdapterType = ServiceAdapterType.SOURCE; + return this; + } + + public FistfulPollingEventPublisherBuilder withDestinationServiceAdapter() { + this.serviceAdapterType = ServiceAdapterType.DESTINATION; + return this; + } + protected ClientBuilder getClientBuilder() { if (clientBuilder == null) { clientBuilder = new THSpawnClientBuilder().withAddress(uri); @@ -102,6 +117,12 @@ public class FistfulPollingEventPublisherBuilder extends DefaultPollingEventPubl return FistfulServiceAdapter.buildWalletAdapter(clientBuilder); case IDENTITY: return FistfulServiceAdapter.buildIdentityAdapter(clientBuilder); + case DEPOSIT: + return FistfulServiceAdapter.buildDepositAdapter(clientBuilder); + case SOURCE: + return FistfulServiceAdapter.buildSourceAdapter(clientBuilder); + case DESTINATION: + return FistfulServiceAdapter.buildDestinationAdapter(clientBuilder); default: throw new IllegalArgumentException("Unknown service adapter type"); } @@ -110,7 +131,10 @@ public class FistfulPollingEventPublisherBuilder extends DefaultPollingEventPubl public enum ServiceAdapterType { IDENTITY, WITHDRAWAL, - WALLET + WALLET, + DEPOSIT, + SOURCE, + DESTINATION } } diff --git a/eventstock-client-fistful/src/main/java/com/rbkmoney/eventstock/client/poll/FistfulServiceAdapter.java b/eventstock-client-fistful/src/main/java/com/rbkmoney/eventstock/client/poll/FistfulServiceAdapter.java index 7405078..7ccb9b1 100644 --- a/eventstock-client-fistful/src/main/java/com/rbkmoney/eventstock/client/poll/FistfulServiceAdapter.java +++ b/eventstock-client-fistful/src/main/java/com/rbkmoney/eventstock/client/poll/FistfulServiceAdapter.java @@ -94,6 +94,81 @@ public class FistfulServiceAdapter implements ServiceAdapter buildDepositAdapter(ClientBuilder clientBuilder) { + com.rbkmoney.fistful.deposit.EventSinkSrv.Iface client = clientBuilder.build(com.rbkmoney.fistful.deposit.EventSinkSrv.Iface.class); + return new FistfulServiceAdapter<>(new FistfulRepository() { + @Override + public List getEvents(EventRange eventRange) throws TException { + return client.getEvents(eventRange); + } + + @Override + public long getLastEventID() throws NoLastEvent, TException { + return client.getLastEventID(); + } + + @Override + public Long getEventId(com.rbkmoney.fistful.deposit.SinkEvent sinkEvent) { + return sinkEvent.getId(); + } + + @Override + public TemporalAccessor getEventCreatedAt(com.rbkmoney.fistful.deposit.SinkEvent sinkEvent) { + return TypeUtil.stringToTemporal(sinkEvent.getCreatedAt()); + } + }); + } + + public static FistfulServiceAdapter buildDestinationAdapter(ClientBuilder clientBuilder) { + com.rbkmoney.fistful.destination.EventSinkSrv.Iface client = clientBuilder.build(com.rbkmoney.fistful.destination.EventSinkSrv.Iface.class); + return new FistfulServiceAdapter<>(new FistfulRepository() { + @Override + public List getEvents(EventRange eventRange) throws TException { + return client.getEvents(eventRange); + } + + @Override + public long getLastEventID() throws NoLastEvent, TException { + return client.getLastEventID(); + } + + @Override + public Long getEventId(com.rbkmoney.fistful.destination.SinkEvent sinkEvent) { + return sinkEvent.getId(); + } + + @Override + public TemporalAccessor getEventCreatedAt(com.rbkmoney.fistful.destination.SinkEvent sinkEvent) { + return TypeUtil.stringToTemporal(sinkEvent.getCreatedAt()); + } + }); + } + + public static FistfulServiceAdapter buildSourceAdapter(ClientBuilder clientBuilder) { + com.rbkmoney.fistful.source.EventSinkSrv.Iface client = clientBuilder.build(com.rbkmoney.fistful.source.EventSinkSrv.Iface.class); + return new FistfulServiceAdapter<>(new FistfulRepository() { + @Override + public List getEvents(EventRange eventRange) throws TException { + return client.getEvents(eventRange); + } + + @Override + public long getLastEventID() throws NoLastEvent, TException { + return client.getLastEventID(); + } + + @Override + public Long getEventId(com.rbkmoney.fistful.source.SinkEvent sinkEvent) { + return sinkEvent.getId(); + } + + @Override + public TemporalAccessor getEventCreatedAt(com.rbkmoney.fistful.source.SinkEvent sinkEvent) { + return TypeUtil.stringToTemporal(sinkEvent.getCreatedAt()); + } + }); + } + private FistfulServiceAdapter(FistfulRepository repository) { this.repository = repository; } diff --git a/eventstock-client-fistful/src/test/java/com/rbkmoney/eventstock/client/poll/FistfulClientTest.java b/eventstock-client-fistful/src/test/java/com/rbkmoney/eventstock/client/poll/FistfulClientTest.java index 0eb3e6a..7781edc 100644 --- a/eventstock-client-fistful/src/test/java/com/rbkmoney/eventstock/client/poll/FistfulClientTest.java +++ b/eventstock-client-fistful/src/test/java/com/rbkmoney/eventstock/client/poll/FistfulClientTest.java @@ -205,4 +205,184 @@ public class FistfulClientTest extends AbstractTest { publisher.destroy(); } + @Test + public void testDepositServiceAdapter() throws URISyntaxException, InterruptedException { + Semaphore semaphore = new Semaphore(-1); + AtomicLong lastId = new AtomicLong(-1); + + Servlet srv = createThrftRPCService(com.rbkmoney.fistful.deposit.EventSinkSrv.Iface.class, new com.rbkmoney.fistful.deposit.EventSinkSrv.Iface() { + @Override + public List getEvents(EventRange range) throws TException { + if (range.getAfter() == -1) { + return IntStream.range(0, 3).mapToObj(i -> FistfulEventGenerator.createDepositEvent(i)).collect(Collectors.toList()); + } else { + semaphore.release(1); + return Collections.emptyList(); + } + } + + @Override + public long getLastEventID() throws NoLastEvent, TException { + return 0; + } + }, null); + + addServlet(srv, "/deposit"); + + FistfulPollingEventPublisherBuilder builder = new FistfulPollingEventPublisherBuilder().withDepositServiceAdapter(); + builder.withURI(new URI(getUrlString("/deposit"))); + PollingEventPublisher publisher = builder.build(); + DefaultSubscriberConfig config = new DefaultSubscriberConfig<>(new EventFilter() { + @Override + public EventConstraint getEventConstraint() { + EventConstraint.EventIDRange range = new EventConstraint.EventIDRange(); + range.setFromNow(); + return new EventConstraint(range); + } + + @Override + public Filter getFilter() { + return null; + } + + @Override + public int getLimit() { + return 1; + } + + @Override + public boolean accept(Long eventId, TemporalAccessor createdAt, com.rbkmoney.fistful.deposit.SinkEvent o) { + return true; + } + }, (e, k) -> { + lastId.set(e.getId()); + return EventAction.CONTINUE; + }); + + publisher.subscribe(config); + semaphore.acquire(1); + Assert.assertEquals(2, lastId.get()); + publisher.destroy(); + } + + @Test + public void testSourceServiceAdapter() throws URISyntaxException, InterruptedException { + Semaphore semaphore = new Semaphore(-1); + AtomicLong lastId = new AtomicLong(-1); + + Servlet srv = createThrftRPCService(com.rbkmoney.fistful.source.EventSinkSrv.Iface.class, new com.rbkmoney.fistful.source.EventSinkSrv.Iface() { + @Override + public List getEvents(EventRange range) throws TException { + if (range.getAfter() == -1) { + return IntStream.range(0, 3).mapToObj(i -> FistfulEventGenerator.createSourceEvent(i)).collect(Collectors.toList()); + } else { + semaphore.release(1); + return Collections.emptyList(); + } + } + + @Override + public long getLastEventID() throws NoLastEvent, TException { + return 0; + } + }, null); + + addServlet(srv, "/source"); + + FistfulPollingEventPublisherBuilder builder = new FistfulPollingEventPublisherBuilder().withSourceServiceAdapter(); + builder.withURI(new URI(getUrlString("/source"))); + PollingEventPublisher publisher = builder.build(); + DefaultSubscriberConfig config = new DefaultSubscriberConfig<>(new EventFilter() { + @Override + public EventConstraint getEventConstraint() { + EventConstraint.EventIDRange range = new EventConstraint.EventIDRange(); + range.setFromNow(); + return new EventConstraint(range); + } + + @Override + public Filter getFilter() { + return null; + } + + @Override + public int getLimit() { + return 1; + } + + @Override + public boolean accept(Long eventId, TemporalAccessor createdAt, com.rbkmoney.fistful.source.SinkEvent o) { + return true; + } + }, (e, k) -> { + lastId.set(e.getId()); + return EventAction.CONTINUE; + }); + + publisher.subscribe(config); + semaphore.acquire(1); + Assert.assertEquals(2, lastId.get()); + publisher.destroy(); + } + + @Test + public void testDestinationServiceAdapter() throws URISyntaxException, InterruptedException { + Semaphore semaphore = new Semaphore(-1); + AtomicLong lastId = new AtomicLong(-1); + + Servlet srv = createThrftRPCService(com.rbkmoney.fistful.destination.EventSinkSrv.Iface.class, new com.rbkmoney.fistful.destination.EventSinkSrv.Iface() { + @Override + public List getEvents(EventRange range) throws TException { + if (range.getAfter() == -1) { + return IntStream.range(0, 3).mapToObj(i -> FistfulEventGenerator.createDestinationEvent(i)).collect(Collectors.toList()); + } else { + semaphore.release(1); + return Collections.emptyList(); + } + } + + @Override + public long getLastEventID() throws NoLastEvent, TException { + return 0; + } + }, null); + + addServlet(srv, "/destination"); + + FistfulPollingEventPublisherBuilder builder = new FistfulPollingEventPublisherBuilder().withDestinationServiceAdapter(); + builder.withURI(new URI(getUrlString("/destination"))); + PollingEventPublisher publisher = builder.build(); + DefaultSubscriberConfig config = new DefaultSubscriberConfig<>(new EventFilter() { + @Override + public EventConstraint getEventConstraint() { + EventConstraint.EventIDRange range = new EventConstraint.EventIDRange(); + range.setFromNow(); + return new EventConstraint(range); + } + + @Override + public Filter getFilter() { + return null; + } + + @Override + public int getLimit() { + return 1; + } + + @Override + public boolean accept(Long eventId, TemporalAccessor createdAt, com.rbkmoney.fistful.destination.SinkEvent o) { + return true; + } + }, (e, k) -> { + lastId.set(e.getId()); + return EventAction.CONTINUE; + }); + + publisher.subscribe(config); + semaphore.acquire(1); + Assert.assertEquals(2, lastId.get()); + publisher.destroy(); + } + } diff --git a/eventstock-client-fistful/src/test/java/com/rbkmoney/eventstock/client/poll/FistfulEventGenerator.java b/eventstock-client-fistful/src/test/java/com/rbkmoney/eventstock/client/poll/FistfulEventGenerator.java index 06f0b62..8b241d3 100644 --- a/eventstock-client-fistful/src/test/java/com/rbkmoney/eventstock/client/poll/FistfulEventGenerator.java +++ b/eventstock-client-fistful/src/test/java/com/rbkmoney/eventstock/client/poll/FistfulEventGenerator.java @@ -77,4 +77,71 @@ public class FistfulEventGenerator { return sinkEvent; } + public static com.rbkmoney.fistful.deposit.SinkEvent createDepositEvent(long id) { + String timeString = TypeUtil.temporalToString(Instant.now()); + com.rbkmoney.fistful.deposit.SinkEvent sinkEvent = new com.rbkmoney.fistful.deposit.SinkEvent(); + sinkEvent.setId(id); + sinkEvent.setCreatedAt(timeString); + sinkEvent.setPayload( + new com.rbkmoney.fistful.deposit.Event( + 1, + timeString, + Arrays.asList( + com.rbkmoney.fistful.deposit.Change.created(new com.rbkmoney.fistful.deposit.Deposit()) + ) + ) + ); + try { + sinkEvent = new MockTBaseProcessor(MockMode.REQUIRED_ONLY).process(sinkEvent, new TBaseHandler<>(com.rbkmoney.fistful.deposit.SinkEvent.class)); + } catch (IOException e) { + throw new RuntimeException(e); + } + return sinkEvent; + } + + + public static com.rbkmoney.fistful.source.SinkEvent createSourceEvent(long id) { + String timeString = TypeUtil.temporalToString(Instant.now()); + com.rbkmoney.fistful.source.SinkEvent sinkEvent = new com.rbkmoney.fistful.source.SinkEvent(); + sinkEvent.setId(id); + sinkEvent.setCreatedAt(timeString); + sinkEvent.setPayload( + new com.rbkmoney.fistful.source.Event( + 1, + timeString, + Arrays.asList( + com.rbkmoney.fistful.source.Change.created(new com.rbkmoney.fistful.source.Source()) + ) + ) + ); + try { + sinkEvent = new MockTBaseProcessor(MockMode.REQUIRED_ONLY).process(sinkEvent, new TBaseHandler<>(com.rbkmoney.fistful.source.SinkEvent.class)); + } catch (IOException e) { + throw new RuntimeException(e); + } + return sinkEvent; + } + + public static com.rbkmoney.fistful.destination.SinkEvent createDestinationEvent(long id) { + String timeString = TypeUtil.temporalToString(Instant.now()); + com.rbkmoney.fistful.destination.SinkEvent sinkEvent = new com.rbkmoney.fistful.destination.SinkEvent(); + sinkEvent.setId(id); + sinkEvent.setCreatedAt(timeString); + sinkEvent.setPayload( + new com.rbkmoney.fistful.destination.Event( + 1, + timeString, + Arrays.asList( + com.rbkmoney.fistful.destination.Change.destination(new com.rbkmoney.fistful.destination.Destination()) + ) + ) + ); + try { + sinkEvent = new MockTBaseProcessor(MockMode.REQUIRED_ONLY).process(sinkEvent, new TBaseHandler<>(com.rbkmoney.fistful.destination.SinkEvent.class)); + } catch (IOException e) { + throw new RuntimeException(e); + } + return sinkEvent; + } + } diff --git a/pom.xml b/pom.xml index 4a2a3ce..b0641a0 100644 --- a/pom.xml +++ b/pom.xml @@ -12,7 +12,7 @@ com.rbkmoney eventstock-client - 1.2.3 + 1.2.4 pom