MB-20: Demo version of BM client library (#1)

Only for demo!
This commit is contained in:
Vladimir Pankrashkin 2016-06-29 17:03:09 +03:00 committed by GitHub
parent 561763af12
commit a933c40bea
31 changed files with 2373 additions and 0 deletions

56
.gitignore vendored Normal file
View File

@ -0,0 +1,56 @@
# Created by .ignore support plugin (hsz.mobi)
### JetBrains template
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm
*.DS_Store
*.iml
## Directory-based project format:
.idea/
# if you remove the above rule, at least ignore the following:
# User-specific stuff:
# .idea/workspace.xml
# .idea/tasks.xml
# .idea/dictionaries
# Sensitive or high-churn files:
# .idea/dataSources.ids
# .idea/dataSources.xml
# .idea/sqlDataSources.xml
# .idea/dynamic.xml
# .idea/uiDesigner.xml
# Gradle:
# .idea/gradle.xml
# .idea/libraries
# Mongo Explorer plugin:
# .idea/mongoSettings.xml
## File-based project format:
*.ipr
*.iws
## Plugin-specific files:
# IntelliJ
out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
# Target folder
target
# Target ant folder
build

91
pom.xml Normal file
View File

@ -0,0 +1,91 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.rbkmoney</groupId>
<artifactId>parent</artifactId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.rbkmoney</groupId>
<artifactId>bustermaze-client</artifactId>
<version>1.0.0</version>
<dependencies>
<!--RBK libs-->
<dependency>
<groupId>com.rbkmoney.woody</groupId>
<artifactId>woody-thrift</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>com.rbkmoney.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.9.3-2</version>
</dependency>
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>thrift-filter</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!--Thirdparty libs-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<!--Test libs-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-quickstart</artifactId>
<version>9.3.9.M1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.thrift</groupId>
<artifactId>thrift-maven-plugin</artifactId>
<version>0.9.3-1</version>
<configuration>
<thriftExecutable>/usr/local/bin/thrift</thriftExecutable>
<generator>java:fullcamel</generator>
</configuration>
<executions>
<execution>
<id>thrift-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>thrift-test-sources</id>
<phase>generate-test-sources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,9 @@
package com.rbkmoney.bmclient;
/**
* Created by vpankrashkin on 28.06.16.
*/
public enum ErrorActionType {
SKIP,
INTERRUPT;
}

View File

@ -0,0 +1,8 @@
package com.rbkmoney.bmclient;
/**
* Created by vpankrashkin on 28.06.16.
*/
public interface ErrorHandler {
ErrorActionType handleError(Object source, Throwable errCause);
}

View File

@ -0,0 +1,8 @@
package com.rbkmoney.bmclient;
/**
* Created by vpankrashkin on 28.06.16.
*/
public interface EventFilter<TEvent> {
boolean accept(TEvent event);
}

View File

@ -0,0 +1,9 @@
package com.rbkmoney.bmclient;
/**
* Created by vpankrashkin on 28.06.16.
*/
public interface EventHandler<EType> {
void handleEvent(EType event, String subsKey);
void handleNoMoreElements(String subsKey);
}

View File

@ -0,0 +1,13 @@
package com.rbkmoney.bmclient;
/**
* Created by vpankrashkin on 28.06.16.
*/
public interface EventPublisher<TEvent> {
String subscribe(EventFilter<TEvent> filter);
String subscribe(EventFilter<TEvent> filter, EventHandler<TEvent> eventHandler);
String subscribe(EventFilter<TEvent> filter, EventHandler<TEvent> eventHandler, ErrorHandler errorHandler);
boolean unsubscribe(String subsKey);
void unsubscribeAll();
void destroy();
}

View File

@ -0,0 +1,100 @@
package com.rbkmoney.bmclient.polling;
import com.rbkmoney.bmclient.ErrorHandler;
import com.rbkmoney.bmclient.EventFilter;
import com.rbkmoney.bmclient.EventHandler;
import com.rbkmoney.bmclient.EventPublisher;
import java.util.HashMap;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by vpankrashkin on 28.06.16.
*/
public class AbstractPollingEventPublisher<TEvent> implements EventPublisher<TEvent> {
private final HashMap<String, SubscriberInfo> subscribers = new HashMap<>();
private final Lock lock = new ReentrantLock();
private final EventHandler<TEvent> defaultEventHandler;
private final ErrorHandler defaultErrorHandler;
private final PollingRunner pollingRunner;
private final int defaultBlockSize = 2;
public AbstractPollingEventPublisher(EventHandler<TEvent> defaultEventHandler, ErrorHandler defaultErrorHandler, PollingRunner pollingRunner) {
this.defaultEventHandler = defaultEventHandler;
this.defaultErrorHandler = defaultErrorHandler;
this.pollingRunner = pollingRunner;
}
@Override
public String subscribe(EventFilter<TEvent> filter) {
return subscribe(filter, defaultEventHandler);
}
@Override
public String subscribe(EventFilter<TEvent> filter, EventHandler<TEvent> eventHandler) {
return subscribe(filter, eventHandler, defaultErrorHandler);
}
@Override
public String subscribe(EventFilter<TEvent> filter, EventHandler<TEvent> eventHandler, ErrorHandler errorHandler) {
SubscriberInfo subscriberInfo = new SubscriberInfo(filter, eventHandler, errorHandler);
String subsKey;
do {
subsKey = UUID.randomUUID().toString();
lock.lock();
try {
if (subscribers.containsKey(subsKey)) {
continue;
} else {
subscribers.put(subsKey, subscriberInfo);
pollingRunner.addPolling(subsKey, subscriberInfo, defaultBlockSize);
break;
}
} finally {
lock.unlock();
}
} while (true);
return subsKey;
}
@Override
public boolean unsubscribe(String subsKey) {
lock.lock();
try {
subscribers.remove(subsKey);
return pollingRunner.removePolling(subsKey);
} finally {
lock.unlock();
}
}
@Override
public void unsubscribeAll() {
lock.lock();
try {
for (Iterator<String> it = subscribers.keySet().iterator(); it.hasNext();) {
String subsKey = it.next();
it.remove();
pollingRunner.removePolling(subsKey);
}
} finally {
lock.unlock();
}
}
@Override
public void destroy() {
lock.lock();
try {
unsubscribeAll();
pollingRunner.destroy();
} finally {
lock.unlock();
}
}
}

View File

@ -0,0 +1,39 @@
package com.rbkmoney.bmclient.polling;
import com.rbkmoney.bmclient.EventFilter;
import com.rbkmoney.damsel.event_stock.EventRange;
import com.rbkmoney.damsel.event_stock.StockEvent;
import com.rbkmoney.thrift.filter.Filter;
/**
* Created by vpankrashkin on 28.06.16.
*/
public class BMEventFilter implements EventFilter<StockEvent> {
private final EventRange eventRange;
private final Filter filter;
public BMEventFilter(EventRange eventRange) {
this(eventRange, null);
}
public BMEventFilter(EventRange eventRange, Filter filter) {
this.eventRange = eventRange;
this.filter = filter;
}
@Override
public boolean accept(StockEvent stockEvent) {
if (filter != null) {
return filter.match(stockEvent.getSourceEvent().getProcessingEvent());
}
return true;
}
public EventRange getEventRange() {
return eventRange;
}
public Filter getFilter() {
return filter;
}
}

View File

@ -0,0 +1,66 @@
package com.rbkmoney.bmclient.polling;
import com.rbkmoney.bmclient.ErrorActionType;
import com.rbkmoney.bmclient.ErrorHandler;
import com.rbkmoney.bmclient.EventHandler;
import com.rbkmoney.damsel.event_stock.EventRepositorySrv;
import com.rbkmoney.woody.thrift.impl.http.THSpawnClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
/**
* Created by vpankrashkin on 29.06.16.
*/
public class BMEventPublisherBuilder {
private static final EventHandler DEFAULT_EVENT_HANDLER = new EventHandler() {
@Override
public void handleEvent(Object event, String subsKey) {
}
@Override
public void handleNoMoreElements(String subsKey) {
}
};
private static final ErrorHandler DEFALULT_ERROR_HANDLER = new ErrorHandler() {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Override
public ErrorActionType handleError(Object source, Throwable errCause) {
log.error("Error", errCause);
return ErrorActionType.INTERRUPT;
}
};
private URI uri;
private EventHandler eventHandler;
private ErrorHandler errorHandler;
public BMEventPublisherBuilder withURI(URI uri) {
this.uri = uri;
return this;
}
public BMEventPublisherBuilder withEventHandler(EventHandler eventHandler) {
this.eventHandler = eventHandler;
return this;
}
public BMEventPublisherBuilder withErrorHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
return this;
}
public BMPollingEventPublisher build() {
THSpawnClientBuilder clientBuilder = new THSpawnClientBuilder();
clientBuilder.withAddress(uri);
BMServiceAdapter serviceAdapter = new BMServiceAdapter(clientBuilder.build(EventRepositorySrv.Iface.class));
PollingRunner pollingRunner = new BMPollingRunner(serviceAdapter);
BMPollingEventPublisher eventPublisher = new BMPollingEventPublisher(eventHandler == null ? DEFAULT_EVENT_HANDLER : eventHandler, errorHandler == null ? DEFALULT_ERROR_HANDLER : errorHandler, pollingRunner);
return eventPublisher;
}
}

