Fix problem with dead stream trheads

This commit is contained in:
k.struzhkin 2021-04-16 11:44:34 +03:00
parent cb902dfa13
commit 1a4c1a3522
10 changed files with 65 additions and 18 deletions

View File

@ -6,9 +6,11 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.scheduling.annotation.EnableScheduling;
import javax.annotation.PreDestroy;
@EnableScheduling
@ServletComponentScan
@SpringBootApplication
public class FraudbustersMgConnectorApplication extends SpringApplication {

View File

@ -2,6 +2,7 @@ package com.rbkmoney.fraudbusters.mg.connector.config;
import com.rbkmoney.fraudbusters.mg.connector.serde.MachineEventSerde;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
@ -22,6 +23,7 @@ public class KafkaConfig {
private static final String APP_ID = "fraud-connector";
private static final String CLIENT_ID = "fraud-connector-client";
private static final String PKCS_12 = "PKCS12";
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;
@Value("${kafka.ssl.server-password}")
@ -42,6 +44,8 @@ public class KafkaConfig {
private int retriesAttempts;
@Value("${kafka.stream.retries-backoff-ms}")
private int retriesBackoffMs;
@Value("${kafka.stream.default-api-timeout-ms}")
private int defaultApiTimeoutMs;
@Bean
public Properties mgInvoiceEventStreamProperties() {
@ -76,6 +80,7 @@ public class KafkaConfig {
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, retriesBackoffMs);
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndFailExceptionHandler.class);
props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, defaultApiTimeoutMs);
}
private Map<String, Object> sslConfigure() {

View File

@ -10,22 +10,22 @@ import org.springframework.retry.support.RetryTemplate;
@Configuration
public class RetryConfig {
@Value("${retry.timeout:1000}")
private long timeout;
@Value("${kafka.stream.retries-attempts}")
private int retriesAttempts;
@Value("${retry.max.attempts:3}")
private int maxAttempts;
@Value("${kafka.stream.retries-backoff-ms}")
private int retriesBackoffMs;
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(timeout);
fixedBackOffPolicy.setBackOffPeriod(retriesBackoffMs);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(maxAttempts);
retryPolicy.setMaxAttempts(retriesAttempts);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;

View File

@ -77,7 +77,10 @@ public class MgEventSinkInvoiceToFraudStreamFactory implements EventSinkFactory
branch[0].mapValues(mgEventWrapper ->
retryTemplate.execute(args ->
paymentMapper.map(mgEventWrapper.getChange(), mgEventWrapper.getEvent())))
{
log.info("Rerty");
return paymentMapper.map(mgEventWrapper.getChange(), mgEventWrapper.getEvent());
}))
.to(paymentTopic, Produced.with(Serdes.String(), paymentSerde));
branch[1].mapValues(mgEventWrapper ->

View File

@ -2,7 +2,6 @@ package com.rbkmoney.fraudbusters.mg.connector.listener;
import com.rbkmoney.fraudbusters.mg.connector.factory.EventSinkFactory;
import com.rbkmoney.fraudbusters.mg.connector.pool.EventSinkStreamsPool;
import com.rbkmoney.fraudbusters.mg.connector.utils.ShutdownManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KafkaStreams;
@ -18,10 +17,8 @@ import java.util.List;
@RequiredArgsConstructor
public class StartupListener implements ApplicationListener<ContextRefreshedEvent> {
public static final int FATAL_ERROR_CODE_IN_STREAM = 228;
private final List<EventSinkFactory> eventSinkFactories;
private final EventSinkStreamsPool eventSinkStreamsPool;
private final ShutdownManager shutdownManager;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
@ -40,7 +37,7 @@ public class StartupListener implements ApplicationListener<ContextRefreshedEven
private void handleCriticalError(Thread t, Throwable e) {
log.error("Unhandled exception in " + t.getName() + ", exiting. {}", eventSinkStreamsPool, e);
shutdownManager.initiateShutdown(FATAL_ERROR_CODE_IN_STREAM);
eventSinkStreamsPool.clean();
}
}

View File

@ -0,0 +1,30 @@
package com.rbkmoney.fraudbusters.mg.connector.listener;
import com.rbkmoney.fraudbusters.mg.connector.pool.EventSinkStreamsPool;
import com.rbkmoney.fraudbusters.mg.connector.utils.ShutdownManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class StreamStateManager {
public static final int FATAL_ERROR_CODE_IN_STREAM = 228;
private final EventSinkStreamsPool eventSinkStreamsPool;
private final ShutdownManager shutdownManager;
@Scheduled(fixedRate = 60000)
public void monitorStateOfStreams() {
try {
eventSinkStreamsPool.restartAllIfShutdown();
} catch (Exception e) {
log.error("Error in monitor shutdown streams. {}", eventSinkStreamsPool, e);
shutdownManager.initiateShutdown(FATAL_ERROR_CODE_IN_STREAM);
}
}
}

View File

@ -2,6 +2,7 @@ package com.rbkmoney.fraudbusters.mg.connector.pool;
import org.apache.kafka.streams.KafkaStreams;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.Duration;
import java.util.ArrayList;
@ -17,10 +18,16 @@ public class EventSinkStreamsPool {
kafkaStreamsList.add(kafkaStreams);
}
public void restartAllIfShutdown() {
if (!CollectionUtils.isEmpty(kafkaStreamsList)) {
kafkaStreamsList.stream()
.filter(kafkaStreams -> kafkaStreams.state() == KafkaStreams.State.PENDING_SHUTDOWN)
.forEach(KafkaStreams::start);
}
}
public void clean() {
kafkaStreamsList.forEach(kafkaStreams -> {
kafkaStreams.close(Duration.ofSeconds(5L));
});
kafkaStreamsList.forEach(kafkaStreams -> kafkaStreams.close(Duration.ofSeconds(5L)));
}
}

View File

@ -4,10 +4,12 @@ package com.rbkmoney.fraudbusters.mg.connector.service;
import com.rbkmoney.damsel.payment_processing.*;
import com.rbkmoney.fraudbusters.mg.connector.domain.InvoicePaymentWrapper;
import com.rbkmoney.fraudbusters.mg.connector.exception.PaymentInfoNotFoundException;
import com.rbkmoney.fraudbusters.mg.connector.exception.PaymentInfoRequestException;
import com.rbkmoney.fraudbusters.mg.connector.factory.EventRangeFactory;
import com.rbkmoney.woody.api.flow.error.WUnavailableResultException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
import org.springframework.stereotype.Service;
@ -59,10 +61,10 @@ public class HgClientService {
throw new PaymentInfoNotFoundException("Not found payment in invoice!");
});
return invoicePaymentWrapper;
} catch (TException e) {
} catch (WUnavailableResultException | TException e) {
log.error("Error when HgClientService getInvoiceInfo invoiceId: {} eventId: {} sequenceId: {} e: ",
invoiceId, eventId, sequenceId, e);
throw new PaymentInfoRequestException(e);
throw new RetriableException(e);
}
}
}

View File

@ -55,6 +55,7 @@ kafka:
stream:
retries-attempts: 300
retries-backoff-ms: 1000
default-api-timeout-ms: 300000
ssl:
enable: false
keystore-location: src/main/resources/cert/kenny-k.struzhkin.p12

View File

@ -20,6 +20,7 @@ import com.rbkmoney.machinegun.eventsink.SinkEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -142,7 +143,6 @@ public class FraudbustersMgConnectorApplicationTest extends KafkaAbstractTest {
mockPayment(sourceId, 5);
}
private void mockRefund(String sourceId, int sequenceId, String refundId) throws TException, IOException {
when(invoicingClient.get(HgClientService.USER_INFO, sourceId, eventRangeFactory.create(sequenceId)))
.thenReturn(BuildUtils.buildInvoice(MgEventSinkFlowGenerator.PARTY_ID, MgEventSinkFlowGenerator.SHOP_ID,