diff --git a/src/main/java/com/rbkmoney/fraudbusters/mg/connector/FraudbustersMgConnectorApplication.java b/src/main/java/com/rbkmoney/fraudbusters/mg/connector/FraudbustersMgConnectorApplication.java index 1d4b6f8..0c4b3b9 100644 --- a/src/main/java/com/rbkmoney/fraudbusters/mg/connector/FraudbustersMgConnectorApplication.java +++ b/src/main/java/com/rbkmoney/fraudbusters/mg/connector/FraudbustersMgConnectorApplication.java @@ -6,9 +6,11 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.web.servlet.ServletComponentScan; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.scheduling.annotation.EnableScheduling; import javax.annotation.PreDestroy; +@EnableScheduling @ServletComponentScan @SpringBootApplication public class FraudbustersMgConnectorApplication extends SpringApplication { diff --git a/src/main/java/com/rbkmoney/fraudbusters/mg/connector/config/KafkaConfig.java b/src/main/java/com/rbkmoney/fraudbusters/mg/connector/config/KafkaConfig.java index 1be0150..1c5b957 100644 --- a/src/main/java/com/rbkmoney/fraudbusters/mg/connector/config/KafkaConfig.java +++ b/src/main/java/com/rbkmoney/fraudbusters/mg/connector/config/KafkaConfig.java @@ -2,6 +2,7 @@ package com.rbkmoney.fraudbusters.mg.connector.config; import com.rbkmoney.fraudbusters.mg.connector.serde.MachineEventSerde; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; @@ -22,6 +23,7 @@ public class KafkaConfig { private static final String APP_ID = "fraud-connector"; private static final String CLIENT_ID = "fraud-connector-client"; private static final String PKCS_12 = "PKCS12"; + @Value("${kafka.bootstrap.servers}") private String bootstrapServers; @Value("${kafka.ssl.server-password}") @@ -42,6 +44,8 @@ public class KafkaConfig { private int retriesAttempts; @Value("${kafka.stream.retries-backoff-ms}") private int retriesBackoffMs; + @Value("${kafka.stream.default-api-timeout-ms}") + private int defaultApiTimeoutMs; @Bean public Properties mgInvoiceEventStreamProperties() { @@ -76,6 +80,7 @@ public class KafkaConfig { props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, retriesBackoffMs); props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndFailExceptionHandler.class); + props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, defaultApiTimeoutMs); } private Map sslConfigure() { diff --git a/src/main/java/com/rbkmoney/fraudbusters/mg/connector/config/RetryConfig.java b/src/main/java/com/rbkmoney/fraudbusters/mg/connector/config/RetryConfig.java index 5574289..1f0ff43 100644 --- a/src/main/java/com/rbkmoney/fraudbusters/mg/connector/config/RetryConfig.java +++ b/src/main/java/com/rbkmoney/fraudbusters/mg/connector/config/RetryConfig.java @@ -10,22 +10,22 @@ import org.springframework.retry.support.RetryTemplate; @Configuration public class RetryConfig { - @Value("${retry.timeout:1000}") - private long timeout; + @Value("${kafka.stream.retries-attempts}") + private int retriesAttempts; - @Value("${retry.max.attempts:3}") - private int maxAttempts; + @Value("${kafka.stream.retries-backoff-ms}") + private int retriesBackoffMs; @Bean public RetryTemplate retryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy(); - fixedBackOffPolicy.setBackOffPeriod(timeout); + fixedBackOffPolicy.setBackOffPeriod(retriesBackoffMs); retryTemplate.setBackOffPolicy(fixedBackOffPolicy); SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(); - retryPolicy.setMaxAttempts(maxAttempts); + retryPolicy.setMaxAttempts(retriesAttempts); retryTemplate.setRetryPolicy(retryPolicy); return retryTemplate; diff --git a/src/main/java/com/rbkmoney/fraudbusters/mg/connector/factory/MgEventSinkInvoiceToFraudStreamFactory.java b/src/main/java/com/rbkmoney/fraudbusters/mg/connector/factory/MgEventSinkInvoiceToFraudStreamFactory.java index a2e0571..0d7b474 100644 --- a/src/main/java/com/rbkmoney/fraudbusters/mg/connector/factory/MgEventSinkInvoiceToFraudStreamFactory.java +++ b/src/main/java/com/rbkmoney/fraudbusters/mg/connector/factory/MgEventSinkInvoiceToFraudStreamFactory.java @@ -77,7 +77,10 @@ public class MgEventSinkInvoiceToFraudStreamFactory implements EventSinkFactory branch[0].mapValues(mgEventWrapper -> retryTemplate.execute(args -> - paymentMapper.map(mgEventWrapper.getChange(), mgEventWrapper.getEvent()))) + { + log.info("Rerty"); + return paymentMapper.map(mgEventWrapper.getChange(), mgEventWrapper.getEvent()); + })) .to(paymentTopic, Produced.with(Serdes.String(), paymentSerde)); branch[1].mapValues(mgEventWrapper -> diff --git a/src/main/java/com/rbkmoney/fraudbusters/mg/connector/listener/StartupListener.java b/src/main/java/com/rbkmoney/fraudbusters/mg/connector/listener/StartupListener.java index 8fb79a1..530bc28 100644 --- a/src/main/java/com/rbkmoney/fraudbusters/mg/connector/listener/StartupListener.java +++ b/src/main/java/com/rbkmoney/fraudbusters/mg/connector/listener/StartupListener.java @@ -2,7 +2,6 @@ package com.rbkmoney.fraudbusters.mg.connector.listener; import com.rbkmoney.fraudbusters.mg.connector.factory.EventSinkFactory; import com.rbkmoney.fraudbusters.mg.connector.pool.EventSinkStreamsPool; -import com.rbkmoney.fraudbusters.mg.connector.utils.ShutdownManager; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KafkaStreams; @@ -18,10 +17,8 @@ import java.util.List; @RequiredArgsConstructor public class StartupListener implements ApplicationListener { - public static final int FATAL_ERROR_CODE_IN_STREAM = 228; private final List eventSinkFactories; private final EventSinkStreamsPool eventSinkStreamsPool; - private final ShutdownManager shutdownManager; @Override public void onApplicationEvent(ContextRefreshedEvent event) { @@ -40,7 +37,7 @@ public class StartupListener implements ApplicationListener kafkaStreams.state() == KafkaStreams.State.PENDING_SHUTDOWN) + .forEach(KafkaStreams::start); + } + } + public void clean() { - kafkaStreamsList.forEach(kafkaStreams -> { - kafkaStreams.close(Duration.ofSeconds(5L)); - }); + kafkaStreamsList.forEach(kafkaStreams -> kafkaStreams.close(Duration.ofSeconds(5L))); } } diff --git a/src/main/java/com/rbkmoney/fraudbusters/mg/connector/service/HgClientService.java b/src/main/java/com/rbkmoney/fraudbusters/mg/connector/service/HgClientService.java index 1539b24..c2a0d84 100644 --- a/src/main/java/com/rbkmoney/fraudbusters/mg/connector/service/HgClientService.java +++ b/src/main/java/com/rbkmoney/fraudbusters/mg/connector/service/HgClientService.java @@ -4,10 +4,12 @@ package com.rbkmoney.fraudbusters.mg.connector.service; import com.rbkmoney.damsel.payment_processing.*; import com.rbkmoney.fraudbusters.mg.connector.domain.InvoicePaymentWrapper; import com.rbkmoney.fraudbusters.mg.connector.exception.PaymentInfoNotFoundException; -import com.rbkmoney.fraudbusters.mg.connector.exception.PaymentInfoRequestException; import com.rbkmoney.fraudbusters.mg.connector.factory.EventRangeFactory; +import com.rbkmoney.woody.api.flow.error.WUnavailableResultException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.connect.errors.RetriableException; +import org.apache.thrift.TApplicationException; import org.apache.thrift.TException; import org.springframework.stereotype.Service; @@ -59,10 +61,10 @@ public class HgClientService { throw new PaymentInfoNotFoundException("Not found payment in invoice!"); }); return invoicePaymentWrapper; - } catch (TException e) { + } catch (WUnavailableResultException | TException e) { log.error("Error when HgClientService getInvoiceInfo invoiceId: {} eventId: {} sequenceId: {} e: ", invoiceId, eventId, sequenceId, e); - throw new PaymentInfoRequestException(e); + throw new RetriableException(e); } } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 18baadd..cda6324 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -55,6 +55,7 @@ kafka: stream: retries-attempts: 300 retries-backoff-ms: 1000 + default-api-timeout-ms: 300000 ssl: enable: false keystore-location: src/main/resources/cert/kenny-k.struzhkin.p12 diff --git a/src/test/java/com/rbkmoney/fraudbusters/mg/connector/FraudbustersMgConnectorApplicationTest.java b/src/test/java/com/rbkmoney/fraudbusters/mg/connector/FraudbustersMgConnectorApplicationTest.java index 94433de..3860d4a 100644 --- a/src/test/java/com/rbkmoney/fraudbusters/mg/connector/FraudbustersMgConnectorApplicationTest.java +++ b/src/test/java/com/rbkmoney/fraudbusters/mg/connector/FraudbustersMgConnectorApplicationTest.java @@ -20,6 +20,7 @@ import com.rbkmoney.machinegun.eventsink.SinkEvent; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.thrift.TApplicationException; import org.apache.thrift.TException; import org.junit.Test; import org.junit.runner.RunWith; @@ -142,7 +143,6 @@ public class FraudbustersMgConnectorApplicationTest extends KafkaAbstractTest { mockPayment(sourceId, 5); } - private void mockRefund(String sourceId, int sequenceId, String refundId) throws TException, IOException { when(invoicingClient.get(HgClientService.USER_INFO, sourceId, eventRangeFactory.create(sequenceId))) .thenReturn(BuildUtils.buildInvoice(MgEventSinkFlowGenerator.PARTY_ID, MgEventSinkFlowGenerator.SHOP_ID,