View File

@ -0,0 +1,14 @@
package com.rbkmoney.bmclient.polling;
import com.rbkmoney.bmclient.ErrorHandler;
import com.rbkmoney.bmclient.EventHandler;
import com.rbkmoney.damsel.event_stock.StockEvent;
/**
* Created by vpankrashkin on 29.06.16.
*/
public class BMPollingEventPublisher extends AbstractPollingEventPublisher<StockEvent> {
public BMPollingEventPublisher(EventHandler<StockEvent> defaultEventHandler, ErrorHandler defaultErrorHandler, PollingRunner pollingRunner) {
super(defaultEventHandler, defaultErrorHandler, pollingRunner);
}
}

View File

@ -0,0 +1,153 @@
package com.rbkmoney.bmclient.polling;
import com.rbkmoney.bmclient.EventHandler;
import com.rbkmoney.damsel.event_stock.DatasetTooBig;
import com.rbkmoney.damsel.event_stock.EventRange;
import com.rbkmoney.damsel.event_stock.StockEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* Created by vpankrashkin on 28.06.16.
*/
public class BMPollingRunner implements PollingRunner {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private final ServiceAdapter<StockEvent, EventRange> serviceAdapter;
private ExecutorService executor = Executors.newCachedThreadPool();
private Map<String, BMPollingWorker> pollers = new HashMap<>();
public BMPollingRunner(ServiceAdapter<StockEvent, EventRange> serviceAdapter) {
this.serviceAdapter = serviceAdapter;
}
@Override
public boolean addPolling(String subsKey, SubscriberInfo subscriberInfo, int maxBlockSize) {
if (pollers.containsKey(subsKey)) {
return false;
}
BMPollingWorker pollingWorker = new BMPollingWorker(serviceAdapter, subscriberInfo, subsKey, maxBlockSize);
executor.submit(pollingWorker);
pollers.put(subsKey, pollingWorker);
return true;
}
@Override
public boolean removePolling(String subsKey) {
BMPollingWorker pollingWorker = pollers.get(subsKey);
if (pollingWorker != null) {
pollingWorker.stop();
//TODO waitnig for worker destruction
}
return true;
}
@Override
public void destroy() {
executor.shutdownNow();
try {
if (executor.awaitTermination(10, TimeUnit.SECONDS)) {
log.error("Failed to stop executor in time");
}
} catch (InterruptedException e) {
log.info("Awaiting for executor shutdown is interrupted");
}
}
private static class BMPollingWorker implements Runnable {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private final ServiceAdapter<StockEvent, EventRange> serviceAdapter;
private final SubscriberInfo subscriberInfo;
private final String subsKey;
private final int blockSize;
private final BMRangeChecker rangeChecker = new BMRangeChecker();
private volatile boolean runFalg = true;
public BMPollingWorker(ServiceAdapter<StockEvent, EventRange> serviceAdapter, SubscriberInfo subscriberInfo, String subsKey, int blockSize) {
this.serviceAdapter = serviceAdapter;
this.subscriberInfo = subscriberInfo;
this.subsKey = subsKey;
this.blockSize = blockSize;
}
@Override
public void run() {
EventRange eventRange = ((BMEventFilter) subscriberInfo.getEventFilter()).getEventRange();
EventRange currentRange = eventRange;
int currentBlockSize = blockSize;
boolean notExhausted = true;
mailLoop:
while (runFalg && !Thread.currentThread().isInterrupted() && notExhausted) {
Collection<StockEvent> events = null;
try {
events = serviceAdapter.getEventRange(currentRange, currentBlockSize);
EventHandler eventHandler = subscriberInfo.getEventHandler();
StockEvent lastEvent = null;
for (StockEvent event: events) {
if (!runFalg) {
continue mailLoop;
}
if (!((BMEventFilter) subscriberInfo.getEventFilter()).accept(event)) {
log.trace("Event skipped for subscription:{}, event: {}", subsKey, event);
continue;
}
try {
lastEvent = event;
eventHandler.handleEvent(event, subsKey);
} catch (Exception e) {
log.error("Event handled with error for subscription:"+subsKey+ "event:"+event, e);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
}
}
if (events.size() < currentBlockSize) {
log.debug("Subscription exhausted {}", subsKey);
notExhausted = false;
subscriberInfo.getEventHandler().handleNoMoreElements(subsKey);
} else if (lastEvent != null) {
currentRange = increaseIdRange(currentRange, currentBlockSize);
}
} catch (ServiceException e) {
if (e.getCause() instanceof DatasetTooBig) {
DatasetTooBig dtbEx = (DatasetTooBig) e.getCause();
log.info("Current data block size: '{}' is too big, new size is: '{}'", currentBlockSize, dtbEx.getLimit());
currentBlockSize = dtbEx.getLimit();
}
log.error("Failed to execute request to repository service", e);
}
}
}
private EventRange increaseIdRange(EventRange oldRange, int idStep) {
long fromId = (Long) oldRange.getIdRange().getFromId().getFieldValue();
long toId = (Long) oldRange.getIdRange().getToId().getFieldValue();
EventRange newRange = new EventRange(oldRange);
long newToId = fromId + idStep;
newToId = newToId > toId ? toId : newToId;
if (oldRange.getIdRange().getFromId().isSetInclusive()) {
newRange.getIdRange().getFromId().setInclusive(newToId);
} else {
newRange.getIdRange().getFromId().setExclusive(newToId);
}
return newRange;
}
public void stop() {
runFalg = false;
}
}
}

View File

@ -0,0 +1,25 @@
package com.rbkmoney.bmclient.polling;
import com.rbkmoney.damsel.event_stock.EventRange;
import com.rbkmoney.damsel.event_stock.StockEvent;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.IsoFields;
import java.time.temporal.TemporalField;
/**
* Created by vpankrashkin on 28.06.16.
*/
public class BMRangeChecker {
private final DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_INSTANT;
public boolean isInRange(StockEvent event, EventRange range) {
long id = event.getSourceEvent().getProcessingEvent().getId();
boolean result = range.getIdRange().getFromId().isSetInclusive() ? id >= range.getIdRange().getFromId().getInclusive() : id > range.getIdRange().getFromId().getExclusive();
result &= range.getIdRange().getToId().isSetInclusive() ? id <= range.getIdRange().getToId().getInclusive() : id < range.getIdRange().getToId().getExclusive();
return result;
}
}

View File

