BJ-310: Дорабока библиотеки eventstock (#31)

* 1. BJ-310: Дорабока библиотеки eventstock
2. Рефакторинг кода

* 1. BJ-310: Дорабока библиотеки eventstock
2. Рефакторинг кода
This commit is contained in:
Baikov Dmitrii 2018-11-20 18:08:56 +03:00 committed by GitHub
parent f9334a4600
commit a184225882
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 1110 additions and 585 deletions

View File

@ -7,7 +7,7 @@
<parent>
<groupId>com.rbkmoney</groupId>
<artifactId>eventstock-client</artifactId>
<version>1.2.4</version>
<version>1.2.5</version>
</parent>
<groupId>com.rbkmoney</groupId>

View File

@ -7,7 +7,7 @@
<parent>
<groupId>com.rbkmoney</groupId>
<artifactId>eventstock-client</artifactId>
<version>1.2.4</version>
<version>1.2.5</version>
</parent>
<groupId>com.rbkmoney</groupId>

View File

@ -7,7 +7,7 @@
<parent>
<groupId>com.rbkmoney</groupId>
<artifactId>eventstock-client</artifactId>
<version>1.2.4</version>
<version>1.2.5</version>
</parent>
<groupId>com.rbkmoney</groupId>
@ -22,7 +22,7 @@
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>fistful-proto</artifactId>
<version>1.5-cd93749</version>
<version>1.8-37ccefb</version>
<scope>provided</scope>
</dependency>
<!--Test-->

View File

@ -8,6 +8,7 @@ import java.net.URI;
import java.util.Objects;
public class FistfulPollingEventPublisherBuilder extends DefaultPollingEventPublisherBuilder {
private static final int DEFAULT_HOUSEKEEPER_TIMEOUT = 1000;
private URI uri;
@ -89,6 +90,11 @@ public class FistfulPollingEventPublisherBuilder extends DefaultPollingEventPubl
return this;
}
public FistfulPollingEventPublisherBuilder withWithdrawalSessionServiceAdapter() {
this.serviceAdapterType = ServiceAdapterType.WITHDRAWAL_SESSION;
return this;
}
protected ClientBuilder getClientBuilder() {
if (clientBuilder == null) {
clientBuilder = new THSpawnClientBuilder().withAddress(uri);
@ -123,6 +129,8 @@ public class FistfulPollingEventPublisherBuilder extends DefaultPollingEventPubl
return FistfulServiceAdapter.buildSourceAdapter(clientBuilder);
case DESTINATION:
return FistfulServiceAdapter.buildDestinationAdapter(clientBuilder);
case WITHDRAWAL_SESSION:
return FistfulServiceAdapter.buildWithdrawalSessionAdapter(clientBuilder);
default:
throw new IllegalArgumentException("Unknown service adapter type");
}
@ -134,7 +142,8 @@ public class FistfulPollingEventPublisherBuilder extends DefaultPollingEventPubl
WALLET,
DEPOSIT,
SOURCE,
DESTINATION
DESTINATION,
WITHDRAWAL_SESSION
}
}

View File

