NEW-46: recurrent payment tool event sink (#88)

* Added recurrent payment tool event sink support

* Added final

* Added test

* Fix after кумшуц

* Fix after review

* Moved transactional to handler
This commit is contained in:
Inal Arsanukaev 2019-11-26 18:42:53 +03:00 committed by GitHub
parent 7717894a00
commit bd937df5b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 896 additions and 3 deletions

View File

@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>newway</artifactId>
<version>1.0.57-SNAPSHOT</version>
<version>1.0.58-SNAPSHOT</version>
<packaging>jar</packaging>
<name>newway</name>
@ -31,7 +31,7 @@
<db.schema>nw</db.schema>
<local.pg.url>jdbc:postgresql://localhost:5432/newway</local.pg.url>
<local.pg.port>5432</local.pg.port>
<damsel.version>1.340-13882a4</damsel.version>
<damsel.version>1.362-cdb9559</damsel.version>
<fistful-proto.version>1.37-b3ce731</fistful-proto.version>
<sonar.jacoco.reportPath>target/sites/jacoco/jacoco.exec</sonar.jacoco.reportPath>
<sonar.cobertura.reportPath>target/site/cobertura/coverage.xml</sonar.cobertura.reportPath>

View File

@ -1,6 +1,7 @@
package com.rbkmoney.newway.config;
import com.rbkmoney.damsel.domain_config.RepositorySrv;
import com.rbkmoney.damsel.payment_processing.RecurrentPaymentToolEventSinkSrv;
import com.rbkmoney.newway.domain.Nw;
import com.rbkmoney.woody.thrift.impl.http.THSpawnClientBuilder;
import org.jooq.Schema;
@ -19,7 +20,17 @@ public class ApplicationConfig {
@Value("${dmt.networkTimeout}") int networkTimeout) throws IOException {
return new THSpawnClientBuilder()
.withNetworkTimeout(networkTimeout)
.withAddress(resource.getURI()).build(RepositorySrv.Iface.class);
.withAddress(resource.getURI())
.build(RepositorySrv.Iface.class);
}
@Bean
public RecurrentPaymentToolEventSinkSrv.Iface recurrentPaymentToolClient(@Value("${recurrentPaymentTool.url}") Resource resource,
@Value("${recurrentPaymentTool.networkTimeout}") int networkTimeout) throws IOException {
return new THSpawnClientBuilder()
.withNetworkTimeout(networkTimeout)
.withAddress(resource.getURI())
.build(RecurrentPaymentToolEventSinkSrv.Iface.class);
}
@Bean

View File

@ -0,0 +1,17 @@
package com.rbkmoney.newway.dao.recurrent_payment_tool.iface;
import com.rbkmoney.dao.GenericDao;
import com.rbkmoney.newway.domain.tables.pojos.RecurrentPaymentTool;
import com.rbkmoney.newway.exception.DaoException;
public interface RecurrentPaymentToolDao extends GenericDao {
Long getLastEventId() throws DaoException;
Long save(RecurrentPaymentTool source) throws DaoException;
RecurrentPaymentTool get(String recurrentPaymentToolId) throws DaoException;
void updateNotCurrent(Long rptId) throws DaoException;
}

View File

@ -0,0 +1,66 @@
package com.rbkmoney.newway.dao.recurrent_payment_tool.impl;
import com.rbkmoney.dao.impl.AbstractGenericDao;
import com.rbkmoney.mapper.RecordRowMapper;
import com.rbkmoney.newway.dao.recurrent_payment_tool.iface.RecurrentPaymentToolDao;
import com.rbkmoney.newway.domain.tables.pojos.RecurrentPaymentTool;
import com.rbkmoney.newway.domain.tables.records.RecurrentPaymentToolRecord;
import com.rbkmoney.newway.exception.DaoException;
import org.jooq.Query;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.support.GeneratedKeyHolder;
import org.springframework.stereotype.Component;
import static com.rbkmoney.newway.domain.tables.RecurrentPaymentTool.RECURRENT_PAYMENT_TOOL;
import javax.sql.DataSource;
import java.util.Optional;
@Component
public class RecurrentPaymentToolDaoImpl extends AbstractGenericDao implements RecurrentPaymentToolDao {
private final RowMapper<RecurrentPaymentTool> recurrentPaymentToolRowMapper;
public RecurrentPaymentToolDaoImpl(DataSource dataSource) {
super(dataSource);
recurrentPaymentToolRowMapper = new RecordRowMapper<>(RECURRENT_PAYMENT_TOOL, RecurrentPaymentTool.class);
}
@Override
public Long getLastEventId() throws DaoException {
Query query = getDslContext().select(RECURRENT_PAYMENT_TOOL.EVENT_ID.max()).from(RECURRENT_PAYMENT_TOOL);
return fetchOne(query, Long.class);
}
@Override
public Long save(RecurrentPaymentTool source) throws DaoException {
RecurrentPaymentToolRecord record = getDslContext().newRecord(RECURRENT_PAYMENT_TOOL, source);
Query query = getDslContext().insertInto(RECURRENT_PAYMENT_TOOL)
.set(record)
.onConflict(RECURRENT_PAYMENT_TOOL.RECURRENT_PAYMENT_TOOL_ID,
RECURRENT_PAYMENT_TOOL.SEQUENCE_ID,
RECURRENT_PAYMENT_TOOL.CHANGE_ID)
.doNothing()
.returning(RECURRENT_PAYMENT_TOOL.ID);
GeneratedKeyHolder keyHolder = new GeneratedKeyHolder();
execute(query, keyHolder);
return Optional.ofNullable(keyHolder.getKey()).map(Number::longValue).orElse(null);
}
@Override
public RecurrentPaymentTool get(String recurrentPaymentToolId) throws DaoException {
Query query = getDslContext().selectFrom(RECURRENT_PAYMENT_TOOL)
.where(RECURRENT_PAYMENT_TOOL.RECURRENT_PAYMENT_TOOL_ID.eq(recurrentPaymentToolId)
.and(RECURRENT_PAYMENT_TOOL.CURRENT));
return fetchOne(query, recurrentPaymentToolRowMapper);
}
@Override
public void updateNotCurrent(Long id) throws DaoException {
Query query = getDslContext().update(RECURRENT_PAYMENT_TOOL).set(RECURRENT_PAYMENT_TOOL.CURRENT, false)
.where(RECURRENT_PAYMENT_TOOL.ID.eq(id));
executeOne(query);
}
}

View File

@ -0,0 +1,56 @@
package com.rbkmoney.newway.poller.event_stock;
import com.rbkmoney.damsel.payment_processing.EventRange;
import com.rbkmoney.damsel.payment_processing.RecurrentPaymentToolEvent;
import com.rbkmoney.damsel.payment_processing.RecurrentPaymentToolEventSinkSrv;
import com.rbkmoney.newway.service.RecurrentPaymentToolService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.thrift.TException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.DependsOn;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
@Slf4j
@Component
@RequiredArgsConstructor
@DependsOn("flywayInitializer")
public class RecurrentPaymentToolPoller {
private final RecurrentPaymentToolEventSinkSrv.Iface recurrentPaymentToolClient;
private final RecurrentPaymentToolService recurrentPaymentToolService;
@Value("${recurrentPaymentTool.polling.limit}")
private int limit;
private long after;
@PostConstruct
public void afterPropertieSet(){
after = recurrentPaymentToolService.getLastEventId().orElse(0L);
}
@Scheduled(fixedDelayString = "${recurrentPaymentTool.polling.delay}")
public void process() {
try {
List<RecurrentPaymentToolEvent> events = recurrentPaymentToolClient.getEvents(getEventRange());
events.forEach(event -> {
try {
recurrentPaymentToolService.handleEvents(event, event);
after = event.getId();
} catch (RuntimeException ex) {
throw new RuntimeException(String.format("Unexpected error when polling recurrent payment tool eventSink, eventId=%d", after), ex);
}
});
} catch (TException e) {
log.warn("Error to polling recurrent payment tool eventSink, after={}", after, e);
}
}
private EventRange getEventRange() {
return new EventRange().setAfter(after).setLimit(limit);
}
}

View File

@ -0,0 +1,39 @@
package com.rbkmoney.newway.poller.event_stock.impl.recurrent_payment_tool;
import com.rbkmoney.damsel.payment_processing.RecurrentPaymentToolChange;
import com.rbkmoney.damsel.payment_processing.RecurrentPaymentToolEvent;
import com.rbkmoney.geck.common.util.TypeUtil;
import com.rbkmoney.newway.dao.recurrent_payment_tool.iface.RecurrentPaymentToolDao;
import com.rbkmoney.newway.domain.tables.pojos.RecurrentPaymentTool;
import com.rbkmoney.newway.exception.NotFoundException;
import com.rbkmoney.newway.poller.event_stock.Handler;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public abstract class AbstractRecurrentPaymentToolHandler implements Handler<RecurrentPaymentToolChange, RecurrentPaymentToolEvent> {
private final RecurrentPaymentToolDao recurrentPaymentToolDao;
protected RecurrentPaymentTool getRecurrentPaymentToolSource(RecurrentPaymentToolEvent event) {
RecurrentPaymentTool recurrentPaymentTool = recurrentPaymentToolDao.get(event.getSource());
if (recurrentPaymentTool == null) {
throw new NotFoundException(String.format("Recurrent payment tool not found, recurrent_payment_tool_id='%s'", event.getSource()));
}
return recurrentPaymentTool;
}
protected void saveAndUpdateNotCurrent(RecurrentPaymentTool recurrentPaymentTool, Long rptSourceId) {
if (recurrentPaymentToolDao.save(recurrentPaymentTool) != null) {
recurrentPaymentToolDao.updateNotCurrent(rptSourceId);
}
}
protected void setDefaultProperties(RecurrentPaymentTool recurrentPaymentTool, RecurrentPaymentToolEvent event, Integer changeId) {
recurrentPaymentTool.setId(null);
recurrentPaymentTool.setWtime(null);
recurrentPaymentTool.setEventId(event.getId());
recurrentPaymentTool.setChangeId(changeId);
recurrentPaymentTool.setSequenceId(event.getSequence());
recurrentPaymentTool.setEventCreatedAt(TypeUtil.stringToLocalDateTime(event.getCreatedAt()));
}
}

View File

@ -0,0 +1,44 @@
package com.rbkmoney.newway.poller.event_stock.impl.recurrent_payment_tool;
import com.rbkmoney.damsel.payment_processing.RecurrentPaymentToolChange;
import com.rbkmoney.damsel.payment_processing.RecurrentPaymentToolEvent;
import com.rbkmoney.geck.filter.Filter;
import com.rbkmoney.geck.filter.PathConditionFilter;
import com.rbkmoney.geck.filter.condition.IsNullCondition;
import com.rbkmoney.geck.filter.rule.PathConditionRule;
import com.rbkmoney.newway.dao.recurrent_payment_tool.iface.RecurrentPaymentToolDao;
import com.rbkmoney.newway.domain.enums.RecurrentPaymentToolStatus;
import com.rbkmoney.newway.domain.tables.pojos.RecurrentPaymentTool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Component
public class RecurrentPaymentToolHasAbandonedHandler extends AbstractRecurrentPaymentToolHandler {
private final Filter filter;
public RecurrentPaymentToolHasAbandonedHandler(RecurrentPaymentToolDao recurrentPaymentToolDao) {
super(recurrentPaymentToolDao);
this.filter = new PathConditionFilter(
new PathConditionRule("rec_payment_tool_abandoned", new IsNullCondition().not()));
}
@Override
@Transactional
public void handle(RecurrentPaymentToolChange change, RecurrentPaymentToolEvent event, Integer changeId) {
log.info("Start recurrent payment tool abandoned handling, eventId={}, recurrent_payment_tool_id={}", event.getId(), event.getSource());
RecurrentPaymentTool recurrentPaymentTool = getRecurrentPaymentToolSource(event);
Long rptSourceId = recurrentPaymentTool.getId();
setDefaultProperties(recurrentPaymentTool, event, changeId);
recurrentPaymentTool.setStatus(RecurrentPaymentToolStatus.abandoned);
saveAndUpdateNotCurrent(recurrentPaymentTool, rptSourceId);
log.info("End recurrent payment tool abandoned handling, eventId={}, recurrent_payment_tool_id={}", event.getId(), event.getSource());
}
@Override
public Filter<RecurrentPaymentToolChange> getFilter() {
return filter;
}
}

View File

@ -0,0 +1,45 @@
package com.rbkmoney.newway.poller.event_stock.impl.recurrent_payment_tool;
import com.rbkmoney.damsel.payment_processing.RecurrentPaymentToolChange;
import com.rbkmoney.damsel.payment_processing.RecurrentPaymentToolEvent;
import com.rbkmoney.geck.filter.Filter;
import com.rbkmoney.geck.filter.PathConditionFilter;
import com.rbkmoney.geck.filter.condition.IsNullCondition;
import com.rbkmoney.geck.filter.rule.PathConditionRule;
import com.rbkmoney.newway.dao.recurrent_payment_tool.iface.RecurrentPaymentToolDao;
import com.rbkmoney.newway.domain.enums.RecurrentPaymentToolStatus;
import com.rbkmoney.newway.domain.tables.pojos.RecurrentPaymentTool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Component
public class RecurrentPaymentToolHasAcquiredHandler extends AbstractRecurrentPaymentToolHandler {
private final Filter filter;
public RecurrentPaymentToolHasAcquiredHandler(RecurrentPaymentToolDao recurrentPaymentToolDao) {
super(recurrentPaymentToolDao);
this.filter = new PathConditionFilter(
new PathConditionRule("rec_payment_tool_acquired", new IsNullCondition().not()));
}
@Override
@Transactional
public void handle(RecurrentPaymentToolChange change, RecurrentPaymentToolEvent event, Integer changeId) {
log.info("Start recurrent payment tool acquired handling, eventId={}, recurrent_payment_tool_id={}", event.getId(), event.getSource());
RecurrentPaymentTool recurrentPaymentTool = getRecurrentPaymentToolSource(event);
Long rptSourceId = recurrentPaymentTool.getId();
setDefaultProperties(recurrentPaymentTool, event, changeId);
recurrentPaymentTool.setStatus(RecurrentPaymentToolStatus.acquired);
recurrentPaymentTool.setRecToken(change.getRecPaymentToolAcquired().getToken());
saveAndUpdateNotCurrent(recurrentPaymentTool, rptSourceId);
log.info("End recurrent payment tool acquired handling, eventId={}, recurrent_payment_tool_id={}", event.getId(), event.getSource());
}
@Override
public Filter<RecurrentPaymentToolChange> getFilter() {
return filter;
}
}

View File

@ -0,0 +1,153 @@
package com.rbkmoney.newway.poller.event_stock.impl.recurrent_payment_tool;
import com.rbkmoney.damsel.domain.BankCard;
import com.rbkmoney.damsel.domain.DisposablePaymentResource;
import com.rbkmoney.damsel.domain.PaymentTool;
import com.rbkmoney.damsel.payment_processing.RecurrentPaymentToolChange;
import com.rbkmoney.damsel.payment_processing.RecurrentPaymentToolEvent;
import com.rbkmoney.damsel.payment_processing.RecurrentPaymentToolHasCreated;
import com.rbkmoney.geck.common.util.TBaseUtil;
import com.rbkmoney.geck.common.util.TypeUtil;
import com.rbkmoney.geck.filter.Filter;
import com.rbkmoney.geck.filter.PathConditionFilter;
import com.rbkmoney.geck.filter.condition.IsNullCondition;
import com.rbkmoney.geck.filter.rule.PathConditionRule;
import com.rbkmoney.newway.dao.recurrent_payment_tool.iface.RecurrentPaymentToolDao;
import com.rbkmoney.newway.domain.enums.MobileOperatorType;
import com.rbkmoney.newway.domain.enums.PaymentToolType;
import com.rbkmoney.newway.domain.enums.RecurrentPaymentToolStatus;
import com.rbkmoney.newway.domain.tables.pojos.RecurrentPaymentTool;
import com.rbkmoney.newway.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.stream.Collectors;
@Slf4j
@Component
public class RecurrentPaymentToolHasCreatedHandler extends AbstractRecurrentPaymentToolHandler {
private final RecurrentPaymentToolDao recurrentPaymentToolDao;
private final Filter filter;
public RecurrentPaymentToolHasCreatedHandler(RecurrentPaymentToolDao recurrentPaymentToolDao) {
super(recurrentPaymentToolDao);
this.recurrentPaymentToolDao = recurrentPaymentToolDao;
this.filter = new PathConditionFilter(
new PathConditionRule("rec_payment_tool_created", new IsNullCondition().not()));
}
@Override
@Transactional
public void handle(RecurrentPaymentToolChange change, RecurrentPaymentToolEvent event, Integer changeId) {
log.info("Start recurrent payment tool created handling, eventId={}, recurrent_payment_tool_id={}", event.getId(), event.getSource());
RecurrentPaymentToolHasCreated recPaymentToolCreated = change.getRecPaymentToolCreated();
var recurrentPaymentToolOrigin = recPaymentToolCreated.getRecPaymentTool();
RecurrentPaymentTool recurrentPaymentTool = new RecurrentPaymentTool();
setDefaultProperties(recurrentPaymentTool, event, changeId);
recurrentPaymentTool.setRecurrentPaymentToolId(event.getSource());
recurrentPaymentTool.setCreatedAt(TypeUtil.stringToLocalDateTime(recurrentPaymentToolOrigin.getCreatedAt()));
recurrentPaymentTool.setPartyId(recurrentPaymentToolOrigin.getPartyId());
recurrentPaymentTool.setShopId(recurrentPaymentToolOrigin.getShopId());
recurrentPaymentTool.setPartyRevision(recurrentPaymentToolOrigin.getPartyRevision());
recurrentPaymentTool.setDomainRevision(recurrentPaymentToolOrigin.getDomainRevision());
recurrentPaymentTool.setStatus(TBaseUtil.unionFieldToEnum(recurrentPaymentToolOrigin.getStatus(), RecurrentPaymentToolStatus.class));
DisposablePaymentResource paymentResource = recurrentPaymentToolOrigin.getPaymentResource();
fillPaymentTool(recurrentPaymentTool, paymentResource.getPaymentTool());
recurrentPaymentTool.setPaymentSessionId(paymentResource.getPaymentSessionId());
if (paymentResource.isSetClientInfo()) {
recurrentPaymentTool.setClientInfoIpAddress(paymentResource.getClientInfo().getIpAddress());
recurrentPaymentTool.setClientInfoFingerprint(paymentResource.getClientInfo().getFingerprint());
}
recurrentPaymentTool.setRecToken(recurrentPaymentToolOrigin.getRecToken());
if (recurrentPaymentToolOrigin.isSetRoute()) {
recurrentPaymentTool.setRouteProviderId(recurrentPaymentToolOrigin.getRoute().getProvider().getId());
recurrentPaymentTool.setRouteTerminalId(recurrentPaymentToolOrigin.getRoute().getTerminal().getId());
}
if (recPaymentToolCreated.isSetRoute()) {
recurrentPaymentTool.setRouteProviderId(recPaymentToolCreated.getRoute().getProvider().getId());
recurrentPaymentTool.setRouteTerminalId(recPaymentToolCreated.getRoute().getTerminal().getId());
}
if (recurrentPaymentToolOrigin.isSetMinimalPaymentCost()) {
recurrentPaymentTool.setAmount(recurrentPaymentToolOrigin.getMinimalPaymentCost().getAmount());
recurrentPaymentTool.setCurrencyCode(recurrentPaymentToolOrigin.getMinimalPaymentCost().getCurrency().getSymbolicCode());
}
if (recPaymentToolCreated.isSetRiskScore()) {
recurrentPaymentTool.setRiskScore(recPaymentToolCreated.getRiskScore().name());
}
recurrentPaymentToolDao.save(recurrentPaymentTool);
log.info("End recurrent payment tool created handling, eventId={}, recurrent_payment_tool_id={}", event.getId(), event.getSource());
}
private void fillPaymentTool(RecurrentPaymentTool recurrentPaymentTool, PaymentTool paymentTool) {
recurrentPaymentTool.setPaymentToolType(TBaseUtil.unionFieldToEnum(paymentTool, PaymentToolType.class));
if (paymentTool.isSetBankCard()) {
fillBankCard(recurrentPaymentTool, paymentTool);
} else if (paymentTool.isSetPaymentTerminal()) {
fillPaymentTerminal(recurrentPaymentTool, paymentTool);
} else if (paymentTool.isSetDigitalWallet()) {
fillDigitalWallet(recurrentPaymentTool, paymentTool);
} else if (paymentTool.isSetCryptoCurrency()) {
fillCryptoCurrency(recurrentPaymentTool, paymentTool);
} else if (paymentTool.isSetMobileCommerce()) {
fillMobileCommerce(recurrentPaymentTool, paymentTool);
}
}
private void fillMobileCommerce(RecurrentPaymentTool recurrentPaymentTool, PaymentTool paymentTool) {
recurrentPaymentTool.setMobileCommerceOperator(TypeUtil.toEnumField(paymentTool.getMobileCommerce().getOperator().name(),
MobileOperatorType.class));
recurrentPaymentTool.setMobileCommercePhoneCc(paymentTool.getMobileCommerce().getPhone().getCc());
recurrentPaymentTool.setMobileCommercePhoneCtn(paymentTool.getMobileCommerce().getPhone().getCtn());
}
private void fillCryptoCurrency(RecurrentPaymentTool recurrentPaymentTool, PaymentTool paymentTool) {
recurrentPaymentTool.setCryptoCurrency(paymentTool.getCryptoCurrency().toString());
}
private void fillDigitalWallet(RecurrentPaymentTool recurrentPaymentTool, PaymentTool paymentTool) {
recurrentPaymentTool.setDigitalWalletId(paymentTool.getDigitalWallet().getId());
recurrentPaymentTool.setDigitalWalletProvider(paymentTool.getDigitalWallet().getProvider().name());
recurrentPaymentTool.setDigitalWalletToken(paymentTool.getDigitalWallet().getToken());
}
private void fillPaymentTerminal(RecurrentPaymentTool recurrentPaymentTool, PaymentTool paymentTool) {
recurrentPaymentTool.setPaymentTerminalType(paymentTool.getPaymentTerminal().getTerminalType().name());
}
private void fillBankCard(RecurrentPaymentTool recurrentPaymentTool, PaymentTool paymentTool) {
BankCard bankCard = paymentTool.getBankCard();
recurrentPaymentTool.setBankCardToken(bankCard.getToken());
recurrentPaymentTool.setBankCardPaymentSystem(bankCard.getPaymentSystem().name());
recurrentPaymentTool.setBankCardBin(bankCard.getBin());
recurrentPaymentTool.setBankCardMaskedPan(bankCard.getMaskedPan());
if (bankCard.isSetTokenProvider()) {
recurrentPaymentTool.setBankCardTokenProvider(bankCard.getTokenProvider().name());
}
if (bankCard.isSetIssuerCountry()) {
recurrentPaymentTool.setBankCardIssuerCountry(bankCard.getIssuerCountry().name());
}
recurrentPaymentTool.setBankCardBankName(bankCard.getBankName());
if (bankCard.isSetMetadata()) {
recurrentPaymentTool.setBankCardMetadataJson(JsonUtil.objectToJsonString(
bankCard.getMetadata().entrySet().stream().collect(Collectors.toMap(
e -> e.getKey(),
e -> JsonUtil.tBaseToJsonNode(e.getValue())
))));
}
if (bankCard.isSetIsCvvEmpty()) {
recurrentPaymentTool.setBankCardIsCvvEmpty(bankCard.isIsCvvEmpty());
}
if (bankCard.isSetExpDate()) {
recurrentPaymentTool.setBankCardExpDateMonth((int) bankCard.getExpDate().getMonth());
recurrentPaymentTool.setBankCardExpDateYear((int) bankCard.getExpDate().getYear());
}
recurrentPaymentTool.setBankCardCardholderName(bankCard.getCardholderName());
}
@Override
public Filter<RecurrentPaymentToolChange> getFilter() {
return filter;
}
}

View File

@ -0,0 +1,46 @@
package com.rbkmoney.newway.poller.event_stock.impl.recurrent_payment_tool;
import com.rbkmoney.damsel.payment_processing.RecurrentPaymentToolChange;
import com.rbkmoney.damsel.payment_processing.RecurrentPaymentToolEvent;
import com.rbkmoney.geck.filter.Filter;
import com.rbkmoney.geck.filter.PathConditionFilter;
import com.rbkmoney.geck.filter.condition.IsNullCondition;
import com.rbkmoney.geck.filter.rule.PathConditionRule;
import com.rbkmoney.newway.dao.recurrent_payment_tool.iface.RecurrentPaymentToolDao;
import com.rbkmoney.newway.domain.enums.RecurrentPaymentToolStatus;
import com.rbkmoney.newway.domain.tables.pojos.RecurrentPaymentTool;
import com.rbkmoney.newway.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Component
public class RecurrentPaymentToolHasFailedHandler extends AbstractRecurrentPaymentToolHandler {
private final Filter filter;
public RecurrentPaymentToolHasFailedHandler(RecurrentPaymentToolDao recurrentPaymentToolDao) {
super(recurrentPaymentToolDao);
this.filter = new PathConditionFilter(
new PathConditionRule("rec_payment_tool_failed", new IsNullCondition().not()));
}
@Override
@Transactional
public void handle(RecurrentPaymentToolChange change, RecurrentPaymentToolEvent event, Integer changeId) {
log.info("Start recurrent payment tool failed handling, eventId={}, recurrent_payment_tool_id={}", event.getId(), event.getSource());
RecurrentPaymentTool recurrentPaymentTool = getRecurrentPaymentToolSource(event);
Long rptSourceId = recurrentPaymentTool.getId();
setDefaultProperties(recurrentPaymentTool, event, changeId);
recurrentPaymentTool.setStatus(RecurrentPaymentToolStatus.failed);
recurrentPaymentTool.setStatusFailedFailure(JsonUtil.tBaseToJsonString(change.getRecPaymentToolFailed().getFailure()));
saveAndUpdateNotCurrent(recurrentPaymentTool, rptSourceId);
log.info("End recurrent payment tool failed handling, eventId={}, recurrent_payment_tool_id={}", event.getId(), event.getSource());
}
@Override
public Filter<RecurrentPaymentToolChange> getFilter() {
return filter;
}
}

View File

@ -0,0 +1,43 @@
package com.rbkmoney.newway.poller.event_stock.impl.recurrent_payment_tool;
import com.rbkmoney.damsel.payment_processing.RecurrentPaymentToolChange;
import com.rbkmoney.damsel.payment_processing.RecurrentPaymentToolEvent;
import com.rbkmoney.geck.filter.Filter;
import com.rbkmoney.geck.filter.PathConditionFilter;
import com.rbkmoney.geck.filter.condition.IsNullCondition;
import com.rbkmoney.geck.filter.rule.PathConditionRule;
import com.rbkmoney.newway.dao.recurrent_payment_tool.iface.RecurrentPaymentToolDao;
import com.rbkmoney.newway.domain.tables.pojos.RecurrentPaymentTool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Component
public class RecurrentPaymentToolRiskScoreChangedHandler extends AbstractRecurrentPaymentToolHandler {
private final Filter filter;
public RecurrentPaymentToolRiskScoreChangedHandler(RecurrentPaymentToolDao recurrentPaymentToolDao) {
super(recurrentPaymentToolDao);
this.filter = new PathConditionFilter(
new PathConditionRule("rec_payment_tool_risk_score_changed", new IsNullCondition().not()));
}
@Override
@Transactional
public void handle(RecurrentPaymentToolChange change, RecurrentPaymentToolEvent event, Integer changeId) {
log.info("Start recurrent payment tool risk score changed handling, eventId={}, recurrent_payment_tool_id={}", event.getId(), event.getSource());
RecurrentPaymentTool recurrentPaymentTool = getRecurrentPaymentToolSource(event);
Long rptSourceId = recurrentPaymentTool.getId();
setDefaultProperties(recurrentPaymentTool, event, changeId);
recurrentPaymentTool.setRiskScore(change.getRecPaymentToolRiskScoreChanged().getRiskScore().name());
saveAndUpdateNotCurrent(recurrentPaymentTool, rptSourceId);
log.info("End recurrent payment tool risk score changed handling, eventId={}, recurrent_payment_tool_id={}", event.getId(), event.getSource());
}
@Override
public Filter<RecurrentPaymentToolChange> getFilter() {
return filter;
}
}

View File

@ -0,0 +1,44 @@
package com.rbkmoney.newway.poller.event_stock.impl.recurrent_payment_tool;
import com.rbkmoney.damsel.payment_processing.RecurrentPaymentToolChange;
import com.rbkmoney.damsel.payment_processing.RecurrentPaymentToolEvent;
import com.rbkmoney.geck.filter.Filter;
import com.rbkmoney.geck.filter.PathConditionFilter;
import com.rbkmoney.geck.filter.condition.IsNullCondition;
import com.rbkmoney.geck.filter.rule.PathConditionRule;
import com.rbkmoney.newway.dao.recurrent_payment_tool.iface.RecurrentPaymentToolDao;
import com.rbkmoney.newway.domain.tables.pojos.RecurrentPaymentTool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Component
public class RecurrentPaymentToolRouteChangedHandler extends AbstractRecurrentPaymentToolHandler {
private final Filter filter;
public RecurrentPaymentToolRouteChangedHandler(RecurrentPaymentToolDao recurrentPaymentToolDao) {
super(recurrentPaymentToolDao);
this.filter = new PathConditionFilter(
new PathConditionRule("rec_payment_tool_route_changed", new IsNullCondition().not()));
}
@Override
@Transactional
public void handle(RecurrentPaymentToolChange change, RecurrentPaymentToolEvent event, Integer changeId) {
log.info("Start recurrent payment tool route changed handling, eventId={}, recurrent_payment_tool_id={}", event.getId(), event.getSource());
RecurrentPaymentTool recurrentPaymentTool = getRecurrentPaymentToolSource(event);
Long rptSourceId = recurrentPaymentTool.getId();
setDefaultProperties(recurrentPaymentTool, event, changeId);
recurrentPaymentTool.setRouteProviderId(change.getRecPaymentToolRouteChanged().getRoute().getProvider().getId());
recurrentPaymentTool.setRouteTerminalId(change.getRecPaymentToolRouteChanged().getRoute().getTerminal().getId());
saveAndUpdateNotCurrent(recurrentPaymentTool, rptSourceId);
log.info("End recurrent payment tool route changed handling, eventId={}, recurrent_payment_tool_id={}", event.getId(), event.getSource());
}
@Override
public Filter<RecurrentPaymentToolChange> getFilter() {
return filter;
}
}

View File

@ -0,0 +1,51 @@
package com.rbkmoney.newway.poller.event_stock.impl.recurrent_payment_tool;
import com.rbkmoney.damsel.domain.TransactionInfo;
import com.rbkmoney.damsel.payment_processing.RecurrentPaymentToolChange;
import com.rbkmoney.damsel.payment_processing.RecurrentPaymentToolEvent;
import com.rbkmoney.geck.filter.Filter;
import com.rbkmoney.geck.filter.PathConditionFilter;
import com.rbkmoney.geck.filter.condition.IsNullCondition;
import com.rbkmoney.geck.filter.rule.PathConditionRule;
import com.rbkmoney.newway.dao.recurrent_payment_tool.iface.RecurrentPaymentToolDao;
import com.rbkmoney.newway.domain.enums.RecurrentPaymentToolStatus;
import com.rbkmoney.newway.domain.tables.pojos.RecurrentPaymentTool;
import com.rbkmoney.newway.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Component
public class RecurrentPaymentToolSessionChangedTransactionBoundHandler extends AbstractRecurrentPaymentToolHandler {
private final Filter filter;
public RecurrentPaymentToolSessionChangedTransactionBoundHandler(RecurrentPaymentToolDao recurrentPaymentToolDao) {
super(recurrentPaymentToolDao);
this.filter = new PathConditionFilter(
new PathConditionRule("rec_payment_tool_session_changed.payload.session_transaction_bound", new IsNullCondition().not()));
}
@Override
@Transactional
public void handle(RecurrentPaymentToolChange change, RecurrentPaymentToolEvent event, Integer changeId) {
log.info("Start recurrent payment tool session changed transaction bound handling, eventId={}, recurrent_payment_tool_id={}", event.getId(), event.getSource());
RecurrentPaymentTool recurrentPaymentTool = getRecurrentPaymentToolSource(event);
Long rptSourceId = recurrentPaymentTool.getId();
setDefaultProperties(recurrentPaymentTool, event, changeId);
TransactionInfo trx = change.getRecPaymentToolSessionChanged().getPayload().getSessionTransactionBound().getTrx();
recurrentPaymentTool.setSessionPayloadTransactionBoundTrxId(trx.getId());
recurrentPaymentTool.setSessionPayloadTransactionBoundTrxExtraJson(JsonUtil.objectToJsonString(trx.getExtra()));
if (trx.isSetAdditionalInfo()) {
recurrentPaymentTool.setSessionPayloadTransactionBoundTrxAdditionalInfoRrn(trx.getAdditionalInfo().getRrn());
}
saveAndUpdateNotCurrent(recurrentPaymentTool, rptSourceId);
log.info("End recurrent payment tool session changed transaction bound handling, eventId={}, recurrent_payment_tool_id={}", event.getId(), event.getSource());
}
@Override
public Filter<RecurrentPaymentToolChange> getFilter() {
return filter;
}
}

View File

@ -0,0 +1,42 @@
package com.rbkmoney.newway.service;
import com.rbkmoney.damsel.payment_processing.RecurrentPaymentToolEvent;
import com.rbkmoney.newway.dao.recurrent_payment_tool.iface.RecurrentPaymentToolDao;
import com.rbkmoney.newway.poller.event_stock.impl.recurrent_payment_tool.AbstractRecurrentPaymentToolHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@Service
@RequiredArgsConstructor
public class RecurrentPaymentToolService implements EventService<RecurrentPaymentToolEvent, RecurrentPaymentToolEvent>{
private final RecurrentPaymentToolDao recurrentPaymentToolDao;
private final List<AbstractRecurrentPaymentToolHandler> recurrentPaymentToolHandlers;
public Optional<Long> getLastEventId() {
Optional<Long> lastEventId = Optional.ofNullable(recurrentPaymentToolDao.getLastEventId());
log.info("Last recurrent payment tool eventId={}", lastEventId);
return lastEventId;
}
@Override
public void handleEvents(RecurrentPaymentToolEvent event, RecurrentPaymentToolEvent payload) {
AtomicInteger cnt = new AtomicInteger(0);
event.getPayload().forEach(
change -> recurrentPaymentToolHandlers.forEach
(handler -> {
if (handler.accept(change)) {
handler.handle(change, event, cnt.getAndIncrement());
}
}));
}
}

View File

@ -119,6 +119,12 @@ dmt:
delay: 3000
maxQuerySize: 10
enable: true
recurrentPaymentTool:
url: http://hellgate:8022/v1/eventsink/recurrentPaymentTool
networkTimeout: 5000
polling:
delay: 10000
limit: 500
cache:
invoice:

View File

@ -0,0 +1,57 @@
CREATE TYPE nw.recurrent_payment_tool_status AS ENUM('created', 'acquired', 'abandoned', 'failed');
CREATE TABLE nw.recurrent_payment_tool(
id BIGSERIAL NOT NULL,
event_id BIGINT NOT NULL,
sequence_id INT NOT NULL,
change_id INT NOT NULL,
event_created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
recurrent_payment_tool_id CHARACTER VARYING NOT NULL,
created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
party_id CHARACTER VARYING NOT NULL,
shop_id CHARACTER VARYING NOT NULL,
party_revision BIGINT,
domain_revision BIGINT NOT NULL,
status nw.recurrent_payment_tool_status NOT NULL,
status_failed_failure CHARACTER VARYING,
payment_tool_type nw.payment_tool_type NOT NULL,
bank_card_token CHARACTER VARYING,
bank_card_payment_system CHARACTER VARYING,
bank_card_bin CHARACTER VARYING,
bank_card_masked_pan CHARACTER VARYING,
bank_card_token_provider CHARACTER VARYING,
bank_card_issuer_country CHARACTER VARYING,
bank_card_bank_name CHARACTER VARYING,
bank_card_metadata_json CHARACTER VARYING,
bank_card_is_cvv_empty BOOLEAN,
bank_card_exp_date_month INT,
bank_card_exp_date_year INT,
bank_card_cardholder_name CHARACTER VARYING,
payment_terminal_type CHARACTER VARYING,
digital_wallet_provider CHARACTER VARYING,
digital_wallet_id CHARACTER VARYING,
digital_wallet_token CHARACTER VARYING,
crypto_currency CHARACTER VARYING,
mobile_commerce_operator nw.mobile_operator_type,
mobile_commerce_phone_cc CHARACTER VARYING,
mobile_commerce_phone_ctn CHARACTER VARYING,
payment_session_id CHARACTER VARYING,
client_info_ip_address CHARACTER VARYING,
client_info_fingerprint CHARACTER VARYING,
rec_token CHARACTER VARYING,
route_provider_id INT,
route_terminal_id INT,
amount BIGINT NOT NULL,
currency_code CHARACTER VARYING NOT NULL,
risk_score CHARACTER VARYING,
session_payload_transaction_bound_trx_id CHARACTER VARYING,
session_payload_transaction_bound_trx_extra_json CHARACTER VARYING,
session_payload_transaction_bound_trx_additional_info_rrn CHARACTER VARYING,
wtime TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT (now() at time zone 'utc'),
current BOOLEAN NOT NULL DEFAULT TRUE,
CONSTRAINT recurrent_payment_tool_pkey PRIMARY KEY (id)
);
CREATE INDEX recurrent_payment_tool_event_id_idx on nw.recurrent_payment_tool(event_id);
CREATE INDEX recurrent_payment_tool_id_idx on nw.recurrent_payment_tool(recurrent_payment_tool_id);
ALTER TABLE nw.recurrent_payment_tool ADD CONSTRAINT recurrent_payment_tool_uniq UNIQUE (recurrent_payment_tool_id, sequence_id, change_id);

View File

@ -12,6 +12,7 @@ import com.rbkmoney.newway.dao.party.iface.*;
import com.rbkmoney.newway.dao.payout.iface.PayoutDao;
import com.rbkmoney.newway.dao.payout.iface.PayoutSummaryDao;
import com.rbkmoney.newway.dao.rate.iface.RateDao;
import com.rbkmoney.newway.dao.recurrent_payment_tool.iface.RecurrentPaymentToolDao;
import com.rbkmoney.newway.dao.source.iface.SourceDao;
import com.rbkmoney.newway.dao.wallet.iface.WalletDao;
import com.rbkmoney.newway.dao.withdrawal.iface.WithdrawalDao;
@ -124,6 +125,8 @@ public class DaoTests extends AbstractAppDaoTests {
private CashFlowService cashFlowService;
@Autowired
private PaymentIdsGeneratorDaoImpl idsGeneratorDao;
@Autowired
private RecurrentPaymentToolDao recurrentPaymentToolDao;
@Test
public void depositDaoTest() {
@ -669,4 +672,17 @@ public class DaoTests extends AbstractAppDaoTests {
assertEquals(100, list.size());
assertEquals(99,list.get(99) - list.get(0));
}
@Test
public void recurrentPaymentToolDaoTest(){
jdbcTemplate.execute("truncate table nw.recurrent_payment_tool cascade");
RecurrentPaymentTool recurrentPaymentTool = random(RecurrentPaymentTool.class);
recurrentPaymentTool.setCurrent(true);
Long id = recurrentPaymentToolDao.save(recurrentPaymentTool);
recurrentPaymentTool.setId(id);
assertEquals(recurrentPaymentTool, recurrentPaymentToolDao.get(recurrentPaymentTool.getRecurrentPaymentToolId()));
assertEquals(recurrentPaymentTool.getEventId(), recurrentPaymentToolDao.getLastEventId());
recurrentPaymentToolDao.updateNotCurrent(recurrentPaymentTool.getId());
assertNull(recurrentPaymentToolDao.get(recurrentPaymentTool.getRecurrentPaymentToolId()));
}
}

View File

@ -0,0 +1,157 @@
package com.rbkmoney.newway.service;
import com.rbkmoney.damsel.domain.*;
import com.rbkmoney.damsel.msgpack.Value;
import com.rbkmoney.damsel.payment_processing.*;
import com.rbkmoney.newway.dao.AbstractAppDaoTests;
import com.rbkmoney.newway.domain.enums.PaymentToolType;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.*;
public class RecurrentPaymentToolServiceTest extends AbstractAppDaoTests {
@Autowired
private NamedParameterJdbcTemplate jdbcTemplate;
@Autowired
private RecurrentPaymentToolService recurrentPaymentToolService;
@Test
public void handleEventsTest() {
String recurrentId = "recurrentId";
RecurrentPaymentToolEvent event = buildEvent(recurrentId);
recurrentPaymentToolService.handleEvents(event, event);
String sql = "select * from nw.recurrent_payment_tool where recurrent_payment_tool_id = :id";
List<com.rbkmoney.newway.domain.tables.pojos.RecurrentPaymentTool> recurrentPaymentTools =
jdbcTemplate.query(sql, new MapSqlParameterSource("id", recurrentId),
new BeanPropertyRowMapper<>(com.rbkmoney.newway.domain.tables.pojos.RecurrentPaymentTool.class));
assertEquals(7, recurrentPaymentTools.size());
var created = recurrentPaymentTools.get(0);
assertEquals(recurrentId, created.getRecurrentPaymentToolId());
assertEquals(123, created.getEventId().longValue());
assertEquals("shop_id", created.getShopId());
assertEquals(com.rbkmoney.newway.domain.enums.RecurrentPaymentToolStatus.created, created.getStatus());
assertEquals(PaymentToolType.bank_card, created.getPaymentToolType());
assertEquals(123, created.getAmount().longValue());
assertEquals("high", created.getRiskScore());
assertEquals(54, created.getRouteProviderId().intValue());
assertFalse(created.getCurrent());
var riskScoreChanged = recurrentPaymentTools.get(1);
assertEquals("fatal", riskScoreChanged.getRiskScore());
assertNotEquals(created.getWtime(), riskScoreChanged.getWtime());
assertFalse(riskScoreChanged.getCurrent());
var routeChanged = recurrentPaymentTools.get(2);
assertEquals(123, routeChanged.getRouteProviderId().longValue());
assertEquals(456, routeChanged.getRouteTerminalId().longValue());
assertFalse(routeChanged.getCurrent());
var abandoned = recurrentPaymentTools.get(3);
assertEquals(com.rbkmoney.newway.domain.enums.RecurrentPaymentToolStatus.abandoned, abandoned.getStatus());
assertFalse(abandoned.getCurrent());
var acquired = recurrentPaymentTools.get(4);
assertEquals("kek_token", acquired.getRecToken());
assertNotEquals(created.getRecToken(), acquired.getRecToken());
assertFalse(acquired.getCurrent());
var failed = recurrentPaymentTools.get(5);
assertEquals(com.rbkmoney.newway.domain.enums.RecurrentPaymentToolStatus.failed, failed.getStatus());
assertFalse(failed.getCurrent());
var sessionChanged = recurrentPaymentTools.get(6);
assertEquals("trxId", sessionChanged.getSessionPayloadTransactionBoundTrxId());
assertEquals("rrn", sessionChanged.getSessionPayloadTransactionBoundTrxAdditionalInfoRrn());
assertTrue(sessionChanged.getCurrent());
}
private RecurrentPaymentToolEvent buildEvent(String recurrentId) {
return new RecurrentPaymentToolEvent()
.setId(123L)
.setCreatedAt("2016-03-22T06:12:27Z")
.setSource(recurrentId)
.setSequence(12)
.setPayload(List.of(
RecurrentPaymentToolChange.rec_payment_tool_created(
new RecurrentPaymentToolHasCreated()
.setRecPaymentTool(new RecurrentPaymentTool()
.setId(recurrentId)
.setShopId("shop_id")
.setPartyId("party_id")
.setPartyRevision(124)
.setDomainRevision(1245)
.setStatus(RecurrentPaymentToolStatus.created(new RecurrentPaymentToolCreated()))
.setCreatedAt("2016-03-22T06:12:27Z")
.setPaymentResource(new DisposablePaymentResource()
.setPaymentTool(PaymentTool.bank_card(new BankCard()
.setToken("kkekekek_token")
.setPaymentSystem(BankCardPaymentSystem.amex)
.setBin("bin")
.setMaskedPan("masked")
.setTokenProvider(BankCardTokenProvider.applepay)
.setIssuerCountry(Residence.ABH)
.setBankName("bank_name")
.setMetadata(Map.of("kek", Value.b(true)))))
.setPaymentSessionId("kek_session_id")
.setClientInfo(new ClientInfo()
.setIpAddress("127.0.0.1")
.setFingerprint("kekksiki")))
.setRecToken("kek_token_111")
.setRoute(new PaymentRoute()
.setProvider(new ProviderRef(888))
.setTerminal(new TerminalRef(9999)))
.setMinimalPaymentCost(new Cash(123, new CurrencyRef("RUB")))
)
.setRiskScore(RiskScore.high)
.setRoute(new PaymentRoute()
.setProvider(new ProviderRef(54))
.setTerminal(new TerminalRef(9883)))
),
RecurrentPaymentToolChange.rec_payment_tool_risk_score_changed(
new RecurrentPaymentToolRiskScoreChanged()
.setRiskScore(RiskScore.fatal)
),
RecurrentPaymentToolChange.rec_payment_tool_route_changed(
new RecurrentPaymentToolRouteChanged()
.setRoute(new PaymentRoute()
.setProvider(new ProviderRef(123))
.setTerminal(new TerminalRef(456)))
),
RecurrentPaymentToolChange.rec_payment_tool_abandoned(
new RecurrentPaymentToolHasAbandoned()
),
RecurrentPaymentToolChange.rec_payment_tool_acquired(
new RecurrentPaymentToolHasAcquired()
.setToken("kek_token")
),
RecurrentPaymentToolChange.rec_payment_tool_failed(
new RecurrentPaymentToolHasFailed()
.setFailure(OperationFailure.failure(new Failure().setCode("code")))
),
RecurrentPaymentToolChange.rec_payment_tool_session_changed(
new RecurrentPaymentToolSessionChange()
.setPayload(SessionChangePayload.session_transaction_bound(
new SessionTransactionBound()
.setTrx(new TransactionInfo()
.setId("trxId")
.setExtra(Map.of("lol", "kek"))
.setAdditionalInfo(new AdditionalTransactionInfo()
.setRrn("rrn")))
))
)
));
}
}