@ -0,0 +1,30 @@
package com.rbkmoney.bmclient.polling;
import com.rbkmoney.damsel.event_stock.EventConstraint;
import com.rbkmoney.damsel.event_stock.EventRange;
import com.rbkmoney.damsel.event_stock.EventRepositorySrv;
import com.rbkmoney.damsel.event_stock.StockEvent;
import org.apache.thrift.TException;
import java.util.Collection;
/**
* Created by vpankrashkin on 29.06.16.
*/
public class BMServiceAdapter implements ServiceAdapter<StockEvent, EventRange> {
private final EventRepositorySrv.Iface repository;
public BMServiceAdapter(EventRepositorySrv.Iface repository) {
this.repository = repository;
}
@Override
public Collection<StockEvent> getEventRange(EventRange range, int limit) throws ServiceException {
EventConstraint eventConstraint = new EventConstraint(range, limit);
try {
return repository.getEvents(eventConstraint);
} catch (TException e) {
throw new ServiceException(e);
}
}
}

View File

@ -0,0 +1,10 @@
package com.rbkmoney.bmclient.polling;
/**
* Created by vpankrashkin on 28.06.16.
*/
public interface PollingRunner {
boolean addPolling(String subsKey, SubscriberInfo subscriberInfo, int maxBlockSize);
boolean removePolling(String subsKey);
void destroy();
}

View File

@ -0,0 +1,10 @@
package com.rbkmoney.bmclient.polling;
import java.util.Collection;
/**
* Created by vpankrashkin on 28.06.16.
*/
public interface ServiceAdapter<TEvent, TRange> {
Collection<TEvent> getEventRange(TRange range, int limit) throws ServiceException;
}

View File

