From aa2e1d59f1c96a06f34e609ab97bba483eaef949 Mon Sep 17 00:00:00 2001 From: "k.struzhkin" Date: Mon, 19 Apr 2021 18:01:10 +0300 Subject: [PATCH] Fix after review --- .../FraudbustersMgConnectorApplication.java | 5 ++++ .../connector/listener/StartupListener.java | 7 ++++- .../listener/StreamStateManager.java | 27 +++++++++++++------ ...raudbustersMgConnectorApplicationTest.java | 6 ++--- 4 files changed, 33 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/rbkmoney/fraudbusters/mg/connector/FraudbustersMgConnectorApplication.java b/src/main/java/com/rbkmoney/fraudbusters/mg/connector/FraudbustersMgConnectorApplication.java index 848d839..a2d7511 100644 --- a/src/main/java/com/rbkmoney/fraudbusters/mg/connector/FraudbustersMgConnectorApplication.java +++ b/src/main/java/com/rbkmoney/fraudbusters/mg/connector/FraudbustersMgConnectorApplication.java @@ -1,5 +1,6 @@ package com.rbkmoney.fraudbusters.mg.connector; +import com.rbkmoney.fraudbusters.mg.connector.listener.StreamStateManager; import com.rbkmoney.fraudbusters.mg.connector.pool.EventSinkStreamsPool; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; @@ -21,12 +22,16 @@ public class FraudbustersMgConnectorApplication extends SpringApplication { @Autowired private EventSinkStreamsPool eventSinkStreamsPool; + @Autowired + private StreamStateManager streamStateManager; + public static void main(String[] args) { SpringApplication.run(FraudbustersMgConnectorApplication.class, args); } @PreDestroy public void preDestroy() { + streamStateManager.stop(); eventSinkStreamsPool.cleanAll(); kafkaListenerEndpointRegistry.stop(); } diff --git a/src/main/java/com/rbkmoney/fraudbusters/mg/connector/listener/StartupListener.java b/src/main/java/com/rbkmoney/fraudbusters/mg/connector/listener/StartupListener.java index f86a51c..3a26b32 100644 --- a/src/main/java/com/rbkmoney/fraudbusters/mg/connector/listener/StartupListener.java +++ b/src/main/java/com/rbkmoney/fraudbusters/mg/connector/listener/StartupListener.java @@ -5,11 +5,13 @@ import com.rbkmoney.fraudbusters.mg.connector.pool.EventSinkStreamsPool; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KafkaStreams; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; +import java.time.Duration; import java.util.List; @Slf4j @@ -20,6 +22,9 @@ public class StartupListener implements ApplicationListener eventSinkFactories; private final EventSinkStreamsPool eventSinkStreamsPool; + @Value("${kafka.stream.clean-timeout-sec}") + private Long cleanTimeoutSec; + @Override public void onApplicationEvent(ContextRefreshedEvent event) { if (!CollectionUtils.isEmpty(eventSinkFactories)) { @@ -30,7 +35,7 @@ public class StartupListener implements ApplicationListener kafkaStreams.close(Duration.ofSeconds(cleanTimeoutSec)))); eventSinkStreamsPool.put(eventSinkFactory.getType(), kafkaStreams); log.info("StartupListener start stream kafkaStreams: {}", kafkaStreams.allMetadata()); } diff --git a/src/main/java/com/rbkmoney/fraudbusters/mg/connector/listener/StreamStateManager.java b/src/main/java/com/rbkmoney/fraudbusters/mg/connector/listener/StreamStateManager.java index 5fbab85..2794602 100644 --- a/src/main/java/com/rbkmoney/fraudbusters/mg/connector/listener/StreamStateManager.java +++ b/src/main/java/com/rbkmoney/fraudbusters/mg/connector/listener/StreamStateManager.java @@ -11,6 +11,7 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; @Slf4j @Component @@ -19,25 +20,35 @@ public class StreamStateManager { public static final int FATAL_ERROR_CODE_IN_STREAM = 228; + private AtomicBoolean isStreamRunning = new AtomicBoolean(true); + private final EventSinkStreamsPool eventSinkStreamsPool; private final ShutdownManager shutdownManager; private final List eventSinkFactories; - @Scheduled(fixedRateString = "${kafka.stream.fixed-rate-timeout-ms}") + @Scheduled(fixedDelayString = "${kafka.stream.fixed-rate-timeout-ms}") public void monitorStateOfStreams() { - try { - eventSinkFactories.forEach(this::createStreamIfShutdown); - } catch (Exception e) { - log.error("Error in monitor shutdown streams. {}", eventSinkStreamsPool, e); - shutdownManager.initiateShutdown(FATAL_ERROR_CODE_IN_STREAM); + if (isStreamRunning.get()) { + try { + eventSinkFactories.forEach(this::createStreamIfShutdown); + } catch (Exception e) { + log.error("Error in monitor shutdown streams. {}", eventSinkStreamsPool, e); + shutdownManager.initiateShutdown(FATAL_ERROR_CODE_IN_STREAM); + } } } + public void stop(){ + isStreamRunning.set(false); + } + private void createStreamIfShutdown(EventSinkFactory eventSinkFactory) { StreamType streamType = eventSinkFactory.getType(); KafkaStreams kafkaStreams = eventSinkStreamsPool.get(streamType); - if (kafkaStreams != null && kafkaStreams.state() == KafkaStreams.State.PENDING_SHUTDOWN) { - eventSinkStreamsPool.put(streamType, eventSinkFactory.create()); + if (kafkaStreams != null && (kafkaStreams.state() == KafkaStreams.State.NOT_RUNNING)) { + KafkaStreams kafkaStreamsNew = eventSinkFactory.create(); + kafkaStreamsNew.start(); + eventSinkStreamsPool.put(streamType, kafkaStreamsNew); log.info("Kafka stream streamType: {} state: {}", streamType, kafkaStreams.state()); } } diff --git a/src/test/java/com/rbkmoney/fraudbusters/mg/connector/FraudbustersMgConnectorApplicationTest.java b/src/test/java/com/rbkmoney/fraudbusters/mg/connector/FraudbustersMgConnectorApplicationTest.java index cf0a596..dd57a59 100644 --- a/src/test/java/com/rbkmoney/fraudbusters/mg/connector/FraudbustersMgConnectorApplicationTest.java +++ b/src/test/java/com/rbkmoney/fraudbusters/mg/connector/FraudbustersMgConnectorApplicationTest.java @@ -44,14 +44,14 @@ import static org.mockito.Mockito.when; @SpringBootTest(classes = FraudbustersMgConnectorApplication.class, properties = { "stream.withdrawal.debug=false", - "kafka.stream.retries-attempts=2", + "kafka.stream.retries-attempts=1", "kafka.stream.retries-backoff-ms=100", - "kafka.stream.fixed-rate-timeout-ms=200" + "kafka.stream.fixed-rate-timeout-ms=100" }) public class FraudbustersMgConnectorApplicationTest extends KafkaAbstractTest { public static final String SOURCE_ID = "source_id"; - public static final long TIMEOUT = 3000L; + public static final long TIMEOUT = 20000L; @MockBean InvoicingSrv.Iface invoicingClient;