Add enabled parameters (#37)

This commit is contained in:
struga 2024-08-23 13:20:38 +03:00 committed by GitHub
parent 9eb7a08fb9
commit e19af782c4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 25 additions and 11 deletions

View File

@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Produced;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.retry.support.RetryTemplate; import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -33,6 +34,11 @@ import java.util.stream.Collectors;
@Slf4j @Slf4j
@Component @Component
@ConditionalOnProperty(
value = "fb.stream.invoiceEnabled",
havingValue = "true",
matchIfMissing = true
)
@RequiredArgsConstructor @RequiredArgsConstructor
public class MgEventSinkInvoiceToFraudStreamFactory implements EventSinkFactory { public class MgEventSinkInvoiceToFraudStreamFactory implements EventSinkFactory {
@ -82,18 +88,18 @@ public class MgEventSinkInvoiceToFraudStreamFactory implements EventSinkFactory
); );
branch[0].mapValues(mgEventWrapper -> branch[0].mapValues(mgEventWrapper ->
retryTemplate.execute(args -> retryTemplate.execute(args ->
paymentMapper.map(mgEventWrapper.getChange(), mgEventWrapper.getEvent()))) paymentMapper.map(mgEventWrapper.getChange(), mgEventWrapper.getEvent())))
.to(paymentTopic, Produced.with(Serdes.String(), paymentSerde)); .to(paymentTopic, Produced.with(Serdes.String(), paymentSerde));
branch[1].mapValues(mgEventWrapper -> branch[1].mapValues(mgEventWrapper ->
retryTemplate.execute(args -> retryTemplate.execute(args ->
chargebackPaymentMapper.map(mgEventWrapper.getChange(), mgEventWrapper.getEvent()))) chargebackPaymentMapper.map(mgEventWrapper.getChange(), mgEventWrapper.getEvent())))
.to(chargebackTopic, Produced.with(Serdes.String(), chargebackSerde)); .to(chargebackTopic, Produced.with(Serdes.String(), chargebackSerde));
branch[2].mapValues(mgEventWrapper -> branch[2].mapValues(mgEventWrapper ->
retryTemplate.execute(args -> retryTemplate.execute(args ->
refundPaymentMapper.map(mgEventWrapper.getChange(), mgEventWrapper.getEvent()))) refundPaymentMapper.map(mgEventWrapper.getChange(), mgEventWrapper.getEvent())))
.to(refundTopic, Produced.with(Serdes.String(), refundSerde)); .to(refundTopic, Produced.with(Serdes.String(), refundSerde));
return new KafkaStreams(builder.build(), mgInvoiceEventStreamProperties); return new KafkaStreams(builder.build(), mgInvoiceEventStreamProperties);

View File

@ -18,6 +18,7 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Produced;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.retry.support.RetryTemplate; import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -26,6 +27,11 @@ import java.util.Properties;
@Slf4j @Slf4j
@Component @Component
@ConditionalOnProperty(
value = "fb.stream.withdrawalEnabled",
havingValue = "true",
matchIfMissing = true
)
@RequiredArgsConstructor @RequiredArgsConstructor
public class MgEventSinkWithdrawalToFraudStreamFactory implements EventSinkFactory { public class MgEventSinkWithdrawalToFraudStreamFactory implements EventSinkFactory {

View File

@ -35,9 +35,9 @@ public class StartupListener implements ApplicationListener<ContextRefreshedEven
private void initKafkaStream(EventSinkFactory eventSinkFactory) { private void initKafkaStream(EventSinkFactory eventSinkFactory) {
KafkaStreams kafkaStreams = eventSinkFactory.create(); KafkaStreams kafkaStreams = eventSinkFactory.create();
kafkaStreams.start(); kafkaStreams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> kafkaStreams.close(Duration.ofSeconds(cleanTimeoutSec)))); Runtime.getRuntime()
.addShutdownHook(new Thread(() -> kafkaStreams.close(Duration.ofSeconds(cleanTimeoutSec))));
eventSinkStreamsPool.put(eventSinkFactory.getType(), kafkaStreams); eventSinkStreamsPool.put(eventSinkFactory.getType(), kafkaStreams);
log.info("StartupListener start stream kafkaStreams: {}", kafkaStreams.allMetadata()); log.info("StartupListener start stream kafkaStreams: {}", kafkaStreams.allMetadata());
} }
} }

View File

@ -73,6 +73,6 @@ kafka:
chargeback: chargeback chargeback: chargeback
withdrawal: withdrawal withdrawal: withdrawal
stream: fb.stream:
withdrawal: withdrawalEnabled: true
debug: false invoiceEnabled: true

View File

@ -8,8 +8,10 @@ import dev.vality.fraudbusters.mg.connector.exception.UnknownResourceException;
import dev.vality.fraudbusters.mg.connector.utils.BuildUtils; import dev.vality.fraudbusters.mg.connector.utils.BuildUtils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;