@ -0,0 +1,26 @@
package com.rbkmoney.bmclient.polling;
/**
* Created by vpankrashkin on 28.06.16.
*/
public class ServiceException extends Exception {
public ServiceException() {
super();
}
public ServiceException(String message) {
super(message);
}
public ServiceException(String message, Throwable cause) {
super(message, cause);
}
public ServiceException(Throwable cause) {
super(cause);
}
protected ServiceException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -0,0 +1,32 @@
package com.rbkmoney.bmclient.polling;
import com.rbkmoney.bmclient.ErrorHandler;
import com.rbkmoney.bmclient.EventFilter;
import com.rbkmoney.bmclient.EventHandler;
/**
* Created by vpankrashkin on 28.06.16.
*/
class SubscriberInfo {
private final EventFilter eventFilter;
private final EventHandler eventHandler;
private final ErrorHandler errorHandler;
public SubscriberInfo(EventFilter eventFilter, EventHandler eventHandler, ErrorHandler errorHandler) {
this.eventFilter = eventFilter;
this.eventHandler = eventHandler;
this.errorHandler = errorHandler;
}
public EventFilter getEventFilter() {
return eventFilter;
}
public EventHandler getEventHandler() {
return eventHandler;
}
public ErrorHandler getErrorHandler() {
return errorHandler;
}
}

View File

@ -0,0 +1,84 @@
/*
* Базовые, наиболее общие определения
*/
namespace java com.rbkmoney.damsel.base
/** Идентификатор */
typedef string ID
/** Идентификатор некоторого события */
typedef i64 EventID
/** Непрозрачный для участника общения набор данных */
typedef binary Opaque
/**
* Отметка во времени согласно ISO 8601.
*
* Строка должна содержать дату и время в UTC в следующем формате:
* `2016-03-22T06:12:27Z`.
*/
typedef string Timestamp
/** Отображение из строки в строку */
typedef map<string, string> StringMap
/** Рациональное число. */
struct Rational {
1: required i64 p
2: required i64 q
}
/** Отрезок времени в секундах */
typedef i32 Timeout
/** Значение ассоциации */
typedef string Tag
/** Критерий остановки таймера */
union Timer {
/** Отрезок времени, после истечения которого таймер остановится */
1: Timeout timeout
/** Отметка во времени, при пересечении которой таймер остановится */
2: Timestamp deadline
}
/** Общий успех */
struct Ok {
}
/** Общая ошибка */
struct Error {
/** Уникальный признак ошибки, пригодный для обработки машиной */
1: required string code
/** Описание ошибки, пригодное для восприятия человеком */
2: optional string description
}
/** Общее исключение */
exception Failure {
/** Ошибка, которая привела к возникновению исключения */
//Нельзя назвать поле `error` из-за особенностей генерации thrift Go - приводит к одинаковым именам для функции и поля структуры
1: required Error e
}
/**
* Исключение, сигнализирующее о возникновение транзиентной проблемы, которая с высокой
* вероятностью не повторится при последующих попытках
*/
exception TryLater {
/** Транзиентная ошибка, которая привела к возникновению исключения */
1: required Error e
}
/**
* Исключение, сигнализирующее о непригодных с точки зрения бизнес-логики входных данных
*/
exception InvalidRequest {
/** Список пригодных для восприятия человеком ошибок во входных данных */
1: required list<string> errors
}
/** Исключение, сигнализирующее об отсутствии объекта или процесса */
exception NotFound {}

View File

@ -0,0 +1,78 @@
include "base.thrift"
include "domain.thrift"
namespace java com.rbkmoney.damsel.cds
/** Часть мастер-ключа */
typedef binary MasterKeyShare;
typedef list<MasterKeyShare> MasterKeyShares;
/** Дата экспирации */
struct ExpDate {
/** Месяц 1..12 */
1: required i8 month
/** Год 2015..∞ */
2: required i16 year
}
/** Открытые карточные данные (в отличие от domain.BankCard) */
struct CardData {
/** Номер карточки без пробелов [0-9]{14,19} */
1: required string pan
2: required ExpDate exp_date
/** Имя держателя */
3: required string cardholder_name
/** Код верификации [0-9]{3,4} */
4: required string cvv
}
struct PutCardDataResult {
1: required domain.BankCard bank_card
2: required domain.PaymentSession session
}
union UnlockStatus {
1: base.Ok ok
/** сколько частей ключа нужно еще ввести, чтобы расшифровать хранилище */
2: i16 more_keys_needed
}
exception InvalidCardData {}
exception NoKeyring {}
exception KeyringLocked {}
exception KeyringExists {}
/** Интерфейс для администраторов */
service Keyring {
/** Создать новый кейринг
* threshold - минимально необходимое количество ключей для восстановления мастер ключа
* num_shares - общее количество частей, на которое нужно разбить мастер-ключ
*/
MasterKeyShares Init (1: i16 threshold, 2: i16 num_shares) throws (1: KeyringExists exists)
/** Предоставить часть мастер-ключа для расшифровки кейринга.
* Необходимо вызвать с разными частами мастер столько раз, сколько было указано в качестве
* параметра threshold при создании кейринга
*/
UnlockStatus Unlock (1: MasterKeyShare key_share) throws (1: NoKeyring no_keyring)
/** Зашифровать кейринг */
void Lock () throws ()
/** Добавить новый ключ в кейринг */
void Rotate () throws (1: KeyringLocked locked)
}
/** Интерфейс для приложений */
service Storage {
/** Получить карточные данные без CVV */
CardData GetCardData (1: domain.Token token)
throws (1: base.NotFound not_found, 2: KeyringLocked locked)
/** Получить карточные данные c CVV */
CardData GetSessionCardData (1: domain.Token token, 2: domain.PaymentSession session)
throws (1: base.NotFound not_found, 2: KeyringLocked locked)
/** Сохранить карточные данные */
PutCardDataResult PutCardData (1: CardData card_data)
throws (1: InvalidCardData invalid, 2: KeyringLocked locked)
}

View File

@ -0,0 +1,377 @@
/**
* Определения предметной области.
*/
include "base.thrift"
namespace java com.rbkmoney.damsel.domain
namespace erlang domain
typedef i32 SchemaRevision
typedef i64 DataRevision
const SchemaRevision SCHEMA_REVISION = 42
typedef i32 ObjectID
/* Common */
// В идеале надо использовать `typedef` над `base.Error`, но сейчас это приводит к ошибкам кодогенератора Go
struct OperationError {
/** Уникальный признак ошибки, пригодный для обработки машиной */
1: required string code;
/** Описание ошибки, пригодное для восприятия человеком */
2: optional string description;
}
/** Сумма в минимальных денежных единицах. */
typedef i64 Amount
/** Валюта. */
struct Currency {
1: required string name
2: required string symbolic_code
3: required i16 numeric_code
4: required i16 exponent
}
struct CurrencyRef { 1: required string symbolic_code }
struct CurrencyObject {
1: required CurrencyRef ref
2: required Currency data
}
/** Денежные средства, состоящий из суммы и валюты. */
struct Funds {
1: required Amount amount
2: required Currency currency
}
/* Contractor transactions */
struct TransactionInfo {
1: required string id
2: optional base.Timestamp timestamp
3: required base.StringMap extra = []
}
/* Invoices */
typedef base.ID InvoiceID
typedef base.ID InvoicePaymentID
typedef binary InvoiceContext
typedef string PaymentSession
struct Invoice {
1: required InvoiceID id
2: required base.Timestamp created_at
3: required DataRevision domain_revision
4: required InvoiceStatus status
5: required base.Timestamp due
6: required string product
7: optional string description
8: required Funds cost
9: required InvoiceContext context
}
struct InvoiceUnpaid {}
struct InvoicePaid {}
struct InvoiceCancelled { 1: required string details }
struct InvoiceFulfilled { 1: required string details }
union InvoiceStatus {
1: InvoiceUnpaid unpaid
2: InvoicePaid paid
3: InvoiceCancelled cancelled
4: InvoiceFulfilled fulfilled
}
struct InvoicePayment {
1: required InvoicePaymentID id
2: required base.Timestamp created_at
3: required InvoicePaymentStatus status
4: optional TransactionInfo trx
5: required Payer payer
6: required PaymentTool payment_tool
7: required PaymentSession session
}
struct InvoicePaymentPending {}
struct InvoicePaymentSucceeded {}
struct InvoicePaymentFailed { 1: OperationError err }
union InvoicePaymentStatus {
1: InvoicePaymentPending pending
2: InvoicePaymentSucceeded succeeded
3: InvoicePaymentFailed failed
}
struct Payer {}
/* Cash flows */
/** Распределение денежных потоков в системе. */
struct CashDistribution {
1: required string name
2: required string description = ""
3: required list<CashFlow> flows
}
/** Участник распределения денежных потоков. */
// Порядок следования `typedef`-`struct` важен для кодогенератора Go
typedef string CashFlowNode // FIXME: too broad
/** Денежный поток между двумя участниками. */
struct CashFlow {
1: required CashFlowNode source
2: required CashFlowNode destination
3: required CashVolume volume
}
/** Объём денежного потока. */
union CashVolume {
1: VolumeFixed fixed
2: VolumeShare share
}
/** Объём в абсолютных денежных единицах. */
struct VolumeFixed {
1: required Amount amount
}
/** Объём в относительных единицах. */
struct VolumeShare {
1: required base.Rational parts
2: optional CashFlowNode of
}
struct CashDistributionRef { 1: required ObjectID id }
struct CashDistributionObject {
1: required CashDistributionRef ref
2: required CashDistribution data
}
/* Merchants */
/** Мерчант. */
struct Merchant {
1: optional Contract contract
2: required list<Shop> shops = []
}
struct MerchantRef { 1: required base.ID id }
struct MerchantObject {
1: required MerchantRef ref
2: required Merchant data
}
/* Contracts */
/** Договор с юридическим лицом, в частности с мерчантом. */
struct Contract {
1: required string number
2: required base.Timestamp signed_at
3: required PartyRef party
4: required BankAccount account
5: required list<ContractTerm> terms
}
/** Лицо, выступающее стороной договора. */
struct Party {
1: required string registered_name
2: required LegalEntity legal_entity
}
/** Форма юридического лица. */
union LegalEntity {
}
struct PartyRef { 1: required ObjectID id }
struct PartyObject {
1: required PartyRef ref
2: required Party data
}
/** Банковский счёт. */
struct BankAccount {
}
/** Условие договора. */
union ContractTerm {
1: CashDistributionTerm cash_distribution
}
struct CashDistributionTerm {
}
/* Shops */
/** Магазин мерчанта. */
struct Shop {
1: required string name
2: optional string url
3: required Category category
}
/* Categories */
/** Категория продаваемых товаров или услуг. */
struct Category {
1: required string name
2: required string description = ""
}
struct CategoryRef { 1: required ObjectID id }
struct CategoryObject {
1: required CategoryRef ref
2: required Category data
}
/* Payment methods */
enum PaymentMethod {
bank_card = 1 // payment_card?
}
union PaymentTool {
1: BankCard bank_card
}
typedef string Token
struct BankCard {
1: required Token token
2: required BankCardPaymentSystem payment_system
3: required string bin
4: required string masked_pan
}
enum BankCardPaymentSystem {
visa
mastercard
}
/** Способ платежа, категория платёжного средства. */
struct PaymentMethodDefinition {
1: required string name
2: required string description = ""
}
struct PaymentMethodRef { 1: required PaymentMethod id }
struct PaymentMethodObject {
1: required PaymentMethodRef ref
2: required PaymentMethodDefinition data
}
/* Conditions */
/** Условие применимости. */
struct Condition {
1: required string name
2: required string description = ""
3: required ConditionDef definition
}
/** Варианты условий применимости. */
union ConditionDef {
/// basis and combinators
1: bool value_is
2: set<ConditionDef> all_of
3: set<ConditionDef> one_of
4: ConditionDef is_not
/// primitives
5: ConditionRef condition_is
6: CategoryRef category_is
7: PaymentMethodRef payment_method_is
8: FlowRef flow_is
}
struct ConditionRef { 1: required ObjectID id }
struct ConditionObject {
1: required ConditionRef ref
2: required Condition data
}
/* Flows */
/** Операция над бизнес-объектом, в частности инвойсом. */
struct Flow {
1: required string name
2: required string description = ""
}
struct FlowRef { 1: required ObjectID id }
struct FlowObject {
1: required FlowRef ref
2: required Flow data
}
/* Proxies */
typedef base.StringMap ProxyOptions
enum ProxyType {
provider
}
struct Proxy {
1: required ProxyType type
2: required string url
3: optional ProxyOptions options
}
struct ProxyRef { 1: required ObjectID id }
struct ProxyObject {
1: required ProxyRef ref
2: required ProxyObject object
}
/* Merchant prototype */
struct MerchantPrototypeRef {}
/** Прототип мерчанта по умолчанию. */
struct MerchantPrototype {
1: required MerchantPrototypeRef ref
2: required Merchant data
}
/* Type enumerations */
union Reference {
1: CategoryRef category
2: PaymentMethodRef payment_method
3: FlowRef flow
4: CurrencyRef currency
5: ConditionRef condition
6: CashDistributionRef cash_distribution
7: PartyRef party
8: MerchantPrototypeRef merchant_prototype
9: ProxyRef proxy
}
union DomainObject {
1: CategoryObject category
2: PaymentMethodObject payment_method
3: FlowObject flow
4: CurrencyObject currency
5: ConditionObject condition
6: CashDistributionObject cash_distribution
7: PartyObject party
8: MerchantPrototype merchant_prototype
9: ProxyObject proxy
}
/* Domain */
typedef map<Reference, DomainObject> Domain

View File

@ -0,0 +1,78 @@
/**
* Интерфейс и связанные с ним определения сервиса конфигурации предметной
* области (domain config).
*/
include "domain.thrift"
namespace java com.rbkmoney.damsel.domain_config
namespace erl domain
/**
* Маркер вершины истории.
*/
struct Head {}
/**
* Вариант представления ревизии в истории.
*/
union Revision {
1: domain.DataRevision rev;
2: Head head;
}
/**
* Монотонно возрастающая версия монолитного набора объектов, определённая
* точка в его истории набора объектов.
*/
struct Version {
1: required domain.SchemaRevision schema = domain.SCHEMA_REVISION;
2: required Revision data;
}
/**
* Возможные операции над набором объектов.
*/
union Operation {
1: InsertOp insert;
2: UpdateOp update;
3: RemoveOp remove;
}
struct InsertOp {
1: required domain.DomainObject object;
}
struct UpdateOp {
1: required domain.DomainObject object;
}
struct RemoveOp {
1: required domain.Reference ref;
}
exception VersionNotFound {}
exception ObjectNotFound {}
exception OperationConflict {}
/**
* Интерфейс сервиса конфигурации предметной области.
*/
service Configurator {
Version head ()
throws (1: VersionNotFound ex1);
Version pollHead ()
throws ();
domain.Domain checkout (1: Version v)
throws (1: VersionNotFound ex1);
domain.DomainObject checkoutObject (1: Version v, 2: domain.Reference ref)
throws (1: VersionNotFound ex1, 2: ObjectNotFound ex2);
Version commit (1: Version v, 2: Operation op)
throws (1: VersionNotFound ex1, 2: OperationConflict ex3);
}

View File

@ -0,0 +1,112 @@
include "base.thrift"
include "payment_processing.thrift"
namespace java com.rbkmoney.damsel.event_stock
namespace erlang event_stock
typedef list<StockEvent> StockEvents
typedef base.EventID EventID
typedef base.Timestamp Timestamp
typedef base.InvalidRequest InvalidRequest
typedef payment_processing.NoLastEvent NoLastEvent
/**
* Исходное событие, полученное из HG или другого источника.
*/
union SourceEvent {
1: payment_processing.Event processing_event
}
/**
* Событие, которое BM отдает клиентам.
* source_event - Исходное событие, к которому применяся фильтр.
*/
struct StockEvent {
1: required SourceEvent source_event
}
/**
* Граница диапазона EventId.
*/
union EventIDBound {
1: EventID inclusive
2: EventID exclusive
}
/**
* Диапазон идентификаторов событий.
* from_id - с какого ID.
* to_id - по какой ID.
* Если from > to - диапазон считается некорректным.
*/
struct EventIDRange {
1: required EventIDBound from_id
2: required EventIDBound to_id
}
/**
* Граница диапазона Timestamp.
*/
union EventTimeBound {
1: Timestamp inclusive;
2: Timestamp exclusive;
}
/**
* Диапазон времени создания событий.
* from_time - начальное время.
* to_time - конечное время.
* Если from > to - диапазон считается некорректным.
*/
struct EventTimeRange {
1: required EventTimeBound from_time;
2: required EventTimeBound to_time;
}
/**
* Диапазон событий по ID или времени создания.
* Если границы диапазона выборки указавают на не существующие значения, то в выборку попадут ближайшие удовлетворяющее условию значения.
*/
union EventRange {
1: EventIDRange id_range;
2: EventTimeRange time_range;
}
/**
* Ограничение выборки событий.
* event_range - диапазон выборки. Нужно явно задавать, какой диапазон ID или времени должен быть отфильтрован.
* limit - максимальный размер выборки, неотрицательное целое число.
*/
struct EventConstraint {
1: required EventRange event_range;
3: required i32 limit;
}
/**
* Ошибка превышения максимального размера блока данных, доступного для отправки клиенту.
* limit - текущий максимальный размер блока.
*/
exception DatasetTooBig {
1: i32 limit;
}
/**
* Интерфейс BM для клиентов.
*/
service EventRepository {
/**
* Возвращает события, удовлетворяющие переданному условию.
* Возвращаемый набор данных отсортирован по ID.
* Возвращает ошибку InvalidRequest, если диапазон фильтрации или лимит указан некорректно.
* Возвращает ошибку DatasetTooBig, если результирующий блок данных слишком большой.
*/
StockEvents getEvents(1: EventConstraint constraint) throws (1: InvalidRequest ex1, 2: DatasetTooBig ex2)
/**
* Возвращает наиболее позднее известное на момент исполнения запроса событие.
*/
StockEvent getLastEvent () throws (1: NoLastEvent ex1)
}

View File

@ -0,0 +1,276 @@
/**
* Определения и сервисы процессинга.
*/
include "base.thrift"
include "domain.thrift"
namespace java com.rbkmoney.damsel.payment_processing
namespace erlang payproc
/* Interface clients */
typedef base.ID UserID
struct UserInfo {
1: required UserID id
}
/* Invoices */
struct InvoiceState {
1: required domain.Invoice invoice
2: required list<domain.InvoicePayment> payments = []
}
/* Events */
/**
* Событие, атомарный фрагмент истории бизнес-объекта, например инвойса.
*/
struct Event {
/**
* Идентификатор события.
* Монотонно возрастающее целочисленное значение, таким образом на множестве
* событий задаётся отношение полного порядка (total order).
*/
1: required base.EventID id
/**
* Время создания события.
*/
2: required base.Timestamp created_at
/**
* Идентификатор бизнес-объекта, источника события.
*/
3: required EventSource source
/**
* Номер события в последовательности событий от указанного источника.
*
* Номер первого события от источника всегда равен `1`, то есть `sequence`
* принимает значения из диапазона `[1; 2^31)`
*/
4: required i32 sequence
/**
* Содержание события.
*/
5: required EventPayload payload
}
/**
* Источник события, идентификатор бизнес-объекта, который породил его в
* процессе выполнения определённого бизнес-процесса.
*/
union EventSource {
/** Идентификатор инвойса, который породил событие. */
1: domain.InvoiceID invoice
}
typedef list<Event> Events
/**
* Один из возможных вариантов содержания события.
*/
union EventPayload {
/** Некоторое событие, порождённое инвойсом. */
1: InvoiceEvent invoice_event
}
/**
* Один из возможных вариантов события, порождённого инвойсом.
*/
union InvoiceEvent {
1: InvoiceCreated invoice_created
2: InvoiceStatusChanged invoice_status_changed
3: InvoicePaymentEvent invoice_payment_event
}
/**
* Один из возможных вариантов события, порождённого платежом по инвойсу.
*/
union InvoicePaymentEvent {
1: InvoicePaymentStarted invoice_payment_started
2: InvoicePaymentBound invoice_payment_bound
3: InvoicePaymentSucceeded invoice_payment_succeeded
4: InvoicePaymentFailed invoice_payment_failed
}
/**
* Событие о создании нового инвойса.
*/
struct InvoiceCreated {
/** Данные созданного инвойса. */
1: required domain.Invoice invoice
}
/**
* Событие об изменении статуса инвойса.
*/
struct InvoiceStatusChanged {
/** Новый статус инвойса. */
1: required domain.InvoiceStatus status
/** Человекочитаемые данные, связанные с изменением статуса. */
2: optional string details
}
/**
* Событие об запуске платежа по инвойсу.
*/
struct InvoicePaymentStarted {
/** Данные запущенного платежа. */
1: required domain.InvoicePayment payment
}
/**
* Событие о том, что появилась связь между платежом по инвойсу и транзакцией
* у провайдера.
*/
struct InvoicePaymentBound {
/** Идентификатор платежа по инвойсу. */
1: required domain.InvoicePaymentID payment_id
/** Данные о связанной транзакции у провайдера. */
2: required domain.TransactionInfo trx
}
/**
* Событие об успешном прохождении платежа по инвойсу.
*/
struct InvoicePaymentSucceeded {
/** Идентификатор платежа по инвойсу. */
1: required domain.InvoicePaymentID payment_id
}
/**
* Событие о неуспешном завершении платежа по инвойсу.
*/
struct InvoicePaymentFailed {
/** Идентификатор платежа по инвойсу. */
1: required domain.InvoicePaymentID payment_id
/** Данные ошибки, явившейся причиной завершения платежа. */
2: required domain.OperationError error
}
/**
* Диапазон для выборки событий.
*/
struct EventRange {
/**
* Идентификатор события, за которым должны следовать попадающие в выборку
* события.
*
* Если `after` не указано, в выборку попадут события с начала истории; если
* указано, например, `42`, то в выборку попадут события, случившиеся _после_
* события `42`.
*/
1: optional base.EventID after
/**
* Максимальное количество событий в выборке.
*
* В выборку может попасть количество событий, е больше_ указанного в
* `limit`. Если в выборку попало событий еньше_, чем значение `limit`,
* был достигнут конец текущей истории.
*
* опустимые значения_: неотрицательные числа
*/
2: required i32 limit
}
/* Invoicing service definitions */
struct InvoiceParams {
1: required string product
2: optional string description
3: required base.Timestamp due
4: required domain.Amount amount
5: required domain.CurrencyRef currency
6: required domain.InvoiceContext context
}
struct InvoicePaymentParams {
1: required domain.Payer payer
2: required domain.PaymentTool payment_tool
3: required domain.PaymentSession session
}
exception InvalidUser {}
exception UserInvoiceNotFound {}
exception InvoicePaymentPending { 1: required domain.InvoicePaymentID id }
exception InvoicePaymentNotFound {}
exception EventNotFound {}
exception InvalidInvoiceStatus { 1: required domain.InvoiceStatus status }
service Invoicing {
domain.InvoiceID Create (1: UserInfo user, 2: InvoiceParams params)
throws (1: InvalidUser ex1, 2: base.InvalidRequest ex2)
InvoiceState Get (1: UserInfo user, 2: domain.InvoiceID id)
throws (1: InvalidUser ex1, 2: UserInvoiceNotFound ex2)
Events GetEvents (1: UserInfo user, 2: domain.InvoiceID id, 3: EventRange range)
throws (
1: InvalidUser ex1,
2: UserInvoiceNotFound ex2,
3: EventNotFound ex3,
4: base.InvalidRequest ex4
)
domain.InvoicePaymentID StartPayment (
1: UserInfo user,
2: domain.InvoiceID id,
3: InvoicePaymentParams params
)
throws (
1: InvalidUser ex1,
2: UserInvoiceNotFound ex2,
3: InvalidInvoiceStatus ex3,
4: InvoicePaymentPending ex4,
5: base.InvalidRequest ex5
)
domain.InvoicePayment GetPayment (1: UserInfo user, 2: domain.InvoicePaymentID id)
throws (1: InvalidUser ex1, 2: InvoicePaymentNotFound ex2)
void Fulfill (1: UserInfo user, 2: domain.InvoiceID id, 3: string reason)
throws (1: InvalidUser ex1, 2: UserInvoiceNotFound ex2, 3: InvalidInvoiceStatus ex3)
void Rescind (1: UserInfo user, 2: domain.InvoiceID id, 3: string reason)
throws (1: InvalidUser ex1, 2: UserInvoiceNotFound ex2, 3: InvalidInvoiceStatus ex3)
}
/* Event sink service definitions */
/** Исключение, сигнализирующее о том, что последнего события не существует. */
exception NoLastEvent {}
service EventSink {
/**
* Получить последовательный набор событий из истории системы, от более
* ранних к более поздним, из диапазона, заданного `range`. Результат
* выполнения запроса может содержать от `0` до `range.limit` событий.
*
* Если в `range.after` указан идентификатор неизвестного события, то есть
* события, не наблюдаемого клиентом ранее в известной ему истории,
* бросится исключение `EventNotFound`.
*/
Events GetEvents (1: EventRange range)
throws (1: EventNotFound ex1, 2: base.InvalidRequest ex2)
/**
* Получить идентификатор наиболее позднего известного на момент исполнения
* запроса события.
*/
base.EventID GetLastEventID ()
throws (1: NoLastEvent ex1)
}

View File

@ -0,0 +1,112 @@
include "base.thrift"
include "domain.thrift"
namespace java com.rbkmoney.damsel.proxy_provider
/**
* Непрозрачное для процессинга состояние прокси, связанное с определённой сессией взаимодействия
* с провайдером.
*/
typedef base.Opaque ProxyState
/**
* Требование прокси к процессингу, отражающее дальнейший прогресс сессии взаимодействия
* с провайдером.
*/
union Intent {
1: FinishIntent finish
2: SleepIntent sleep
}
/**
* Требование завершить сессию взаимодействия с провайдером.
*/
struct FinishIntent {
1: required FinishStatus status
}
/**
* Статус, c которым завершилась сессия взаимодействия с провайдером.
*/
union FinishStatus {
/** Успешное завершение взаимодействия. */
1: base.Ok ok
/** Неуспешное завершение взаимодействия с пояснением возникшей проблемы. */
2: base.Error failure
}
/**
* Требование прервать на определённое время сессию взаимодействия, с намерением продолжить
* её потом.
*/
struct SleepIntent {
/** Таймер, определяющий когда следует продолжить взаимодействие. */
1: required base.Timer timer
}
/**
* Данные платежа, необходимые для обращения к провайдеру.
*/
struct PaymentInfo {
1: required domain.Invoice invoice
2: required domain.InvoicePayment payment
3: required domain.ProxyOptions options
4: optional ProxyState state
}
/**
* Результат обращения к прокси в рамках текущей сессии.
*
* В результате обращения прокси может решить, следует ли:
* - завершить сессию взаимодействия с провайдером (FinishIntent);
* - или просто приостановить на определённое время (SleepIntent), обновив своё состояние, которое
* - вернётся к нему в последующем запросе.
*
* Кроме того, прокси может связать с текущим платежом данные транзакции у провайдера для учёта
* в нашей системе, причём на эти данные налагаются следующие требования:
* - данные должны быть связаны на момент завершения первой сессии взаимодействия с провайдером по
* текущему платежу;
* - идентификатор связанной транзакции е может измениться_ при последующих обращениях в прокси
* по текущему платежу.
*/
struct ProcessResult {
1: required Intent intent
2: optional domain.TransactionInfo trx
3: optional ProxyState next_state
}
service ProviderProxy {
/**
* Запрос в рамках сессии проведения платежа, по завершении которой процессинг должен обладать:
* - фактом того, что провайдер _по крайней мере_ авторизовал списание денежных средств в
* пользу системы;
* - данными транзакции провайдера.
*/
ProcessResult ProcessPayment (1: PaymentInfo payment)
throws (1: base.TryLater ex1)
/**
* Запрос в рамках сессии подтверждения платежа, по завершении которой процессинг должен
* быть уверен в том, что провайдер _по крайней мере_ подтвердил финансовые обязательства
* перед системой.
*/
ProcessResult CapturePayment (1: PaymentInfo payment)
throws (1: base.TryLater ex1)
/**
* Запрос в рамках сессии отмены платежа, по завершении которой процессинг должен быть уверен
* в том, что провайдер аннулировал неподтверждённое списание денежных средств.
*
* В случае, если в рамках сессии проведения платежа провайдер авторизовал, но _ещё не
* подтвердил_ списание средств, эта операция является обратной операции `ProcessPayment`.
* В ином случае эта операция неприменима и должна завершится с ошибкой.
*/
ProcessResult CancelPayment (1: PaymentInfo payment)
throws (1: base.TryLater ex1)
// TODO: discuss that shit
// void ValidateOptions (1: domain.ProxyOptions options)
// throws (1: InvalidProxyOptions ex1)
}

View File

@ -0,0 +1,311 @@
/**
* Определения структур и сервисов для поддержания взаимодействия со
* state processor абстракции, реализующей шаг обработки (другими словами,
* один переход состояния) ограниченного конечного автомата со сложным
* состоянием, которое выражается при помощи истории как набора событий,
* порождённых процессором.
*/
include "base.thrift"
namespace java com.rbkmoney.damsel.state_processing
exception EventNotFound {}
exception MachineNotFound {}
exception MachineFailed {}
typedef binary EventBody;
typedef list<EventBody> EventBodies;
/**
* Произвольное событие, продукт перехода в новое состояние.
*/
struct Event {
/**
* Идентификатор события.
* Монотонно возрастающее целочисленное значение, таким образом на множестве
* событий задаётся отношение полного порядка (total order).
*/
1: required base.EventID id;
2: required base.Timestamp created_at; /* Время происхождения события */
3: required EventBody event_payload; /* Описание события */
}
/**
* Сложное состояние, выраженное в виде упорядоченного набора событий
* процессора.
*/
typedef list<Event> History;
/**
* Желаемое действие, продукт перехода в новое состояние.
*
* Возможные действия представляют собой ограниченный язык для управления
* прогрессом автомата, основанием для прихода сигналов или внешних вызовов,
* которые приводят к дальнейшим переходам состояния. Отсутствие заполненных
* полей будет интерпретировано буквально, как отсутствие желаемых действий.
*/
struct ComplexAction {
1: optional SetTimerAction set_timer;
2: optional TagAction tag;
}
/**
* Действие установки таймера ожидания на определённый отрезок времени.
*
* По истечению заданного отрезка времени в процессор поступит сигнал
* `TimeoutSignal`.
*/
struct SetTimerAction {
/** Критерий остановки таймера ожидания */
1: required base.Timer timer;
}
/**
* Действие ассоциации с процессом автомата произвольного значения
*
* После факта успешной ассоциации к автомату можно обратиться с внешним
* вызовом, используя указанное значение, то есть в процессор может поступить
* вызов `processCall` с указанным `tag` в качестве `Reference`.
*
* Это действие в общем случае неидемпотентно, то есть ассоциация связана с
* определённым моментом в истории, и в случае попытки использования одного и
* того же значения ассоциации в разные моменты истории процесса автомата он
* переходит в ошибочное состояние.
*
* То же самое происходит в случае попытки ассоциации одного и того значения с
* двумя или более различными процессами автомата, все они переходят в ошибочное
* состояние, ситуация, требующая ручного вмешательства.
*/
struct TagAction {
/** Значение для ассоциации */
1: required base.Tag tag;
}
/**
* Ссылка, уникально определяющая процесс автомата.
*/
union Reference {
1: base.ID id; /** Основной идентификатор процесса автомата */
2: base.Tag tag; /** Ассоциация */
}
/**
* Внешний вызов.
*
* При помощи вызовов организовано общение автомата с внешним миром и
* получение на них ответов.
*/
typedef binary Call;
/**
* Ответ на внешний вызов.
*/
typedef binary CallResponse;
/**
* Набор данных для обработки внешнего вызова.
*/
struct CallArgs {
1: required Call call; /** Данные вызова */
2: required History history; /** История автомата */
}
/**
* Результат обработки внешнего вызова.
*/
struct CallResult {
/** Список описаний событий, порождённых в результате обработки */
1: required EventBodies events;
/** Действие, которое необходимо выполнить после обработки */
2: required ComplexAction action;
/** Данные ответа */
3: required CallResponse response;
}
/**
* Сигнал, который может поступить в автомат.
*
* Сигналы, как и частный их случай в виде вызовов, приводят к прогрессу
* автомата и эволюции его состояния, то есть нарастанию истории.
*/
union Signal {
1: InitSignal init;
2: TimeoutSignal timeout;
3: RepairSignal repair;
}
/**
* Сигнал, информирующий о запуске автомата.
*/
struct InitSignal {
/** Основной идентификатор процесса автомата */
1: required base.ID id;
/** Набор данных для инициализации */
2: required binary arg;
}
/**
* Сигнал, информирующий об окончании ожидания по таймеру.
*/
struct TimeoutSignal {}
/**
* Сигнал, информирующий о необходимости восстановить работу автомата,
* опционально скорректировать своё состояние.
*/
struct RepairSignal {
1: optional binary arg;
}
/**
* Набор данных для обработки сигнала.
*/
struct SignalArgs {
1: required Signal signal; /** Поступивший сигнал */
2: required History history; /** История автомата */
}
/**
* Результат обработки сигнала.
*/
struct SignalResult {
/** Список описаний событий, порождённых в результате обработки */
1: required EventBodies events;
/** Действие, которое необходимо выполнить после обработки */
2: required ComplexAction action;
}
/**
* Процессор переходов состояния ограниченного конечного автомата.
*
* В результате вызова каждого из методов сервиса должны появиться новое
* состояние и новые действия, приводящие к дальнейшему прогрессу автомата.
*/
service Processor {
/**
* Обработать поступивший сигнал.
*/
SignalResult processSignal (1: SignalArgs a) throws ()
/**
* Обработать внешний вызов и сформировать ответ на него.
*/
CallResult processCall (1: CallArgs a) throws ()
}
/** Универсальная расширяемая структура с набором аргументов. */
struct Args {
/** Неструктурированные данные. */
1: required binary arg;
}
/** Результат запуска процесса автомата. */
struct StartResult {
1: required base.ID id;
}
/**
* Структура задает параметры для выборки событий
*
*/
struct HistoryRange {
/**
* Идентификатор события, после которого следуют события,
* входящие в описываемую выборку. Если поле не указано,
* то в выборку попадут события с самого первого.
*
* Если `after` не указано, в выборку попадут события с начала истории; если
* указано, например, `42`, то в выборку попадут события, случившиеся _после_
* события `42`.
*/
1: optional base.EventID after
/**
* Максимальная длина выборки.
* Допустимо указывать любое значение >= 0.
*
* Если поле не задано, то длина выборки ничем не ограничена.
*
* Если в выборку попало событий еньше_, чем значение `limit`,
* был достигнут конец текущей истории.
*/
2: optional i32 limit
}
/**
* Сервис управления процессами автоматов, отвечающий за реализацию желаемых
* действий и поддержку состояния процессоров.
*
* Для всех методов сервиса справедливы следующие утверждения:
* - если в параметре к методу передан Reference с ссылкой на машину, которой не
* существует, то метод выкинет исключение MachineNotFound
* - если в структуре HistoryRange поле after содержит несуществующий id события,
* то метод выкинет исключение EventNotFound
* - если в процессе выполнения запроса машина перешла в некорректное состояние
* то метод выкинет исключение MachineFailed
*/
service Automaton {
/**
* Запустить новый процесс автомата.
*/
StartResult start (1: Args a) throws ();
/**
* Уничтожить определённый процесс автомата.
*/
void destroy (1: Reference ref)
throws (1: MachineNotFound ex1);
/**
* Попытаться перевести определённый процесс автомата из ошибочного
* состояния в штатное и продолжить его исполнение.
*/
void repair (1: Reference ref, 2: Args a)
throws (1: MachineNotFound ex1, 2: MachineFailed ex2);
/**
* Совершить вызов и дождаться на него ответа.
*/
CallResponse call (1: Reference ref, 2: Call c)
throws (1: MachineNotFound ex1, 2: MachineFailed ex2);
/**
* Метод возвращает список событий (историю) машины ref
*
* Возвращаемый список событий упорядочен по моменту фиксирования его в
* системе: в начале списка располагаются события, произошедшие
* раньше тех, которые располагаются в конце.
*/
History getHistory (1: Reference ref, 2: HistoryRange range)
throws (1: MachineNotFound ex1, 2: EventNotFound ex2);
}
/** Исключение, сигнализирующее о том, что последнего события не существует. */
exception NoLastEvent {}
/**
* Сервис получения истории событий сразу всех машин.
*/
service EventSink {
/**
* Метод возвращает список событий (историю) всех машин системы.
*
* Возвращаемый список событий упорядочен по моменту фиксирования его в
* системе: в начале списка располагаются события, произошедшие
* раньше тех, которые располагаются в конце.
*/
History GetHistory (1: HistoryRange range)
throws (1: EventNotFound ex1, 2: base.InvalidRequest ex2);
/**
* Получить идентификатор наиболее позднего события.
* Если в системе нет ни одного события, то бросится исключение NoLastEvent.
*/
base.EventID GetLastEventID ()
throws (1: NoLastEvent ex1);
}

View File

@ -0,0 +1,15 @@
/*
* Definitions of trivial services serving test purposes only.
*/
include "base.thrift"
namespace java com.rbkmoney.damsel.test
struct Shout {
1: required string contents;
}
service Echo {
Shout echo (1: Shout shout) throws (1: base.Failure ex)
}

View File

@ -0,0 +1,117 @@
package com.rbkmoney.bmclient.polling;
import com.rbkmoney.woody.api.event.ClientEventListener;
import com.rbkmoney.woody.api.event.ServiceEventListener;
import com.rbkmoney.woody.api.generator.IdGenerator;
import com.rbkmoney.woody.thrift.impl.http.THClientBuilder;
import com.rbkmoney.woody.thrift.impl.http.THServiceBuilder;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.TServlet;
import org.apache.thrift.transport.THttpClient;
import org.apache.thrift.transport.TTransportException;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.After;
import org.junit.Before;
import javax.servlet.Servlet;
import java.net.URI;
import java.net.URISyntaxException;
/**
* Created by vpankrashkin on 06.05.16.
*/
public class AbstractTest {
private HandlerCollection handlerCollection;
protected Server server;
protected int serverPort = 8080;
protected TProcessor tProcessor;
@Before
public void startJetty() throws Exception {
server = new Server(serverPort);
HandlerCollection contextHandlerCollection = new HandlerCollection(true); // important! use parameter
// mutableWhenRunning==true
this.handlerCollection = contextHandlerCollection;
server.setHandler(contextHandlerCollection);
server.start();
}
protected void addServlet(Servlet servlet, String mapping) {
try {
ServletContextHandler context = new ServletContextHandler();
ServletHolder defaultServ = new ServletHolder(mapping, servlet);
context.addServlet(defaultServ, mapping);
handlerCollection.addHandler(context);
context.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@After
public void stopJetty() {
try {
server.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
protected String getUrlString() {
return "http://localhost:" + serverPort;
}
public TServlet createTServlet(TProcessor tProcessor) {
return new TServlet(tProcessor, new TBinaryProtocol.Factory());
}
public TServlet createMutableTervlet() {
return new TServlet(new TProcessor() {
@Override
public boolean process(TProtocol in, TProtocol out) throws TException {
return tProcessor.process(in, out);
}
}, new TBinaryProtocol.Factory());
}
protected <T> Servlet createThrftRPCService(Class<T> iface, T handler, ServiceEventListener eventListener) {
THServiceBuilder serviceBuilder = new THServiceBuilder();
serviceBuilder.withEventListener(eventListener);
return serviceBuilder.build(iface, handler);
}
protected String getUrlString(String contextPath) {
return getUrlString() + contextPath;
}
protected <T> T createThriftRPCClient(Class<T> iface, IdGenerator idGenerator, ClientEventListener eventListener) {
return createThriftRPCClient(iface, idGenerator, eventListener, getUrlString());
}
protected <T> T createThriftRPCClient(Class<T> iface, IdGenerator idGenerator, ClientEventListener eventListener, String url) {
try {
THClientBuilder clientBuilder = new THClientBuilder();
clientBuilder.withAddress(new URI(url));
clientBuilder.withHttpClient(HttpClientBuilder.create().build());
clientBuilder.withIdGenerator(idGenerator);
clientBuilder.withEventListener(eventListener);
return clientBuilder.build(iface);
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,103 @@
package com.rbkmoney.bmclient.polling;
import com.rbkmoney.bmclient.EventFilter;
import com.rbkmoney.bmclient.EventHandler;
import com.rbkmoney.damsel.base.InvalidRequest;
import com.rbkmoney.damsel.domain.InvoicePaid;
import com.rbkmoney.damsel.domain.InvoiceStatus;
import com.rbkmoney.damsel.domain.InvoiceUnpaid;
import com.rbkmoney.damsel.event_stock.*;
import com.rbkmoney.damsel.event_stock.EventRange;
import com.rbkmoney.damsel.payment_processing.*;
import com.rbkmoney.thrift.filter.Filter;
import com.rbkmoney.thrift.filter.PathConditionFilter;
import com.rbkmoney.thrift.filter.condition.Relation;
import com.rbkmoney.thrift.filter.rule.PathConditionRule;
import com.rbkmoney.woody.thrift.impl.http.THServiceBuilder;
import org.apache.thrift.TException;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* Created by vpankrashkin on 29.06.16.
*/
public class TestClient extends AbstractTest {
private static final Logger log = LoggerFactory.getLogger(TestClient.class);
@Test
public void test() throws URISyntaxException, InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
addServlet(new THServiceBuilder().build(EventRepositorySrv.Iface.class, new EventRepositorySrv.Iface() {
@Override
public List<StockEvent> getEvents(EventConstraint constraint) throws InvalidRequest, DatasetTooBig, TException {
EventIDRange idRange = constraint.getEventRange().getIdRange();
long fromId = (Long) idRange.getFromId().getFieldValue();
long toId = (Long) idRange.getToId().getFieldValue();
int limit = constraint.getLimit();
if (fromId >= toId) {
return Collections.emptyList();
} else {
List list = new ArrayList();
for (long i = 0; i < limit && i+fromId <= toId; ++i) {
list.add(new StockEvent(SourceEvent.processing_event(createEvent(i+fromId))));
}
return list;
}
}
private Event createEvent(long id) {
Event event = id % 2 == 0 ? new Event(id, "" ,EventSource.invoice(""+id), 0, EventPayload.invoice_event(InvoiceEvent.invoice_status_changed(new InvoiceStatusChanged(InvoiceStatus.paid(new InvoicePaid())))))
:new Event(id, "" ,EventSource.invoice(""+id), 0, EventPayload.invoice_event(InvoiceEvent.invoice_status_changed(new InvoiceStatusChanged(InvoiceStatus.unpaid(new InvoiceUnpaid())))));
return event;
}
@Override
public StockEvent getLastEvent() throws NoLastEvent, TException {
return null;
}
}), "/test");
BMEventPublisherBuilder bmEventPublisherBuilder = new BMEventPublisherBuilder();
bmEventPublisherBuilder.withEventHandler(new EventHandler() {
@Override
public void handleEvent(Object event, String subsKey) {
System.out.println(subsKey+":Handled object: "+event);
}
@Override
public void handleNoMoreElements(String subsKey) {
System.out.println(subsKey+":No more elements");
latch.countDown();
}
});
bmEventPublisherBuilder.withURI(new URI(getUrlString("/test")));
BMPollingEventPublisher eventPublisher = bmEventPublisherBuilder.build();
eventPublisher.subscribe(createEventFilter(0, 10));
latch.await();
}
private EventFilter<StockEvent> createEventFilter(long from, long to) {
EventRange eventRange = new EventRange();
EventIDRange idRange = new EventIDRange();
idRange.setFromId(EventIDBound.inclusive(from));
idRange.setToId(EventIDBound.exclusive(to));
eventRange.setIdRange(idRange);
Filter filter = new PathConditionFilter(new PathConditionRule("payload.invoice_event.invoice_status_changed.status", new com.rbkmoney.thrift.filter.condition.CompareCondition("unpaid", Relation.EQ)));
BMEventFilter bmEventFilter = new BMEventFilter(eventRange, filter);
return bmEventFilter;
}
}

View File

@ -0,0 +1,6 @@
# Configure Jetty for StdErrLog Logging
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StrErrLog
# Overall Logging Level is INFO
org.eclipse.jetty.LEVEL=INFO
# Detail Logging for WebSocket
org.eclipse.jetty.websocket.LEVEL=DEBUG

View File

@ -0,0 +1,5 @@
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p Span[%X{trace_id}-%X{span_id}-%X{parent_id}] - %m%n