@ -1,12 +1,10 @@
package com.rbkmoney.eventstock.client.poll;
import com.rbkmoney.eventstock.client.EventConstraint;
import com.rbkmoney.eventstock.client.poll.repositories.*;
import com.rbkmoney.fistful.eventsink.EventRange;
import com.rbkmoney.fistful.eventsink.NoLastEvent;
import com.rbkmoney.fistful.withdrawal.SinkEvent;
import com.rbkmoney.geck.common.util.TypeUtil;
import com.rbkmoney.woody.api.ClientBuilder;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -17,162 +15,41 @@ import java.util.List;
public class FistfulServiceAdapter<TEvent> implements ServiceAdapter<TEvent, EventConstraint> {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private final FistfulRepository<TEvent> repository;
public static FistfulServiceAdapter<com.rbkmoney.fistful.withdrawal.SinkEvent> buildWithdrawalAdapter(ClientBuilder clientBuilder) {
com.rbkmoney.fistful.withdrawal.EventSinkSrv.Iface client = clientBuilder.build(com.rbkmoney.fistful.withdrawal.EventSinkSrv.Iface.class);
return new FistfulServiceAdapter<>(new FistfulRepository<com.rbkmoney.fistful.withdrawal.SinkEvent>() {
@Override
public List<com.rbkmoney.fistful.withdrawal.SinkEvent> getEvents(EventRange eventRange) throws TException {
return client.getEvents(eventRange);
}
@Override
public long getLastEventID() throws NoLastEvent, TException {
return client.getLastEventID();
}
@Override
public Long getEventId(SinkEvent sinkEvent) {
return sinkEvent.getId();
}
@Override
public TemporalAccessor getEventCreatedAt(SinkEvent sinkEvent) {
return TypeUtil.stringToTemporal(sinkEvent.getCreatedAt());
}
});
}
public static FistfulServiceAdapter<com.rbkmoney.fistful.identity.SinkEvent> buildIdentityAdapter(ClientBuilder clientBuilder) {
com.rbkmoney.fistful.identity.EventSinkSrv.Iface client = clientBuilder.build(com.rbkmoney.fistful.identity.EventSinkSrv.Iface.class);
return new FistfulServiceAdapter<>(new FistfulRepository<com.rbkmoney.fistful.identity.SinkEvent>() {
@Override
public List<com.rbkmoney.fistful.identity.SinkEvent> getEvents(EventRange eventRange) throws TException {
return client.getEvents(eventRange);
}
@Override
public long getLastEventID() throws NoLastEvent, TException {
return client.getLastEventID();
}
@Override
public Long getEventId(com.rbkmoney.fistful.identity.SinkEvent sinkEvent) {
return sinkEvent.getId();
}
@Override
public TemporalAccessor getEventCreatedAt(com.rbkmoney.fistful.identity.SinkEvent sinkEvent) {
return TypeUtil.stringToTemporal(sinkEvent.getCreatedAt());
}
});
}
public static FistfulServiceAdapter<com.rbkmoney.fistful.wallet.SinkEvent> buildWalletAdapter(ClientBuilder clientBuilder) {
com.rbkmoney.fistful.wallet.EventSinkSrv.Iface client = clientBuilder.build(com.rbkmoney.fistful.wallet.EventSinkSrv.Iface.class);
return new FistfulServiceAdapter<>(new FistfulRepository<com.rbkmoney.fistful.wallet.SinkEvent>() {
@Override
public List<com.rbkmoney.fistful.wallet.SinkEvent> getEvents(EventRange eventRange) throws TException {
return client.getEvents(eventRange);
}
@Override
public long getLastEventID() throws NoLastEvent, TException {
return client.getLastEventID();
}
@Override
public Long getEventId(com.rbkmoney.fistful.wallet.SinkEvent sinkEvent) {
return sinkEvent.getId();
}
@Override
public TemporalAccessor getEventCreatedAt(com.rbkmoney.fistful.wallet.SinkEvent sinkEvent) {
return TypeUtil.stringToTemporal(sinkEvent.getCreatedAt());
}
});
}
public static FistfulServiceAdapter<com.rbkmoney.fistful.deposit.SinkEvent> buildDepositAdapter(ClientBuilder clientBuilder) {
com.rbkmoney.fistful.deposit.EventSinkSrv.Iface client = clientBuilder.build(com.rbkmoney.fistful.deposit.EventSinkSrv.Iface.class);
return new FistfulServiceAdapter<>(new FistfulRepository<com.rbkmoney.fistful.deposit.SinkEvent>() {
@Override
public List<com.rbkmoney.fistful.deposit.SinkEvent> getEvents(EventRange eventRange) throws TException {
return client.getEvents(eventRange);
}
@Override
public long getLastEventID() throws NoLastEvent, TException {
return client.getLastEventID();
}
@Override
public Long getEventId(com.rbkmoney.fistful.deposit.SinkEvent sinkEvent) {
return sinkEvent.getId();
}
@Override
public TemporalAccessor getEventCreatedAt(com.rbkmoney.fistful.deposit.SinkEvent sinkEvent) {
return TypeUtil.stringToTemporal(sinkEvent.getCreatedAt());
}
});
}
public static FistfulServiceAdapter<com.rbkmoney.fistful.destination.SinkEvent> buildDestinationAdapter(ClientBuilder clientBuilder) {
com.rbkmoney.fistful.destination.EventSinkSrv.Iface client = clientBuilder.build(com.rbkmoney.fistful.destination.EventSinkSrv.Iface.class);
return new FistfulServiceAdapter<>(new FistfulRepository<com.rbkmoney.fistful.destination.SinkEvent>() {
@Override
public List<com.rbkmoney.fistful.destination.SinkEvent> getEvents(EventRange eventRange) throws TException {
return client.getEvents(eventRange);
}
@Override
public long getLastEventID() throws NoLastEvent, TException {
return client.getLastEventID();
}
@Override
public Long getEventId(com.rbkmoney.fistful.destination.SinkEvent sinkEvent) {
return sinkEvent.getId();
}
@Override
public TemporalAccessor getEventCreatedAt(com.rbkmoney.fistful.destination.SinkEvent sinkEvent) {
return TypeUtil.stringToTemporal(sinkEvent.getCreatedAt());
}
});
}
public static FistfulServiceAdapter<com.rbkmoney.fistful.source.SinkEvent> buildSourceAdapter(ClientBuilder clientBuilder) {
com.rbkmoney.fistful.source.EventSinkSrv.Iface client = clientBuilder.build(com.rbkmoney.fistful.source.EventSinkSrv.Iface.class);
return new FistfulServiceAdapter<>(new FistfulRepository<com.rbkmoney.fistful.source.SinkEvent>() {
@Override
public List<com.rbkmoney.fistful.source.SinkEvent> getEvents(EventRange eventRange) throws TException {
return client.getEvents(eventRange);
}
@Override
public long getLastEventID() throws NoLastEvent, TException {
return client.getLastEventID();
}
@Override
public Long getEventId(com.rbkmoney.fistful.source.SinkEvent sinkEvent) {
return sinkEvent.getId();
}
@Override
public TemporalAccessor getEventCreatedAt(com.rbkmoney.fistful.source.SinkEvent sinkEvent) {
return TypeUtil.stringToTemporal(sinkEvent.getCreatedAt());
}
});
}
private FistfulServiceAdapter(FistfulRepository<TEvent> repository) {
this.repository = repository;
}
public static FistfulServiceAdapter<com.rbkmoney.fistful.withdrawal.SinkEvent> buildWithdrawalAdapter(ClientBuilder clientBuilder) {
return new FistfulServiceAdapter<>(new WithdrawalFistfulRepository(clientBuilder));
}
public static FistfulServiceAdapter<com.rbkmoney.fistful.identity.SinkEvent> buildIdentityAdapter(ClientBuilder clientBuilder) {
return new FistfulServiceAdapter<>(new IdentityFistfulRepository(clientBuilder));
}
public static FistfulServiceAdapter<com.rbkmoney.fistful.wallet.SinkEvent> buildWalletAdapter(ClientBuilder clientBuilder) {
return new FistfulServiceAdapter<>(new WalletFistfulRepository(clientBuilder));
}
public static FistfulServiceAdapter<com.rbkmoney.fistful.deposit.SinkEvent> buildDepositAdapter(ClientBuilder clientBuilder) {
return new FistfulServiceAdapter<>(new DepositFistfulRepository(clientBuilder));
}
public static FistfulServiceAdapter<com.rbkmoney.fistful.destination.SinkEvent> buildDestinationAdapter(ClientBuilder clientBuilder) {
return new FistfulServiceAdapter<>(new DestinationFistfulRepository(clientBuilder));
}
public static FistfulServiceAdapter<com.rbkmoney.fistful.source.SinkEvent> buildSourceAdapter(ClientBuilder clientBuilder) {
return new FistfulServiceAdapter<>(new SourceFistfulRepository(clientBuilder));
}
public static FistfulServiceAdapter<com.rbkmoney.fistful.withdrawal_session.SinkEvent> buildWithdrawalSessionAdapter(ClientBuilder clientBuilder) {
return new FistfulServiceAdapter<>(new WithdrawalSessionFistfulRepository(clientBuilder));
}
@Override
public Collection<TEvent> getEventRange(EventConstraint eventConstraint, int limit) throws ServiceException {
EventRange eventRange = convertConstraint(eventConstraint, limit);

View File

@ -0,0 +1,42 @@
package com.rbkmoney.eventstock.client.poll.repositories;
import com.rbkmoney.eventstock.client.poll.FistfulRepository;
import com.rbkmoney.fistful.eventsink.EventRange;
import com.rbkmoney.fistful.deposit.EventSinkSrv.Iface;
import com.rbkmoney.fistful.deposit.SinkEvent;
import com.rbkmoney.geck.common.util.TypeUtil;
import com.rbkmoney.woody.api.ClientBuilder;
import org.apache.thrift.TException;
import java.time.temporal.TemporalAccessor;
import java.util.List;
public class DepositFistfulRepository implements FistfulRepository<SinkEvent> {
private final Iface client;
public DepositFistfulRepository(ClientBuilder clientBuilder) {
client = clientBuilder.build(Iface.class);
}
@Override
public List<SinkEvent> getEvents(EventRange eventRange) throws TException {
return client.getEvents(eventRange);
}
@Override
public long getLastEventID() throws TException {
return client.getLastEventID();
}
@Override
public Long getEventId(SinkEvent sinkEvent) {
return sinkEvent.getId();
}
@Override
public TemporalAccessor getEventCreatedAt(SinkEvent sinkEvent) {
return TypeUtil.stringToTemporal(sinkEvent.getCreatedAt());
}
}

View File

@ -0,0 +1,42 @@
package com.rbkmoney.eventstock.client.poll.repositories;
import com.rbkmoney.eventstock.client.poll.FistfulRepository;
import com.rbkmoney.fistful.destination.SinkEvent;
import com.rbkmoney.fistful.eventsink.EventRange;
import com.rbkmoney.geck.common.util.TypeUtil;
import com.rbkmoney.woody.api.ClientBuilder;
import org.apache.thrift.TException;
import com.rbkmoney.fistful.destination.EventSinkSrv.Iface;
import java.time.temporal.TemporalAccessor;
import java.util.List;
public class DestinationFistfulRepository implements FistfulRepository<SinkEvent> {
private final Iface client;
public DestinationFistfulRepository(ClientBuilder clientBuilder) {
client = clientBuilder.build(Iface.class);
}
@Override
public List<SinkEvent> getEvents(EventRange eventRange) throws TException {
return client.getEvents(eventRange);
}
@Override
public long getLastEventID() throws TException {
return client.getLastEventID();
}
@Override
public Long getEventId(SinkEvent sinkEvent) {
return sinkEvent.getId();
}
@Override
public TemporalAccessor getEventCreatedAt(SinkEvent sinkEvent) {
return TypeUtil.stringToTemporal(sinkEvent.getCreatedAt());
}
}

View File

@ -0,0 +1,42 @@
package com.rbkmoney.eventstock.client.poll.repositories;
import com.rbkmoney.eventstock.client.poll.FistfulRepository;
import com.rbkmoney.fistful.identity.EventSinkSrv.Iface;
import com.rbkmoney.fistful.eventsink.EventRange;
import com.rbkmoney.fistful.identity.SinkEvent;
import com.rbkmoney.geck.common.util.TypeUtil;
import com.rbkmoney.woody.api.ClientBuilder;
import org.apache.thrift.TException;
import java.time.temporal.TemporalAccessor;
import java.util.List;
public class IdentityFistfulRepository implements FistfulRepository<SinkEvent> {
private final Iface client;
public IdentityFistfulRepository(ClientBuilder clientBuilder) {
client = clientBuilder.build(Iface.class);
}
@Override
public List<SinkEvent> getEvents(EventRange eventRange) throws TException {
return client.getEvents(eventRange);
}
@Override
public long getLastEventID() throws TException {
return client.getLastEventID();
}
@Override
public Long getEventId(SinkEvent sinkEvent) {
return sinkEvent.getId();
}
@Override
public TemporalAccessor getEventCreatedAt(SinkEvent sinkEvent) {
return TypeUtil.stringToTemporal(sinkEvent.getCreatedAt());
}
}

View File

@ -0,0 +1,42 @@
package com.rbkmoney.eventstock.client.poll.repositories;
import com.rbkmoney.eventstock.client.poll.FistfulRepository;
import com.rbkmoney.fistful.source.EventSinkSrv.Iface;
import com.rbkmoney.fistful.eventsink.EventRange;
import com.rbkmoney.fistful.source.SinkEvent;
import com.rbkmoney.geck.common.util.TypeUtil;
import com.rbkmoney.woody.api.ClientBuilder;
import org.apache.thrift.TException;
import java.time.temporal.TemporalAccessor;
import java.util.List;
public class SourceFistfulRepository implements FistfulRepository<SinkEvent> {
private final Iface client;
public SourceFistfulRepository(ClientBuilder clientBuilder) {
client = clientBuilder.build(Iface.class);
}
@Override
public List<SinkEvent> getEvents(EventRange eventRange) throws TException {
return client.getEvents(eventRange);
}
@Override
public long getLastEventID() throws TException {
return client.getLastEventID();
}
@Override
public Long getEventId(SinkEvent sinkEvent) {
return sinkEvent.getId();
}
@Override
public TemporalAccessor getEventCreatedAt(SinkEvent sinkEvent) {
return TypeUtil.stringToTemporal(sinkEvent.getCreatedAt());
}
}

View File

@ -0,0 +1,42 @@
package com.rbkmoney.eventstock.client.poll.repositories;
import com.rbkmoney.eventstock.client.poll.FistfulRepository;
import com.rbkmoney.fistful.eventsink.EventRange;
import com.rbkmoney.fistful.wallet.EventSinkSrv.Iface;
import com.rbkmoney.fistful.wallet.SinkEvent;
import com.rbkmoney.geck.common.util.TypeUtil;
import com.rbkmoney.woody.api.ClientBuilder;
import org.apache.thrift.TException;
import java.time.temporal.TemporalAccessor;
import java.util.List;
public class WalletFistfulRepository implements FistfulRepository<SinkEvent> {
private final Iface client;
public WalletFistfulRepository(ClientBuilder clientBuilder) {
client = clientBuilder.build(Iface.class);
}
@Override
public List<SinkEvent> getEvents(EventRange eventRange) throws TException {
return client.getEvents(eventRange);
}
@Override
public long getLastEventID() throws TException {
return client.getLastEventID();
}
@Override
public Long getEventId(SinkEvent sinkEvent) {
return sinkEvent.getId();
}
@Override
public TemporalAccessor getEventCreatedAt(SinkEvent sinkEvent) {
return TypeUtil.stringToTemporal(sinkEvent.getCreatedAt());
}
}

View File

@ -0,0 +1,42 @@
package com.rbkmoney.eventstock.client.poll.repositories;
import com.rbkmoney.eventstock.client.poll.FistfulRepository;
import com.rbkmoney.fistful.eventsink.EventRange;
import com.rbkmoney.fistful.withdrawal.EventSinkSrv.Iface;
import com.rbkmoney.fistful.withdrawal.SinkEvent;
import com.rbkmoney.geck.common.util.TypeUtil;
import com.rbkmoney.woody.api.ClientBuilder;
import org.apache.thrift.TException;
import java.time.temporal.TemporalAccessor;
import java.util.List;
public class WithdrawalFistfulRepository implements FistfulRepository<SinkEvent> {
private final Iface client;
public WithdrawalFistfulRepository(ClientBuilder clientBuilder) {
client = clientBuilder.build(Iface.class);
}
@Override
public List<SinkEvent> getEvents(EventRange eventRange) throws TException {
return client.getEvents(eventRange);
}
@Override
public long getLastEventID() throws TException {
return client.getLastEventID();
}
@Override
public Long getEventId(SinkEvent sinkEvent) {
return sinkEvent.getId();
}
@Override
public TemporalAccessor getEventCreatedAt(SinkEvent sinkEvent) {
return TypeUtil.stringToTemporal(sinkEvent.getCreatedAt());
}
}

View File

@ -0,0 +1,42 @@
package com.rbkmoney.eventstock.client.poll.repositories;
import com.rbkmoney.eventstock.client.poll.FistfulRepository;
import com.rbkmoney.fistful.eventsink.EventRange;
import com.rbkmoney.fistful.withdrawal_session.SinkEvent;
import com.rbkmoney.fistful.withdrawal_session.EventSinkSrv.Iface;
import com.rbkmoney.geck.common.util.TypeUtil;
import com.rbkmoney.woody.api.ClientBuilder;
import org.apache.thrift.TException;
import java.time.temporal.TemporalAccessor;
import java.util.List;
public class WithdrawalSessionFistfulRepository implements FistfulRepository<SinkEvent> {
private final Iface client;
public WithdrawalSessionFistfulRepository(ClientBuilder clientBuilder) {
client = clientBuilder.build(Iface.class);
}
@Override
public List<SinkEvent> getEvents(EventRange eventRange) throws TException {
return client.getEvents(eventRange);
}
@Override
public long getLastEventID() throws TException {
return client.getLastEventID();
}
@Override
public Long getEventId(SinkEvent sinkEvent) {
return sinkEvent.getId();
}
@Override
public TemporalAccessor getEventCreatedAt(SinkEvent sinkEvent) {
return TypeUtil.stringToTemporal(sinkEvent.getCreatedAt());
}
}

View File

@ -2,26 +2,28 @@ package com.rbkmoney.eventstock.client.poll;
import com.rbkmoney.eventstock.client.DefaultSubscriberConfig;
import com.rbkmoney.eventstock.client.EventAction;
import com.rbkmoney.eventstock.client.EventConstraint;
import com.rbkmoney.eventstock.client.EventFilter;
import com.rbkmoney.fistful.eventsink.EventRange;
import com.rbkmoney.fistful.eventsink.NoLastEvent;
import com.rbkmoney.geck.filter.Filter;
import com.rbkmoney.woody.api.ClientBuilder;
import org.apache.thrift.TException;
import com.rbkmoney.eventstock.client.poll.additional.deposit.DepositEventFilter;
import com.rbkmoney.eventstock.client.poll.additional.deposit.DepositEventSinkSrv;
import com.rbkmoney.eventstock.client.poll.additional.destination.DestinationEventFilter;
import com.rbkmoney.eventstock.client.poll.additional.destination.DestinationEventSinkSrv;
import com.rbkmoney.eventstock.client.poll.additional.identity.IdentityEventFilter;
import com.rbkmoney.eventstock.client.poll.additional.identity.IdentityEventSinkSrv;
import com.rbkmoney.eventstock.client.poll.additional.source.SourceEventFilter;
import com.rbkmoney.eventstock.client.poll.additional.source.SourceEventSinkSrv;
import com.rbkmoney.eventstock.client.poll.additional.wallet.WalletEventFilter;
import com.rbkmoney.eventstock.client.poll.additional.wallet.WalletEventSinkSrv;
import com.rbkmoney.eventstock.client.poll.additional.withdrawal.WithdrawalEventFilter;
import com.rbkmoney.eventstock.client.poll.additional.withdrawal.WithdrawalEventSinkSrv;
import com.rbkmoney.eventstock.client.poll.additional.withdrawal_session.WithdrawalSessionEventFilter;
import com.rbkmoney.eventstock.client.poll.additional.withdrawal_session.WithdrawalSessionEventSinkSrv;
import org.junit.Assert;
import org.junit.Test;
import javax.servlet.Servlet;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.temporal.TemporalAccessor;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class FistfulClientTest extends AbstractTest {
@ -29,55 +31,23 @@ public class FistfulClientTest extends AbstractTest {
public void testWalletServiceAdapter() throws URISyntaxException, InterruptedException {
Semaphore semaphore = new Semaphore(-1);
AtomicLong lastId = new AtomicLong(-1);
String path = "/wallet";
Servlet srv = createThrftRPCService(com.rbkmoney.fistful.wallet.EventSinkSrv.Iface.class, new com.rbkmoney.fistful.wallet.EventSinkSrv.Iface() {
@Override
public List<com.rbkmoney.fistful.wallet.SinkEvent> getEvents(EventRange range) throws TException {
if (range.getAfter() == -1) {
return IntStream.range(0, 3).mapToObj(i -> FistfulEventGenerator.createWalletEvent(i)).collect(Collectors.toList());
} else {
semaphore.release(1);
return Collections.emptyList();
}
}
Servlet srv = createThrftRPCService(com.rbkmoney.fistful.wallet.EventSinkSrv.Iface.class,
new WalletEventSinkSrv(semaphore), null);
addServlet(srv, path);
@Override
public long getLastEventID() throws NoLastEvent, TException {
return 0;
}
}, null);
addServlet(srv, "/wallet");
FistfulPollingEventPublisherBuilder builder = new FistfulPollingEventPublisherBuilder().withWalletServiceAdapter();
builder.withURI(new URI(getUrlString("/wallet")));
FistfulPollingEventPublisherBuilder builder = new FistfulPollingEventPublisherBuilder();
builder.withWalletServiceAdapter();
builder.withURI(new URI(getUrlString(path)));
PollingEventPublisher publisher = builder.build();
DefaultSubscriberConfig config = new DefaultSubscriberConfig<>(new EventFilter<com.rbkmoney.fistful.wallet.SinkEvent>() {
@Override
public EventConstraint getEventConstraint() {
EventConstraint.EventIDRange range = new EventConstraint.EventIDRange();
range.setFromNow();
return new EventConstraint(range);
}
@Override
public Filter getFilter() {
return null;
}
@Override
public int getLimit() {
return 1;
}
@Override
public boolean accept(Long eventId, TemporalAccessor createdAt, com.rbkmoney.fistful.wallet.SinkEvent o) {
return true;
}
}, (e, k) -> {
lastId.set(e.getId());
return EventAction.CONTINUE;
});
DefaultSubscriberConfig config = new DefaultSubscriberConfig<>(new WalletEventFilter(),
(e, k) ->
{
lastId.set(e.getId());
return EventAction.CONTINUE;
});
publisher.subscribe(config);
semaphore.acquire(1);
@ -89,55 +59,23 @@ public class FistfulClientTest extends AbstractTest {
public void testIdentityServiceAdapter() throws URISyntaxException, InterruptedException {
Semaphore semaphore = new Semaphore(-1);
AtomicLong lastId = new AtomicLong(-1);
String path = "/identity";
Servlet srv = createThrftRPCService(com.rbkmoney.fistful.identity.EventSinkSrv.Iface.class, new com.rbkmoney.fistful.identity.EventSinkSrv.Iface() {
@Override
public List<com.rbkmoney.fistful.identity.SinkEvent> getEvents(EventRange range) throws TException {
if (range.getAfter() == -1) {
return IntStream.range(0, 3).mapToObj(i -> FistfulEventGenerator.createIdentityEvent(i)).collect(Collectors.toList());
} else {
semaphore.release(1);
return Collections.emptyList();
}
}
Servlet srv = createThrftRPCService(com.rbkmoney.fistful.identity.EventSinkSrv.Iface.class,
new IdentityEventSinkSrv(semaphore), null);
addServlet(srv, path);
@Override
public long getLastEventID() throws NoLastEvent, TException {
return 0;
}
}, null);
addServlet(srv, "/identity");
FistfulPollingEventPublisherBuilder builder = new FistfulPollingEventPublisherBuilder().withIdentityServiceAdapter();
builder.withURI(new URI(getUrlString("/identity")));
FistfulPollingEventPublisherBuilder builder = new FistfulPollingEventPublisherBuilder();
builder.withIdentityServiceAdapter();
builder.withURI(new URI(getUrlString(path)));
PollingEventPublisher publisher = builder.build();
DefaultSubscriberConfig config = new DefaultSubscriberConfig<>(new EventFilter<com.rbkmoney.fistful.identity.SinkEvent>() {
@Override
public EventConstraint getEventConstraint() {
EventConstraint.EventIDRange range = new EventConstraint.EventIDRange();
range.setFromNow();
return new EventConstraint(range);
}
@Override
public Filter getFilter() {
return null;
}
@Override
public int getLimit() {
return 1;
}
@Override
public boolean accept(Long eventId, TemporalAccessor createdAt, com.rbkmoney.fistful.identity.SinkEvent o) {
return true;
}
}, (e, k) -> {
lastId.set(e.getId());
return EventAction.CONTINUE;
});
DefaultSubscriberConfig config = new DefaultSubscriberConfig<>(new IdentityEventFilter(),
(e, k) ->
{
lastId.set(e.getId());
return EventAction.CONTINUE;
});
publisher.subscribe(config);
semaphore.acquire(1);
@ -149,55 +87,23 @@ public class FistfulClientTest extends AbstractTest {
public void testWithdrawalServiceAdapter() throws URISyntaxException, InterruptedException {
Semaphore semaphore = new Semaphore(-1);
AtomicLong lastId = new AtomicLong(-1);
String path = "/withdrawal";
Servlet srv = createThrftRPCService(com.rbkmoney.fistful.withdrawal.EventSinkSrv.Iface.class, new com.rbkmoney.fistful.withdrawal.EventSinkSrv.Iface() {
@Override
public List<com.rbkmoney.fistful.withdrawal.SinkEvent> getEvents(EventRange range) throws TException {
if (range.getAfter() == -1) {
return IntStream.range(0, 3).mapToObj(i -> FistfulEventGenerator.createWithdrawalEvent(i)).collect(Collectors.toList());
} else {
semaphore.release(1);
return Collections.emptyList();
}
}
Servlet srv = createThrftRPCService(com.rbkmoney.fistful.withdrawal.EventSinkSrv.Iface.class,
new WithdrawalEventSinkSrv(semaphore), null);
addServlet(srv, path);
@Override
public long getLastEventID() throws NoLastEvent, TException {
return 0;
}
}, null);
addServlet(srv, "/withdrawal");
FistfulPollingEventPublisherBuilder builder = new FistfulPollingEventPublisherBuilder().withWithdrawalServiceAdapter();
builder.withURI(new URI(getUrlString("/withdrawal")));
FistfulPollingEventPublisherBuilder builder = new FistfulPollingEventPublisherBuilder();
builder.withWithdrawalServiceAdapter();
builder.withURI(new URI(getUrlString(path)));
PollingEventPublisher publisher = builder.build();
DefaultSubscriberConfig config = new DefaultSubscriberConfig<>(new EventFilter<com.rbkmoney.fistful.withdrawal.SinkEvent>() {
@Override
public EventConstraint getEventConstraint() {
EventConstraint.EventIDRange range = new EventConstraint.EventIDRange();
range.setFromNow();
return new EventConstraint(range);
}
@Override
public Filter getFilter() {
return null;
}
@Override
public int getLimit() {
return 1;
}
@Override
public boolean accept(Long eventId, TemporalAccessor createdAt, com.rbkmoney.fistful.withdrawal.SinkEvent o) {
return true;
}
}, (e, k) -> {
lastId.set(e.getId());
return EventAction.CONTINUE;
});
DefaultSubscriberConfig config = new DefaultSubscriberConfig<>(new WithdrawalEventFilter(),
(e, k) ->
{
lastId.set(e.getId());
return EventAction.CONTINUE;
});
publisher.subscribe(config);
semaphore.acquire(1);
@ -209,55 +115,23 @@ public class FistfulClientTest extends AbstractTest {
public void testDepositServiceAdapter() throws URISyntaxException, InterruptedException {
Semaphore semaphore = new Semaphore(-1);
AtomicLong lastId = new AtomicLong(-1);
String path = "/deposit";
Servlet srv = createThrftRPCService(com.rbkmoney.fistful.deposit.EventSinkSrv.Iface.class, new com.rbkmoney.fistful.deposit.EventSinkSrv.Iface() {
@Override
public List<com.rbkmoney.fistful.deposit.SinkEvent> getEvents(EventRange range) throws TException {
if (range.getAfter() == -1) {
return IntStream.range(0, 3).mapToObj(i -> FistfulEventGenerator.createDepositEvent(i)).collect(Collectors.toList());
} else {
semaphore.release(1);
return Collections.emptyList();
}
}
Servlet srv = createThrftRPCService(com.rbkmoney.fistful.deposit.EventSinkSrv.Iface.class,
new DepositEventSinkSrv(semaphore), null);
addServlet(srv, path);
@Override
public long getLastEventID() throws NoLastEvent, TException {
return 0;
}
}, null);
addServlet(srv, "/deposit");
FistfulPollingEventPublisherBuilder builder = new FistfulPollingEventPublisherBuilder().withDepositServiceAdapter();
builder.withURI(new URI(getUrlString("/deposit")));
FistfulPollingEventPublisherBuilder builder = new FistfulPollingEventPublisherBuilder();
builder.withDepositServiceAdapter();
builder.withURI(new URI(getUrlString(path)));
PollingEventPublisher publisher = builder.build();
DefaultSubscriberConfig config = new DefaultSubscriberConfig<>(new EventFilter<com.rbkmoney.fistful.deposit.SinkEvent>() {
@Override
public EventConstraint getEventConstraint() {
EventConstraint.EventIDRange range = new EventConstraint.EventIDRange();
range.setFromNow();
return new EventConstraint(range);
}
@Override
public Filter getFilter() {
return null;
}
@Override
public int getLimit() {
return 1;
}
@Override
public boolean accept(Long eventId, TemporalAccessor createdAt, com.rbkmoney.fistful.deposit.SinkEvent o) {
return true;
}
}, (e, k) -> {
lastId.set(e.getId());
return EventAction.CONTINUE;
});
DefaultSubscriberConfig config = new DefaultSubscriberConfig<>(new DepositEventFilter(),
(e, k) ->
{
lastId.set(e.getId());
return EventAction.CONTINUE;
});
publisher.subscribe(config);
semaphore.acquire(1);
@ -269,55 +143,23 @@ public class FistfulClientTest extends AbstractTest {
public void testSourceServiceAdapter() throws URISyntaxException, InterruptedException {
Semaphore semaphore = new Semaphore(-1);
AtomicLong lastId = new AtomicLong(-1);
String path = "/source";
Servlet srv = createThrftRPCService(com.rbkmoney.fistful.source.EventSinkSrv.Iface.class, new com.rbkmoney.fistful.source.EventSinkSrv.Iface() {
@Override
public List<com.rbkmoney.fistful.source.SinkEvent> getEvents(EventRange range) throws TException {
if (range.getAfter() == -1) {
return IntStream.range(0, 3).mapToObj(i -> FistfulEventGenerator.createSourceEvent(i)).collect(Collectors.toList());
} else {
semaphore.release(1);
return Collections.emptyList();
}
}
Servlet srv = createThrftRPCService(com.rbkmoney.fistful.source.EventSinkSrv.Iface.class,
new SourceEventSinkSrv(semaphore), null);
addServlet(srv, path);
@Override
public long getLastEventID() throws NoLastEvent, TException {
return 0;
}
}, null);
addServlet(srv, "/source");
FistfulPollingEventPublisherBuilder builder = new FistfulPollingEventPublisherBuilder().withSourceServiceAdapter();
builder.withURI(new URI(getUrlString("/source")));
FistfulPollingEventPublisherBuilder builder = new FistfulPollingEventPublisherBuilder();
builder.withSourceServiceAdapter();
builder.withURI(new URI(getUrlString(path)));
PollingEventPublisher publisher = builder.build();
DefaultSubscriberConfig config = new DefaultSubscriberConfig<>(new EventFilter<com.rbkmoney.fistful.source.SinkEvent>() {
@Override
public EventConstraint getEventConstraint() {
EventConstraint.EventIDRange range = new EventConstraint.EventIDRange();
range.setFromNow();
return new EventConstraint(range);
}
@Override
public Filter getFilter() {
return null;
}
@Override
public int getLimit() {
return 1;
}
@Override
public boolean accept(Long eventId, TemporalAccessor createdAt, com.rbkmoney.fistful.source.SinkEvent o) {
return true;
}
}, (e, k) -> {
lastId.set(e.getId());
return EventAction.CONTINUE;
});
DefaultSubscriberConfig config = new DefaultSubscriberConfig<>(new SourceEventFilter(),
(e, k) ->
{
lastId.set(e.getId());
return EventAction.CONTINUE;
});
publisher.subscribe(config);
semaphore.acquire(1);
@ -329,55 +171,53 @@ public class FistfulClientTest extends AbstractTest {
public void testDestinationServiceAdapter() throws URISyntaxException, InterruptedException {
Semaphore semaphore = new Semaphore(-1);
AtomicLong lastId = new AtomicLong(-1);
String path = "/destination";
Servlet srv = createThrftRPCService(com.rbkmoney.fistful.destination.EventSinkSrv.Iface.class, new com.rbkmoney.fistful.destination.EventSinkSrv.Iface() {
@Override
public List<com.rbkmoney.fistful.destination.SinkEvent> getEvents(EventRange range) throws TException {
if (range.getAfter() == -1) {
return IntStream.range(0, 3).mapToObj(i -> FistfulEventGenerator.createDestinationEvent(i)).collect(Collectors.toList());
} else {
semaphore.release(1);
return Collections.emptyList();
}
}
DestinationEventSinkSrv destinationEventSinkSrv = new DestinationEventSinkSrv(semaphore);
Servlet srv = createThrftRPCService(com.rbkmoney.fistful.destination.EventSinkSrv.Iface.class,
destinationEventSinkSrv, null);
@Override
public long getLastEventID() throws NoLastEvent, TException {
return 0;
}
}, null);
addServlet(srv, path);
addServlet(srv, "/destination");
FistfulPollingEventPublisherBuilder builder = new FistfulPollingEventPublisherBuilder().withDestinationServiceAdapter();
builder.withURI(new URI(getUrlString("/destination")));
FistfulPollingEventPublisherBuilder builder = new FistfulPollingEventPublisherBuilder();
builder.withDestinationServiceAdapter();
builder.withURI(new URI(getUrlString(path)));
PollingEventPublisher publisher = builder.build();
DefaultSubscriberConfig config = new DefaultSubscriberConfig<>(new EventFilter<com.rbkmoney.fistful.destination.SinkEvent>() {
@Override
public EventConstraint getEventConstraint() {
EventConstraint.EventIDRange range = new EventConstraint.EventIDRange();
range.setFromNow();
return new EventConstraint(range);
}
DefaultSubscriberConfig config = new DefaultSubscriberConfig<>(new DestinationEventFilter(),
(e, k) ->
{
lastId.set(e.getId());
return EventAction.CONTINUE;
});
@Override
public Filter getFilter() {
return null;
}
publisher.subscribe(config);
semaphore.acquire(1);
Assert.assertEquals(2, lastId.get());
publisher.destroy();
}
@Override
public int getLimit() {
return 1;
}
@Test
public void testWithdrawalSessionServiceAdapter() throws URISyntaxException, InterruptedException {
Semaphore semaphore = new Semaphore(-1);
AtomicLong lastId = new AtomicLong(-1);
String path = "/withdrawal_session";
@Override
public boolean accept(Long eventId, TemporalAccessor createdAt, com.rbkmoney.fistful.destination.SinkEvent o) {
return true;
}
}, (e, k) -> {
lastId.set(e.getId());
return EventAction.CONTINUE;
});
WithdrawalSessionEventSinkSrv iface = new WithdrawalSessionEventSinkSrv(semaphore);
Servlet srv = createThrftRPCService(com.rbkmoney.fistful.withdrawal_session.EventSinkSrv.Iface.class,
iface, null);
addServlet(srv, path);
FistfulPollingEventPublisherBuilder builder = new FistfulPollingEventPublisherBuilder();
builder.withWithdrawalSessionServiceAdapter();
builder.withURI(new URI(getUrlString(path)));
PollingEventPublisher publisher = builder.build();
DefaultSubscriberConfig config = new DefaultSubscriberConfig<>(new WithdrawalSessionEventFilter(),
(e, k) ->
{
lastId.set(e.getId());
return EventAction.CONTINUE;
});
publisher.subscribe(config);
semaphore.acquire(1);

View File

@ -1,147 +0,0 @@
package com.rbkmoney.eventstock.client.poll;
import com.rbkmoney.geck.common.util.TypeUtil;
import com.rbkmoney.geck.serializer.kit.mock.MockMode;
import com.rbkmoney.geck.serializer.kit.mock.MockTBaseProcessor;
import com.rbkmoney.geck.serializer.kit.tbase.TBaseHandler;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
public class FistfulEventGenerator {
public static com.rbkmoney.fistful.withdrawal.SinkEvent createWithdrawalEvent(long id) {
String timeString = TypeUtil.temporalToString(Instant.now());
com.rbkmoney.fistful.withdrawal.SinkEvent sinkEvent = new com.rbkmoney.fistful.withdrawal.SinkEvent();
sinkEvent.setId(id);
sinkEvent.setCreatedAt(timeString);
sinkEvent.setPayload(
new com.rbkmoney.fistful.withdrawal.Event(
1,
timeString,
Arrays.asList(
com.rbkmoney.fistful.withdrawal.Change.created(new com.rbkmoney.fistful.withdrawal.Withdrawal())
)
)
);
try {
sinkEvent = new MockTBaseProcessor(MockMode.REQUIRED_ONLY).process(sinkEvent, new TBaseHandler<>(com.rbkmoney.fistful.withdrawal.SinkEvent.class));
} catch (IOException e) {
throw new RuntimeException(e);
}
return sinkEvent;
}
public static com.rbkmoney.fistful.identity.SinkEvent createIdentityEvent(long id) {
String timeString = TypeUtil.temporalToString(Instant.now());
com.rbkmoney.fistful.identity.SinkEvent sinkEvent = new com.rbkmoney.fistful.identity.SinkEvent();
sinkEvent.setId(id);
sinkEvent.setCreatedAt(timeString);
sinkEvent.setPayload(
new com.rbkmoney.fistful.identity.Event(
1,
timeString,
Arrays.asList(
com.rbkmoney.fistful.identity.Change.created(new com.rbkmoney.fistful.identity.Identity())
)
)
);
try {
sinkEvent = new MockTBaseProcessor(MockMode.REQUIRED_ONLY).process(sinkEvent, new TBaseHandler<>(com.rbkmoney.fistful.identity.SinkEvent.class));
} catch (IOException e) {
throw new RuntimeException(e);
}
return sinkEvent;
}
public static com.rbkmoney.fistful.wallet.SinkEvent createWalletEvent(long id) {
String timeString = TypeUtil.temporalToString(Instant.now());
com.rbkmoney.fistful.wallet.SinkEvent sinkEvent = new com.rbkmoney.fistful.wallet.SinkEvent();
sinkEvent.setId(id);
sinkEvent.setCreatedAt(timeString);
sinkEvent.setPayload(
new com.rbkmoney.fistful.wallet.Event(
1,
timeString,
Arrays.asList(
com.rbkmoney.fistful.wallet.Change.created(new com.rbkmoney.fistful.wallet.Wallet())
)
)
);
try {
sinkEvent = new MockTBaseProcessor(MockMode.REQUIRED_ONLY).process(sinkEvent, new TBaseHandler<>(com.rbkmoney.fistful.wallet.SinkEvent.class));
} catch (IOException e) {
throw new RuntimeException(e);
}
return sinkEvent;
}
public static com.rbkmoney.fistful.deposit.SinkEvent createDepositEvent(long id) {
String timeString = TypeUtil.temporalToString(Instant.now());
com.rbkmoney.fistful.deposit.SinkEvent sinkEvent = new com.rbkmoney.fistful.deposit.SinkEvent();
sinkEvent.setId(id);
sinkEvent.setCreatedAt(timeString);
sinkEvent.setPayload(
new com.rbkmoney.fistful.deposit.Event(
1,
timeString,
Arrays.asList(
com.rbkmoney.fistful.deposit.Change.created(new com.rbkmoney.fistful.deposit.Deposit())
)
)
);
try {
sinkEvent = new MockTBaseProcessor(MockMode.REQUIRED_ONLY).process(sinkEvent, new TBaseHandler<>(com.rbkmoney.fistful.deposit.SinkEvent.class));
} catch (IOException e) {
throw new RuntimeException(e);
}
return sinkEvent;
}
public static com.rbkmoney.fistful.source.SinkEvent createSourceEvent(long id) {
String timeString = TypeUtil.temporalToString(Instant.now());
com.rbkmoney.fistful.source.SinkEvent sinkEvent = new com.rbkmoney.fistful.source.SinkEvent();
sinkEvent.setId(id);
sinkEvent.setCreatedAt(timeString);
sinkEvent.setPayload(
new com.rbkmoney.fistful.source.Event(
1,
timeString,
Arrays.asList(
com.rbkmoney.fistful.source.Change.created(new com.rbkmoney.fistful.source.Source())
)
)
);
try {
sinkEvent = new MockTBaseProcessor(MockMode.REQUIRED_ONLY).process(sinkEvent, new TBaseHandler<>(com.rbkmoney.fistful.source.SinkEvent.class));
} catch (IOException e) {
throw new RuntimeException(e);
}
return sinkEvent;
}
public static com.rbkmoney.fistful.destination.SinkEvent createDestinationEvent(long id) {
String timeString = TypeUtil.temporalToString(Instant.now());
com.rbkmoney.fistful.destination.SinkEvent sinkEvent = new com.rbkmoney.fistful.destination.SinkEvent();
sinkEvent.setId(id);
sinkEvent.setCreatedAt(timeString);
sinkEvent.setPayload(
new com.rbkmoney.fistful.destination.Event(
1,
timeString,
Arrays.asList(
com.rbkmoney.fistful.destination.Change.destination(new com.rbkmoney.fistful.destination.Destination())
)
)
);
try {
sinkEvent = new MockTBaseProcessor(MockMode.REQUIRED_ONLY).process(sinkEvent, new TBaseHandler<>(com.rbkmoney.fistful.destination.SinkEvent.class));
} catch (IOException e) {
throw new RuntimeException(e);
}
return sinkEvent;
}
}

View File

@ -0,0 +1,34 @@
package com.rbkmoney.eventstock.client.poll.additional.deposit;
import com.rbkmoney.eventstock.client.EventConstraint;
import com.rbkmoney.eventstock.client.EventFilter;
import com.rbkmoney.fistful.deposit.SinkEvent;
import com.rbkmoney.geck.filter.Filter;
import java.time.temporal.TemporalAccessor;
public class DepositEventFilter implements EventFilter<SinkEvent> {
@Override
public EventConstraint getEventConstraint() {
EventConstraint.EventIDRange range = new EventConstraint.EventIDRange();
range.setFromNow();
return new EventConstraint(range);
}
@Override
public Filter getFilter() {
return null;
}
@Override
public int getLimit() {
return 1;
}
@Override
public boolean accept(Long eventId, TemporalAccessor createdAt, SinkEvent o) {
return true;
}
}

View File

@ -0,0 +1,59 @@
package com.rbkmoney.eventstock.client.poll.additional.deposit;
import com.rbkmoney.fistful.deposit.SinkEvent;
import com.rbkmoney.fistful.deposit.Event;
import com.rbkmoney.fistful.deposit.Change;
import com.rbkmoney.fistful.deposit.Deposit;
import com.rbkmoney.fistful.eventsink.EventRange;
import com.rbkmoney.geck.common.util.TypeUtil;
import com.rbkmoney.geck.serializer.kit.mock.MockMode;
import com.rbkmoney.geck.serializer.kit.mock.MockTBaseProcessor;
import com.rbkmoney.geck.serializer.kit.tbase.TBaseHandler;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class DepositEventSinkSrv implements com.rbkmoney.fistful.deposit.EventSinkSrv.Iface {
private Semaphore semaphore;
public DepositEventSinkSrv(Semaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public List<SinkEvent> getEvents(EventRange range) {
if (range.getAfter() == -1) {
return IntStream.range(0, 3).mapToObj(i -> createDepositEvent(i)).collect(Collectors.toList());
} else {
semaphore.release(1);
return Collections.emptyList();
}
}
public static SinkEvent createDepositEvent(long id) {
String timeString = TypeUtil.temporalToString(Instant.now());
SinkEvent sinkEvent = new SinkEvent();
sinkEvent.setId(id);
sinkEvent.setCreatedAt(timeString);
sinkEvent.setPayload(new Event(1, timeString, Arrays.asList(Change.created(new Deposit()))));
try {
TBaseHandler<SinkEvent> handler = new TBaseHandler<>(SinkEvent.class);
return new MockTBaseProcessor(MockMode.REQUIRED_ONLY).process(sinkEvent, handler);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public long getLastEventID() {
return 0;
}
}

View File

@ -0,0 +1,34 @@
package com.rbkmoney.eventstock.client.poll.additional.destination;
import com.rbkmoney.eventstock.client.EventConstraint;
import com.rbkmoney.eventstock.client.EventFilter;
import com.rbkmoney.fistful.destination.SinkEvent;
import com.rbkmoney.geck.filter.Filter;
import java.time.temporal.TemporalAccessor;
public class DestinationEventFilter implements EventFilter<SinkEvent> {
@Override
public EventConstraint getEventConstraint() {
EventConstraint.EventIDRange range = new EventConstraint.EventIDRange();
range.setFromNow();
return new EventConstraint(range);
}
@Override
public Filter getFilter() {
return null;
}
@Override
public int getLimit() {
return 1;
}
@Override
public boolean accept(Long eventId, TemporalAccessor createdAt, SinkEvent o) {
return true;
}
}

View File

@ -0,0 +1,59 @@
package com.rbkmoney.eventstock.client.poll.additional.destination;
import com.rbkmoney.fistful.destination.Destination;
import com.rbkmoney.fistful.destination.Event;
import com.rbkmoney.fistful.destination.SinkEvent;
import com.rbkmoney.fistful.destination.Change;
import com.rbkmoney.fistful.eventsink.EventRange;
import com.rbkmoney.geck.common.util.TypeUtil;
import com.rbkmoney.geck.serializer.kit.mock.MockMode;
import com.rbkmoney.geck.serializer.kit.mock.MockTBaseProcessor;
import com.rbkmoney.geck.serializer.kit.tbase.TBaseHandler;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class DestinationEventSinkSrv implements com.rbkmoney.fistful.destination.EventSinkSrv.Iface{
private Semaphore semaphore;
public DestinationEventSinkSrv(Semaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public List<SinkEvent> getEvents(EventRange range) {
if (range.getAfter() == -1) {
return IntStream.range(0, 3).mapToObj(i -> createDestinationEvent(i)).collect(Collectors.toList());
} else {
semaphore.release(1);
return Collections.emptyList();
}
}
public static SinkEvent createDestinationEvent(long id) {
String timeString = TypeUtil.temporalToString(Instant.now());
SinkEvent sinkEvent = new SinkEvent();
sinkEvent.setId(id);
sinkEvent.setCreatedAt(timeString);
sinkEvent.setPayload(new Event(1, timeString, Arrays.asList(Change.created(new Destination()))));
try {
TBaseHandler<SinkEvent> handler = new TBaseHandler<>(SinkEvent.class);
return new MockTBaseProcessor(MockMode.REQUIRED_ONLY).process(sinkEvent, handler);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public long getLastEventID() {
return 0;
}
}

View File

@ -0,0 +1,34 @@
package com.rbkmoney.eventstock.client.poll.additional.identity;
import com.rbkmoney.eventstock.client.EventConstraint;
import com.rbkmoney.eventstock.client.EventFilter;
import com.rbkmoney.fistful.identity.SinkEvent;
import com.rbkmoney.geck.filter.Filter;
import java.time.temporal.TemporalAccessor;
public class IdentityEventFilter implements EventFilter<SinkEvent> {
@Override
public EventConstraint getEventConstraint() {
EventConstraint.EventIDRange range = new EventConstraint.EventIDRange();
range.setFromNow();
return new EventConstraint(range);
}
@Override
public Filter getFilter() {
return null;
}
@Override
public int getLimit() {
return 1;
}
@Override
public boolean accept(Long eventId, TemporalAccessor createdAt, SinkEvent o) {
return true;
}
}

View File

@ -0,0 +1,59 @@
package com.rbkmoney.eventstock.client.poll.additional.identity;
import com.rbkmoney.fistful.eventsink.EventRange;
import com.rbkmoney.fistful.identity.SinkEvent;
import com.rbkmoney.fistful.identity.Event;
import com.rbkmoney.fistful.identity.Change;
import com.rbkmoney.fistful.identity.Identity;
import com.rbkmoney.geck.common.util.TypeUtil;
import com.rbkmoney.geck.serializer.kit.mock.MockMode;
import com.rbkmoney.geck.serializer.kit.mock.MockTBaseProcessor;
import com.rbkmoney.geck.serializer.kit.tbase.TBaseHandler;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class IdentityEventSinkSrv implements com.rbkmoney.fistful.identity.EventSinkSrv.Iface {
private Semaphore semaphore;
public IdentityEventSinkSrv(Semaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public List<SinkEvent> getEvents(EventRange range) {
if (range.getAfter() == -1) {
return IntStream.range(0, 3).mapToObj(i -> createIdentityEvent(i)).collect(Collectors.toList());
} else {
semaphore.release(1);
return Collections.emptyList();
}
}
public static SinkEvent createIdentityEvent(long id) {
String timeString = TypeUtil.temporalToString(Instant.now());
SinkEvent sinkEvent = new SinkEvent();
sinkEvent.setId(id);
sinkEvent.setCreatedAt(timeString);
sinkEvent.setPayload(new Event(1, timeString, Arrays.asList(Change.created(new Identity()))));
try {
TBaseHandler<SinkEvent> handler = new TBaseHandler<>(SinkEvent.class);
return new MockTBaseProcessor(MockMode.REQUIRED_ONLY).process(sinkEvent, handler);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public long getLastEventID() {
return 0;
}
}

View File

@ -0,0 +1,34 @@
package com.rbkmoney.eventstock.client.poll.additional.source;
import com.rbkmoney.eventstock.client.EventConstraint;
import com.rbkmoney.eventstock.client.EventFilter;
import com.rbkmoney.fistful.source.SinkEvent;
import com.rbkmoney.geck.filter.Filter;
import java.time.temporal.TemporalAccessor;
public class SourceEventFilter implements EventFilter<SinkEvent> {
@Override
public EventConstraint getEventConstraint() {
EventConstraint.EventIDRange range = new EventConstraint.EventIDRange();
range.setFromNow();
return new EventConstraint(range);
}
@Override
public Filter getFilter() {
return null;
}
@Override
public int getLimit() {
return 1;
}
@Override
public boolean accept(Long eventId, TemporalAccessor createdAt, SinkEvent o) {
return true;
}
}

View File

@ -0,0 +1,59 @@
package com.rbkmoney.eventstock.client.poll.additional.source;
import com.rbkmoney.fistful.eventsink.EventRange;
import com.rbkmoney.fistful.source.SinkEvent;
import com.rbkmoney.fistful.source.Event;
import com.rbkmoney.fistful.source.Change;
import com.rbkmoney.fistful.source.Source;
import com.rbkmoney.geck.common.util.TypeUtil;
import com.rbkmoney.geck.serializer.kit.mock.MockMode;
import com.rbkmoney.geck.serializer.kit.mock.MockTBaseProcessor;
import com.rbkmoney.geck.serializer.kit.tbase.TBaseHandler;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class SourceEventSinkSrv implements com.rbkmoney.fistful.source.EventSinkSrv.Iface {
private Semaphore semaphore;
public SourceEventSinkSrv(Semaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public List<SinkEvent> getEvents(EventRange range) {
if (range.getAfter() == -1) {
return IntStream.range(0, 3).mapToObj(i -> createSourceEvent(i)).collect(Collectors.toList());
} else {
semaphore.release(1);
return Collections.emptyList();
}
}
public static SinkEvent createSourceEvent(long id) {
String timeString = TypeUtil.temporalToString(Instant.now());
SinkEvent sinkEvent = new SinkEvent();
sinkEvent.setId(id);
sinkEvent.setCreatedAt(timeString);
sinkEvent.setPayload(new Event(1, timeString, Arrays.asList(Change.created(new Source()))));
try {
TBaseHandler<SinkEvent> handler = new TBaseHandler<>(SinkEvent.class);
return new MockTBaseProcessor(MockMode.REQUIRED_ONLY).process(sinkEvent, handler);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public long getLastEventID() {
return 0;
}
}

View File

@ -0,0 +1,34 @@
package com.rbkmoney.eventstock.client.poll.additional.wallet;
import com.rbkmoney.eventstock.client.EventConstraint;
import com.rbkmoney.eventstock.client.EventFilter;
import com.rbkmoney.fistful.wallet.SinkEvent;
import com.rbkmoney.geck.filter.Filter;
import java.time.temporal.TemporalAccessor;
public class WalletEventFilter implements EventFilter<SinkEvent> {
@Override
public EventConstraint getEventConstraint() {
EventConstraint.EventIDRange range = new EventConstraint.EventIDRange();
range.setFromNow();
return new EventConstraint(range);
}
@Override
public Filter getFilter() {
return null;
}
@Override
public int getLimit() {
return 1;
}
@Override
public boolean accept(Long eventId, TemporalAccessor createdAt, SinkEvent o) {
return true;
}
}

View File

@ -0,0 +1,59 @@
package com.rbkmoney.eventstock.client.poll.additional.wallet;
import com.rbkmoney.fistful.eventsink.EventRange;
import com.rbkmoney.fistful.wallet.SinkEvent;
import com.rbkmoney.fistful.wallet.Event;
import com.rbkmoney.fistful.wallet.Change;
import com.rbkmoney.fistful.wallet.Wallet;
import com.rbkmoney.geck.common.util.TypeUtil;
import com.rbkmoney.geck.serializer.kit.mock.MockMode;
import com.rbkmoney.geck.serializer.kit.mock.MockTBaseProcessor;
import com.rbkmoney.geck.serializer.kit.tbase.TBaseHandler;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class WalletEventSinkSrv implements com.rbkmoney.fistful.wallet.EventSinkSrv.Iface {
private Semaphore semaphore;
public WalletEventSinkSrv(Semaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public List<SinkEvent> getEvents(EventRange range) {
if (range.getAfter() == -1) {
return IntStream.range(0, 3).mapToObj(i -> createWalletEvent(i)).collect(Collectors.toList());
} else {
semaphore.release(1);
return Collections.emptyList();
}
}
public static SinkEvent createWalletEvent(long id) {
String timeString = TypeUtil.temporalToString(Instant.now());
SinkEvent sinkEvent = new SinkEvent();
sinkEvent.setId(id);
sinkEvent.setCreatedAt(timeString);
sinkEvent.setPayload(new Event(1, timeString, Arrays.asList(Change.created(new Wallet()))));
try {
TBaseHandler<SinkEvent> handler = new TBaseHandler<>(SinkEvent.class);
return new MockTBaseProcessor(MockMode.REQUIRED_ONLY).process(sinkEvent, handler);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public long getLastEventID() {
return 0;
}
}

View File

@ -0,0 +1,34 @@
package com.rbkmoney.eventstock.client.poll.additional.withdrawal;
import com.rbkmoney.eventstock.client.EventConstraint;
import com.rbkmoney.eventstock.client.EventFilter;
import com.rbkmoney.fistful.withdrawal.SinkEvent;
import com.rbkmoney.geck.filter.Filter;
import java.time.temporal.TemporalAccessor;
public class WithdrawalEventFilter implements EventFilter<SinkEvent> {
@Override
public EventConstraint getEventConstraint() {
EventConstraint.EventIDRange range = new EventConstraint.EventIDRange();
range.setFromNow();
return new EventConstraint(range);
}
@Override
public Filter getFilter() {
return null;
}
@Override
public int getLimit() {
return 1;
}
@Override
public boolean accept(Long eventId, TemporalAccessor createdAt, SinkEvent o) {
return true;
}
}

View File

@ -0,0 +1,59 @@
package com.rbkmoney.eventstock.client.poll.additional.withdrawal;
import com.rbkmoney.fistful.eventsink.EventRange;
import com.rbkmoney.fistful.withdrawal.SinkEvent;
import com.rbkmoney.fistful.withdrawal.Event;
import com.rbkmoney.fistful.withdrawal.Change;
import com.rbkmoney.fistful.withdrawal.Withdrawal;
import com.rbkmoney.geck.common.util.TypeUtil;
import com.rbkmoney.geck.serializer.kit.mock.MockMode;
import com.rbkmoney.geck.serializer.kit.mock.MockTBaseProcessor;
import com.rbkmoney.geck.serializer.kit.tbase.TBaseHandler;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class WithdrawalEventSinkSrv implements com.rbkmoney.fistful.withdrawal.EventSinkSrv.Iface {
private Semaphore semaphore;
public WithdrawalEventSinkSrv(Semaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public List<SinkEvent> getEvents(EventRange range) {
if (range.getAfter() == -1) {
return IntStream.range(0, 3).mapToObj(i -> createWithdrawalEvent(i)).collect(Collectors.toList());
} else {
semaphore.release(1);
return Collections.emptyList();
}
}
public static SinkEvent createWithdrawalEvent(long id) {
String timeString = TypeUtil.temporalToString(Instant.now());
SinkEvent sinkEvent = new SinkEvent();
sinkEvent.setId(id);
sinkEvent.setCreatedAt(timeString);
sinkEvent.setPayload(new Event(1, timeString, Arrays.asList(Change.created(new Withdrawal()))));
try {
TBaseHandler<SinkEvent> handler = new TBaseHandler<>(SinkEvent.class);
return new MockTBaseProcessor(MockMode.REQUIRED_ONLY).process(sinkEvent, handler);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public long getLastEventID() {
return 0;
}
}

View File

@ -0,0 +1,34 @@
package com.rbkmoney.eventstock.client.poll.additional.withdrawal_session;
import com.rbkmoney.eventstock.client.EventConstraint;
import com.rbkmoney.eventstock.client.EventFilter;
import com.rbkmoney.fistful.withdrawal_session.SinkEvent;
import com.rbkmoney.geck.filter.Filter;
import java.time.temporal.TemporalAccessor;
public class WithdrawalSessionEventFilter implements EventFilter<SinkEvent> {
@Override
public EventConstraint getEventConstraint() {
EventConstraint.EventIDRange range = new EventConstraint.EventIDRange();
range.setFromNow();
return new EventConstraint(range);
}
@Override
public Filter getFilter() {
return null;
}
@Override
public int getLimit() {
return 1;
}
@Override
public boolean accept(Long eventId, TemporalAccessor createdAt, SinkEvent o) {
return true;
}
}

View File

@ -0,0 +1,60 @@
package com.rbkmoney.eventstock.client.poll.additional.withdrawal_session;
import com.rbkmoney.fistful.eventsink.EventRange;
import com.rbkmoney.fistful.withdrawal_session.Event;
import com.rbkmoney.fistful.withdrawal_session.SinkEvent;
import com.rbkmoney.fistful.withdrawal_session.Session;
import com.rbkmoney.fistful.withdrawal_session.Change;
import com.rbkmoney.geck.common.util.TypeUtil;
import com.rbkmoney.geck.serializer.kit.mock.MockMode;
import com.rbkmoney.geck.serializer.kit.mock.MockTBaseProcessor;
import com.rbkmoney.geck.serializer.kit.tbase.TBaseHandler;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class WithdrawalSessionEventSinkSrv implements com.rbkmoney.fistful.withdrawal_session.EventSinkSrv.Iface {
private Semaphore semaphore;
public WithdrawalSessionEventSinkSrv(Semaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public List<SinkEvent> getEvents(EventRange range) {
if (range.getAfter() == -1) {
return IntStream.range(0, 3).mapToObj(i -> createWithdrawalSessionEvent(i)).collect(Collectors.toList());
} else {
semaphore.release(1);
return Collections.emptyList();
}
}
public static SinkEvent createWithdrawalSessionEvent(long id) {
String timeString = TypeUtil.temporalToString(Instant.now());
SinkEvent sinkEvent = new SinkEvent();
sinkEvent.setId(id);
sinkEvent.setCreatedAt(timeString);
sinkEvent.setPayload(new Event(1, timeString, Arrays.asList(Change.created(new Session()))));
try {
sinkEvent = new MockTBaseProcessor(MockMode.REQUIRED_ONLY)
.process(sinkEvent, new TBaseHandler<>(SinkEvent.class));
} catch (IOException e) {
throw new RuntimeException(e);
}
return sinkEvent;
}
@Override
public long getLastEventID() {
return 0;
}
}

View File

@ -12,7 +12,7 @@
<groupId>com.rbkmoney</groupId>
<artifactId>eventstock-client</artifactId>
<version>1.2.4</version>
<version>1.2.5</version>
<packaging>pom</packaging>
<properties>