This commit is contained in:
k.struzhkin 2021-02-16 10:44:38 +03:00
parent e58b21f36f
commit afdbaa2ccc
8 changed files with 135 additions and 21 deletions

View File

@ -155,6 +155,12 @@
<version>1.15.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.0.3</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -68,7 +68,7 @@ public class WithdrawalMapper implements Mapper<TimestampedChange, MachineEvent,
withdrawal.setProviderInfo(WithdrawalModelUtil.initProviderInfo(withdrawalInfo, destinationInfo));
withdrawal.setError(WithdrawalModelUtil.initError(change.getChange().getStatusChanged()));
log.debug("InvoicePaymentMapper withdrawal: {}", withdrawal);
log.debug("Withdrawal map result: {}", withdrawal);
return withdrawal;
}

View File

@ -10,26 +10,25 @@ import com.rbkmoney.fistful.withdrawal.status.Failed;
import com.rbkmoney.geck.serializer.kit.tbase.TBaseProcessor;
import com.rbkmoney.geck.serializer.kit.tbase.TDomainToStringErrorHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.lang.NonNull;
import java.io.IOException;
@Slf4j
public class WithdrawalModelUtil {
@NonNull
public static ProviderInfo initProviderInfo(WithdrawalState withdrawalState, DestinationState destinationState) {
ProviderInfo providerInfo = new ProviderInfo();
if (withdrawalState.isSetRoute()) {
providerInfo.setTerminalId(String.valueOf(withdrawalState.getRoute().getTerminalId()));
providerInfo.setProviderId(String.valueOf(withdrawalState.getRoute().getProviderId()));
if (destinationState.getResource().isSetBankCard()
&& destinationState.getResource().getBankCard().isSetBankCard()
&& destinationState.getResource().getBankCard().getBankCard().isSetIssuerCountry()) {
providerInfo.setCountry(destinationState.getResource().getBankCard().getBankCard().getIssuerCountry().name());
}
return providerInfo;
}
if (destinationState.getResource().isSetBankCard()
&& destinationState.getResource().getBankCard().isSetBankCard()
&& destinationState.getResource().getBankCard().getBankCard().isSetIssuerCountry()) {
providerInfo.setCountry(destinationState.getResource().getBankCard().getBankCard().getIssuerCountry().name());
}
return providerInfo;
return null;
}
public static Error initError(StatusChange statusChange) {

View File

@ -48,7 +48,7 @@ kafka:
topic:
source:
invoicing: mg-event
withdrawal: mg-fistfull
withdrawal: mg-withdrawal
sink:
payment: payment
refund: refund

View File

@ -4,13 +4,18 @@ import com.rbkmoney.damsel.domain.*;
import com.rbkmoney.damsel.fraudbusters.Payment;
import com.rbkmoney.damsel.payment_processing.Invoice;
import com.rbkmoney.damsel.payment_processing.InvoicingSrv;
import com.rbkmoney.fistful.withdrawal.ManagementSrv;
import com.rbkmoney.fistful.withdrawal.WithdrawalState;
import com.rbkmoney.fraudbusters.mg.connector.factory.EventRangeFactory;
import com.rbkmoney.fraudbusters.mg.connector.mapper.impl.WithdrawalBeanUtils;
import com.rbkmoney.fraudbusters.mg.connector.serde.deserializer.ChargebackDeserializer;
import com.rbkmoney.fraudbusters.mg.connector.serde.deserializer.PaymentDeserializer;
import com.rbkmoney.fraudbusters.mg.connector.serde.deserializer.RefundDeserializer;
import com.rbkmoney.fraudbusters.mg.connector.serde.deserializer.WithdrawalDeserializer;
import com.rbkmoney.fraudbusters.mg.connector.service.HgClientService;
import com.rbkmoney.fraudbusters.mg.connector.utils.BuildUtils;
import com.rbkmoney.fraudbusters.mg.connector.utils.MgEventSinkFlowGenerator;
import com.rbkmoney.fraudbusters.mg.connector.utils.WithdrawalFlowGenerator;
import com.rbkmoney.machinegun.eventsink.SinkEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
@ -18,7 +23,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.thrift.TException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.stubbing.OngoingStubbing;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@ -32,6 +36,8 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.when;
@Slf4j
@RunWith(SpringRunner.class)
@ -43,6 +49,12 @@ public class FraudbustersMgConnectorApplicationTest extends KafkaAbstractTest {
@MockBean
InvoicingSrv.Iface invoicingClient;
@MockBean
ManagementSrv.Iface fistfulClient;
@MockBean
com.rbkmoney.fistful.destination.ManagementSrv.Iface destinationClient;
@MockBean
com.rbkmoney.fistful.wallet.ManagementSrv.Iface walletClient;
@Autowired
private EventRangeFactory eventRangeFactory;
@ -51,7 +63,7 @@ public class FraudbustersMgConnectorApplicationTest extends KafkaAbstractTest {
public void contextLoads() throws TException, IOException, InterruptedException {
List<SinkEvent> sinkEvents = MgEventSinkFlowGenerator.generateSuccessFlow(SOURCE_ID);
mockPayment(SOURCE_ID);
sinkEvents.forEach(this::produceMessageToEventSink);
sinkEvents.forEach(sinkEvent -> produceMessageToEventSink(MG_EVENT, sinkEvent));
checkMessageInTopic(PAYMENT, PaymentDeserializer.class, 2);
String sourceID_refund_1 = "sourceID_refund_1";
@ -59,7 +71,7 @@ public class FraudbustersMgConnectorApplicationTest extends KafkaAbstractTest {
mockRefund(sourceID_refund_1, 7, "1");
mockRefund(sourceID_refund_1, 9, "2");
sinkEvents = MgEventSinkFlowGenerator.generateRefundedFlow(sourceID_refund_1);
sinkEvents.forEach(this::produceMessageToEventSink);
sinkEvents.forEach(sinkEvent -> produceMessageToEventSink(MG_EVENT, sinkEvent));
checkMessageInTopic(REFUND, RefundDeserializer.class, 2);
@ -67,17 +79,30 @@ public class FraudbustersMgConnectorApplicationTest extends KafkaAbstractTest {
sinkEvents = MgEventSinkFlowGenerator.generateChargebackFlow(sourceChargeback);
mockPayment(sourceChargeback);
mockChargeback(sourceChargeback, 6, "1");
sinkEvents.forEach(this::produceMessageToEventSink);
sinkEvents.forEach(sinkEvent -> produceMessageToEventSink(MG_EVENT, sinkEvent));
checkMessageInTopic(CHARGEBACK, ChargebackDeserializer.class, 1);
//check exceptions retry
sinkEvents = MgEventSinkFlowGenerator.generateSuccessFlow(SOURCE_ID);
mockPaymentWithException(SOURCE_ID);
sinkEvents.forEach(this::produceMessageToEventSink);
sinkEvents.forEach(sinkEvent -> produceMessageToEventSink(MG_EVENT, sinkEvent));
checkMessageInTopic(PAYMENT, PaymentDeserializer.class, 6);
}
@Test
public void withdrawalStreamTest() throws TException, InterruptedException {
when(fistfulClient.get(any(), any())).thenReturn(new WithdrawalState()
.setBody(WithdrawalBeanUtils.createCash()));
when(destinationClient.get(any(), any())).thenReturn(WithdrawalBeanUtils.createDestinationState());
when(walletClient.get(any(), any())).thenReturn(WithdrawalBeanUtils.createWallet());
List<SinkEvent> sinkEvents = WithdrawalFlowGenerator.generateSuccessFlow(SOURCE_ID);
sinkEvents.forEach(sinkEvent -> produceMessageToEventSink(MG_WITHDRAWAL, sinkEvent));
checkMessageInTopic(WITHDRAWAL, WithdrawalDeserializer.class, 4);
}
private void checkMessageInTopic(String topicName, Class<?> clazz, int size) throws InterruptedException {
Thread.sleep(TIMEOUT);
@ -100,7 +125,7 @@ public class FraudbustersMgConnectorApplicationTest extends KafkaAbstractTest {
}
private void mockPaymentWithException(String sourceId) throws TException, IOException {
Mockito.when(invoicingClient.get(HgClientService.USER_INFO, sourceId, eventRangeFactory.create(4)))
when(invoicingClient.get(HgClientService.USER_INFO, sourceId, eventRangeFactory.create(4)))
.thenThrow(new RuntimeException())
.thenReturn(BuildUtils.buildInvoice(MgEventSinkFlowGenerator.PARTY_ID, MgEventSinkFlowGenerator.SHOP_ID,
sourceId, "1", "1", "1",
@ -109,21 +134,21 @@ public class FraudbustersMgConnectorApplicationTest extends KafkaAbstractTest {
}
private OngoingStubbing<Invoice> mockPayment(String sourceId, int i) throws TException, IOException {
return Mockito.when(invoicingClient.get(HgClientService.USER_INFO, sourceId, eventRangeFactory.create(i)))
return when(invoicingClient.get(HgClientService.USER_INFO, sourceId, eventRangeFactory.create(i)))
.thenReturn(BuildUtils.buildInvoice(MgEventSinkFlowGenerator.PARTY_ID, MgEventSinkFlowGenerator.SHOP_ID,
sourceId, "1", "1", "1",
InvoiceStatus.paid(new InvoicePaid()), InvoicePaymentStatus.processed(new InvoicePaymentProcessed())));
}
private void mockRefund(String sourceId, int sequenceId, String refundId) throws TException, IOException {
Mockito.when(invoicingClient.get(HgClientService.USER_INFO, sourceId, eventRangeFactory.create(sequenceId)))
when(invoicingClient.get(HgClientService.USER_INFO, sourceId, eventRangeFactory.create(sequenceId)))
.thenReturn(BuildUtils.buildInvoice(MgEventSinkFlowGenerator.PARTY_ID, MgEventSinkFlowGenerator.SHOP_ID,
sourceId, "1", refundId, "1",
InvoiceStatus.paid(new InvoicePaid()), InvoicePaymentStatus.refunded(new InvoicePaymentRefunded())));
}
private void mockChargeback(String sourceId, int sequenceId, String chargebackId) throws TException, IOException {
Mockito.when(invoicingClient.get(HgClientService.USER_INFO, sourceId, eventRangeFactory.create(sequenceId)))
when(invoicingClient.get(HgClientService.USER_INFO, sourceId, eventRangeFactory.create(sequenceId)))
.thenReturn(BuildUtils.buildInvoice(MgEventSinkFlowGenerator.PARTY_ID, MgEventSinkFlowGenerator.SHOP_ID,
sourceId, "1", "1", chargebackId,
InvoiceStatus.paid(new InvoicePaid()), InvoicePaymentStatus.charged_back(new InvoicePaymentChargedBack())));

View File

@ -39,9 +39,11 @@ public abstract class KafkaAbstractTest {
public static final String KAFKA_DOCKER_VERSION = "5.0.1";
public static final String MG_EVENT = "mg-event";
public static final String MG_WITHDRAWAL = "mg-withdrawal";
public static final String PAYMENT = "payment";
public static final String REFUND = "refund";
public static final String CHARGEBACK = "chargeback";
public static final String WITHDRAWAL = "withdrawal";
@ClassRule
public static KafkaContainer kafka = new KafkaContainer(KAFKA_DOCKER_VERSION).withEmbeddedZookeeper();
@ -57,6 +59,8 @@ public abstract class KafkaAbstractTest {
initTopic(PAYMENT);
initTopic(REFUND);
initTopic(CHARGEBACK);
initTopic(MG_WITHDRAWAL);
initTopic(WITHDRAWAL);
}
private <T> Consumer<String, T> initTopic(String topicName) {
@ -101,9 +105,9 @@ public abstract class KafkaAbstractTest {
return new KafkaProducer<>(props);
}
void produceMessageToEventSink(SinkEvent sinkEvent) {
void produceMessageToEventSink(String topic, SinkEvent sinkEvent) {
try (Producer<String, SinkEvent> producer = createProducer()) {
ProducerRecord<String, SinkEvent> producerRecord = new ProducerRecord<>(MG_EVENT, sinkEvent.getEvent().getSourceId(), sinkEvent);
ProducerRecord<String, SinkEvent> producerRecord = new ProducerRecord<>(topic, sinkEvent.getEvent().getSourceId(), sinkEvent);
producer.send(producerRecord).get();
log.info("produceMessageToEventSink() sinkEvent: {}", sinkEvent);
} catch (Exception e) {

View File

@ -25,6 +25,7 @@ public class WithdrawalBeanUtils {
.setIssuerCountry(Residence.PAN)
.setPaymentSystem(BankCardPaymentSystem.mastercard)
.setToken("cardToken")
.setMaskedPan("1232132")
.setCardType(CardType.debit)
.setCardholderName("CARD HOLDER")));
return new DestinationState().setResource(resource);

View File

@ -0,0 +1,79 @@
package com.rbkmoney.fraudbusters.mg.connector.utils;
import com.rbkmoney.fistful.base.Failure;
import com.rbkmoney.fistful.withdrawal.Change;
import com.rbkmoney.fistful.withdrawal.StatusChange;
import com.rbkmoney.fistful.withdrawal.TimestampedChange;
import com.rbkmoney.fistful.withdrawal.status.Failed;
import com.rbkmoney.fistful.withdrawal.status.Pending;
import com.rbkmoney.fistful.withdrawal.status.Status;
import com.rbkmoney.fistful.withdrawal.status.Succeeded;
import com.rbkmoney.geck.common.util.TypeUtil;
import com.rbkmoney.kafka.common.serialization.ThriftSerializer;
import com.rbkmoney.machinegun.eventsink.MachineEvent;
import com.rbkmoney.machinegun.eventsink.SinkEvent;
import com.rbkmoney.machinegun.msgpack.Value;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
public class WithdrawalFlowGenerator {
public static final String SOURCE_NS = "SOURCE_NS";
public static List<SinkEvent> generateSuccessFlow(String sourceId) {
final ArrayList<SinkEvent> sinkEvents = new ArrayList<>();
Long sequenceId = 0L;
sinkEvents.add(createSinkEvent(createMachineEvent(createTimestampedChange(createPendingStatus()), sourceId, sequenceId++)));
sinkEvents.add(createSinkEvent(createMachineEvent(createTimestampedChange(createPendingStatus()), sourceId, sequenceId++)));
sinkEvents.add(createSinkEvent(createMachineEvent(createTimestampedChange(createSuccessStatus()), sourceId, sequenceId++)));
sinkEvents.add(createSinkEvent(createMachineEvent(createTimestampedChange(createFailedStatus()), sourceId, sequenceId)));
return sinkEvents;
}
private static Status createFailedStatus() {
return new Status(Status.failed(new Failed()
.setFailure(new Failure().setCode("code")
.setReason("reason"))));
}
private static Status createSuccessStatus() {
return new Status(Status.succeeded(new Succeeded()));
}
private static Status createPendingStatus() {
return new Status(Status.pending(new Pending()));
}
private static TimestampedChange createTimestampedChange(Status status) {
final TimestampedChange timestampedChange = new TimestampedChange();
final Change change = new Change();
timestampedChange.setOccuredAt(Instant.now().toString());
change.setStatusChanged(new StatusChange()
.setStatus(status));
timestampedChange.setChange(change);
return timestampedChange;
}
private static SinkEvent createSinkEvent(MachineEvent machineEvent) {
SinkEvent sinkEvent = new SinkEvent();
sinkEvent.setEvent(machineEvent);
return sinkEvent;
}
private static MachineEvent createMachineEvent(TimestampedChange timestampedChange, String sourceId, Long sequenceId) {
MachineEvent message = new MachineEvent();
message.setCreatedAt(TypeUtil.temporalToString(Instant.now()));
message.setEventId(sequenceId);
message.setSourceNs(SOURCE_NS);
message.setSourceId(sourceId);
ThriftSerializer<TimestampedChange> eventPayloadThriftSerializer = new ThriftSerializer<>();
Value data = new Value();
data.setBin(eventPayloadThriftSerializer.serialize("", timestampedChange));
message.setData(data);
return message;
}
}