Fix after review

This commit is contained in:
k.struzhkin 2021-04-19 18:01:10 +03:00
parent 38acbe3b8a
commit aa2e1d59f1
4 changed files with 33 additions and 12 deletions

View File

@ -1,5 +1,6 @@
package com.rbkmoney.fraudbusters.mg.connector; package com.rbkmoney.fraudbusters.mg.connector;
import com.rbkmoney.fraudbusters.mg.connector.listener.StreamStateManager;
import com.rbkmoney.fraudbusters.mg.connector.pool.EventSinkStreamsPool; import com.rbkmoney.fraudbusters.mg.connector.pool.EventSinkStreamsPool;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
@ -21,12 +22,16 @@ public class FraudbustersMgConnectorApplication extends SpringApplication {
@Autowired @Autowired
private EventSinkStreamsPool eventSinkStreamsPool; private EventSinkStreamsPool eventSinkStreamsPool;
@Autowired
private StreamStateManager streamStateManager;
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(FraudbustersMgConnectorApplication.class, args); SpringApplication.run(FraudbustersMgConnectorApplication.class, args);
} }
@PreDestroy @PreDestroy
public void preDestroy() { public void preDestroy() {
streamStateManager.stop();
eventSinkStreamsPool.cleanAll(); eventSinkStreamsPool.cleanAll();
kafkaListenerEndpointRegistry.stop(); kafkaListenerEndpointRegistry.stop();
} }

View File

@ -5,11 +5,13 @@ import com.rbkmoney.fraudbusters.mg.connector.pool.EventSinkStreamsPool;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener; import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.time.Duration;
import java.util.List; import java.util.List;
@Slf4j @Slf4j
@ -20,6 +22,9 @@ public class StartupListener implements ApplicationListener<ContextRefreshedEven
private final List<EventSinkFactory> eventSinkFactories; private final List<EventSinkFactory> eventSinkFactories;
private final EventSinkStreamsPool eventSinkStreamsPool; private final EventSinkStreamsPool eventSinkStreamsPool;
@Value("${kafka.stream.clean-timeout-sec}")
private Long cleanTimeoutSec;
@Override @Override
public void onApplicationEvent(ContextRefreshedEvent event) { public void onApplicationEvent(ContextRefreshedEvent event) {
if (!CollectionUtils.isEmpty(eventSinkFactories)) { if (!CollectionUtils.isEmpty(eventSinkFactories)) {
@ -30,7 +35,7 @@ public class StartupListener implements ApplicationListener<ContextRefreshedEven
private void initKafkaStream(EventSinkFactory eventSinkFactory) { private void initKafkaStream(EventSinkFactory eventSinkFactory) {
KafkaStreams kafkaStreams = eventSinkFactory.create(); KafkaStreams kafkaStreams = eventSinkFactory.create();
kafkaStreams.start(); kafkaStreams.start();
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close)); Runtime.getRuntime().addShutdownHook(new Thread(() -> kafkaStreams.close(Duration.ofSeconds(cleanTimeoutSec))));
eventSinkStreamsPool.put(eventSinkFactory.getType(), kafkaStreams); eventSinkStreamsPool.put(eventSinkFactory.getType(), kafkaStreams);
log.info("StartupListener start stream kafkaStreams: {}", kafkaStreams.allMetadata()); log.info("StartupListener start stream kafkaStreams: {}", kafkaStreams.allMetadata());
} }

View File

@ -11,6 +11,7 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j @Slf4j
@Component @Component
@ -19,25 +20,35 @@ public class StreamStateManager {
public static final int FATAL_ERROR_CODE_IN_STREAM = 228; public static final int FATAL_ERROR_CODE_IN_STREAM = 228;
private AtomicBoolean isStreamRunning = new AtomicBoolean(true);
private final EventSinkStreamsPool eventSinkStreamsPool; private final EventSinkStreamsPool eventSinkStreamsPool;
private final ShutdownManager shutdownManager; private final ShutdownManager shutdownManager;
private final List<EventSinkFactory> eventSinkFactories; private final List<EventSinkFactory> eventSinkFactories;
@Scheduled(fixedRateString = "${kafka.stream.fixed-rate-timeout-ms}") @Scheduled(fixedDelayString = "${kafka.stream.fixed-rate-timeout-ms}")
public void monitorStateOfStreams() { public void monitorStateOfStreams() {
try { if (isStreamRunning.get()) {
eventSinkFactories.forEach(this::createStreamIfShutdown); try {
} catch (Exception e) { eventSinkFactories.forEach(this::createStreamIfShutdown);
log.error("Error in monitor shutdown streams. {}", eventSinkStreamsPool, e); } catch (Exception e) {
shutdownManager.initiateShutdown(FATAL_ERROR_CODE_IN_STREAM); log.error("Error in monitor shutdown streams. {}", eventSinkStreamsPool, e);
shutdownManager.initiateShutdown(FATAL_ERROR_CODE_IN_STREAM);
}
} }
} }
public void stop(){
isStreamRunning.set(false);
}
private void createStreamIfShutdown(EventSinkFactory eventSinkFactory) { private void createStreamIfShutdown(EventSinkFactory eventSinkFactory) {
StreamType streamType = eventSinkFactory.getType(); StreamType streamType = eventSinkFactory.getType();
KafkaStreams kafkaStreams = eventSinkStreamsPool.get(streamType); KafkaStreams kafkaStreams = eventSinkStreamsPool.get(streamType);
if (kafkaStreams != null && kafkaStreams.state() == KafkaStreams.State.PENDING_SHUTDOWN) { if (kafkaStreams != null && (kafkaStreams.state() == KafkaStreams.State.NOT_RUNNING)) {
eventSinkStreamsPool.put(streamType, eventSinkFactory.create()); KafkaStreams kafkaStreamsNew = eventSinkFactory.create();
kafkaStreamsNew.start();
eventSinkStreamsPool.put(streamType, kafkaStreamsNew);
log.info("Kafka stream streamType: {} state: {}", streamType, kafkaStreams.state()); log.info("Kafka stream streamType: {} state: {}", streamType, kafkaStreams.state());
} }
} }

View File

@ -44,14 +44,14 @@ import static org.mockito.Mockito.when;
@SpringBootTest(classes = FraudbustersMgConnectorApplication.class, @SpringBootTest(classes = FraudbustersMgConnectorApplication.class,
properties = { properties = {
"stream.withdrawal.debug=false", "stream.withdrawal.debug=false",
"kafka.stream.retries-attempts=2", "kafka.stream.retries-attempts=1",
"kafka.stream.retries-backoff-ms=100", "kafka.stream.retries-backoff-ms=100",
"kafka.stream.fixed-rate-timeout-ms=200" "kafka.stream.fixed-rate-timeout-ms=100"
}) })
public class FraudbustersMgConnectorApplicationTest extends KafkaAbstractTest { public class FraudbustersMgConnectorApplicationTest extends KafkaAbstractTest {
public static final String SOURCE_ID = "source_id"; public static final String SOURCE_ID = "source_id";
public static final long TIMEOUT = 3000L; public static final long TIMEOUT = 20000L;
@MockBean @MockBean
InvoicingSrv.Iface invoicingClient; InvoicingSrv.Iface invoicingClient;