Merge pull request #22 from rbkmoney/ft/retry

Fix problem with dead stream threads
This commit is contained in:
Kostya 2021-04-19 19:05:28 +03:00 committed by GitHub
commit 6a1041a3a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 139 additions and 34 deletions

View File

@ -6,7 +6,7 @@
<parent>
<groupId>com.rbkmoney</groupId>
<artifactId>service-parent-pom</artifactId>
<version>1.0.9</version>
<version>1.1.1</version>
</parent>
<artifactId>fraudbusters-mg-connector</artifactId>

View File

@ -1,14 +1,17 @@
package com.rbkmoney.fraudbusters.mg.connector;
import com.rbkmoney.fraudbusters.mg.connector.listener.StreamStateManager;
import com.rbkmoney.fraudbusters.mg.connector.pool.EventSinkStreamsPool;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.scheduling.annotation.EnableScheduling;
import javax.annotation.PreDestroy;
@EnableScheduling
@ServletComponentScan
@SpringBootApplication
public class FraudbustersMgConnectorApplication extends SpringApplication {
@ -19,13 +22,17 @@ public class FraudbustersMgConnectorApplication extends SpringApplication {
@Autowired
private EventSinkStreamsPool eventSinkStreamsPool;
@Autowired
private StreamStateManager streamStateManager;
public static void main(String[] args) {
SpringApplication.run(FraudbustersMgConnectorApplication.class, args);
}
@PreDestroy
public void preDestroy() {
eventSinkStreamsPool.clean();
streamStateManager.stop();
eventSinkStreamsPool.cleanAll();
kafkaListenerEndpointRegistry.stop();
}

View File

@ -2,6 +2,7 @@ package com.rbkmoney.fraudbusters.mg.connector.config;
import com.rbkmoney.fraudbusters.mg.connector.serde.MachineEventSerde;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
@ -22,6 +23,7 @@ public class KafkaConfig {
private static final String APP_ID = "fraud-connector";
private static final String CLIENT_ID = "fraud-connector-client";
private static final String PKCS_12 = "PKCS12";
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;
@Value("${kafka.ssl.server-password}")
@ -42,6 +44,8 @@ public class KafkaConfig {
private int retriesAttempts;
@Value("${kafka.stream.retries-backoff-ms}")
private int retriesBackoffMs;
@Value("${kafka.stream.default-api-timeout-ms}")
private int defaultApiTimeoutMs;
@Bean
public Properties mgInvoiceEventStreamProperties() {
@ -76,6 +80,7 @@ public class KafkaConfig {
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, retriesBackoffMs);
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndFailExceptionHandler.class);
props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, defaultApiTimeoutMs);
}
private Map<String, Object> sslConfigure() {

View File

@ -10,22 +10,22 @@ import org.springframework.retry.support.RetryTemplate;
@Configuration
public class RetryConfig {
@Value("${retry.timeout:1000}")
private long timeout;
@Value("${kafka.stream.retries-attempts}")
private int retriesAttempts;
@Value("${retry.max.attempts:3}")
private int maxAttempts;
@Value("${kafka.stream.retries-backoff-ms}")
private int retriesBackoffMs;
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(timeout);
fixedBackOffPolicy.setBackOffPeriod(retriesBackoffMs);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(maxAttempts);
retryPolicy.setMaxAttempts(retriesAttempts);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;

View File

@ -0,0 +1,7 @@
package com.rbkmoney.fraudbusters.mg.connector.constant;
public enum StreamType {
INVOICE, WITHDRAWAL
}

View File

@ -1,9 +1,12 @@
package com.rbkmoney.fraudbusters.mg.connector.factory;
import com.rbkmoney.fraudbusters.mg.connector.constant.StreamType;
import org.apache.kafka.streams.KafkaStreams;
public interface EventSinkFactory {
StreamType getType();
KafkaStreams create();
}

View File

@ -2,6 +2,7 @@ package com.rbkmoney.fraudbusters.mg.connector.factory;
import com.rbkmoney.damsel.payment_processing.EventPayload;
import com.rbkmoney.damsel.payment_processing.InvoiceChange;
import com.rbkmoney.fraudbusters.mg.connector.constant.StreamType;
import com.rbkmoney.fraudbusters.mg.connector.domain.MgEventWrapper;
import com.rbkmoney.fraudbusters.mg.connector.exception.StreamInitializationException;
import com.rbkmoney.fraudbusters.mg.connector.mapper.impl.ChargebackPaymentMapper;
@ -54,6 +55,11 @@ public class MgEventSinkInvoiceToFraudStreamFactory implements EventSinkFactory
@Value("${kafka.topic.sink.chargeback}")
private String chargebackTopic;
@Override
public StreamType getType() {
return StreamType.INVOICE;
}
@Override
public KafkaStreams create() {
try {

View File

@ -2,6 +2,7 @@ package com.rbkmoney.fraudbusters.mg.connector.factory;
import com.rbkmoney.damsel.fraudbusters.Withdrawal;
import com.rbkmoney.fistful.withdrawal.TimestampedChange;
import com.rbkmoney.fraudbusters.mg.connector.constant.StreamType;
import com.rbkmoney.fraudbusters.mg.connector.exception.StreamInitializationException;
import com.rbkmoney.fraudbusters.mg.connector.mapper.Mapper;
import com.rbkmoney.fraudbusters.mg.connector.parser.EventParser;
@ -39,6 +40,11 @@ public class MgEventSinkWithdrawalToFraudStreamFactory implements EventSinkFacto
@Value("${kafka.topic.sink.withdrawal}")
private String sink;
@Override
public StreamType getType() {
return StreamType.WITHDRAWAL;
}
@Override
public KafkaStreams create() {
try {

View File

@ -2,15 +2,16 @@ package com.rbkmoney.fraudbusters.mg.connector.listener;
import com.rbkmoney.fraudbusters.mg.connector.factory.EventSinkFactory;
import com.rbkmoney.fraudbusters.mg.connector.pool.EventSinkStreamsPool;
import com.rbkmoney.fraudbusters.mg.connector.utils.ShutdownManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KafkaStreams;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.Duration;
import java.util.List;
@Slf4j
@ -18,10 +19,11 @@ import java.util.List;
@RequiredArgsConstructor
public class StartupListener implements ApplicationListener<ContextRefreshedEvent> {
public static final int FATAL_ERROR_CODE_IN_STREAM = 228;
private final List<EventSinkFactory> eventSinkFactories;
private final EventSinkStreamsPool eventSinkStreamsPool;
private final ShutdownManager shutdownManager;
@Value("${kafka.stream.clean-timeout-sec}")
private Long cleanTimeoutSec;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
@ -32,15 +34,10 @@ public class StartupListener implements ApplicationListener<ContextRefreshedEven
private void initKafkaStream(EventSinkFactory eventSinkFactory) {
KafkaStreams kafkaStreams = eventSinkFactory.create();
kafkaStreams.setUncaughtExceptionHandler(this::handleCriticalError);
kafkaStreams.start();
eventSinkStreamsPool.add(kafkaStreams);
Runtime.getRuntime().addShutdownHook(new Thread(() -> kafkaStreams.close(Duration.ofSeconds(cleanTimeoutSec))));
eventSinkStreamsPool.put(eventSinkFactory.getType(), kafkaStreams);
log.info("StartupListener start stream kafkaStreams: {}", kafkaStreams.allMetadata());
}
private void handleCriticalError(Thread t, Throwable e) {
log.error("Unhandled exception in " + t.getName() + ", exiting. {}", eventSinkStreamsPool, e);
shutdownManager.initiateShutdown(FATAL_ERROR_CODE_IN_STREAM);
}
}

View File

@ -0,0 +1,56 @@
package com.rbkmoney.fraudbusters.mg.connector.listener;
import com.rbkmoney.fraudbusters.mg.connector.constant.StreamType;
import com.rbkmoney.fraudbusters.mg.connector.factory.EventSinkFactory;
import com.rbkmoney.fraudbusters.mg.connector.pool.EventSinkStreamsPool;
import com.rbkmoney.fraudbusters.mg.connector.utils.ShutdownManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KafkaStreams;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Component
@RequiredArgsConstructor
public class StreamStateManager {
public static final int FATAL_ERROR_CODE_IN_STREAM = 228;
private AtomicBoolean isStreamRunning = new AtomicBoolean(true);
private final EventSinkStreamsPool eventSinkStreamsPool;
private final ShutdownManager shutdownManager;
private final List<EventSinkFactory> eventSinkFactories;
@Scheduled(fixedDelayString = "${kafka.stream.fixed-rate-timeout-ms}")
public void monitorStateOfStreams() {
if (isStreamRunning.get()) {
try {
eventSinkFactories.forEach(this::createStreamIfShutdown);
} catch (Exception e) {
log.error("Error in monitor shutdown streams. {}", eventSinkStreamsPool, e);
shutdownManager.initiateShutdown(FATAL_ERROR_CODE_IN_STREAM);
}
}
}
public void stop() {
isStreamRunning.set(false);
}
private void createStreamIfShutdown(EventSinkFactory eventSinkFactory) {
StreamType streamType = eventSinkFactory.getType();
KafkaStreams kafkaStreams = eventSinkStreamsPool.get(streamType);
if (kafkaStreams != null && (kafkaStreams.state() == KafkaStreams.State.NOT_RUNNING)) {
KafkaStreams kafkaStreamsNew = eventSinkFactory.create();
kafkaStreamsNew.start();
eventSinkStreamsPool.put(streamType, kafkaStreamsNew);
log.info("Kafka stream streamType: {} state: {}", streamType, kafkaStreams.state());
}
}
}

View File

@ -1,26 +1,34 @@
package com.rbkmoney.fraudbusters.mg.connector.pool;
import com.rbkmoney.fraudbusters.mg.connector.constant.StreamType;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KafkaStreams;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
public class EventSinkStreamsPool {
private final List<KafkaStreams> kafkaStreamsList = Collections.synchronizedList(new ArrayList<>());
@Value("${kafka.stream.clean-timeout-sec}")
private Long cleanTimeoutSec;
public void add(KafkaStreams kafkaStreams) {
kafkaStreamsList.add(kafkaStreams);
private final Map<StreamType, KafkaStreams> kafkaStreamsList = new ConcurrentHashMap<>();
public void put(StreamType type, KafkaStreams kafkaStreams) {
kafkaStreamsList.put(type, kafkaStreams);
}
public void clean() {
kafkaStreamsList.forEach(kafkaStreams -> {
kafkaStreams.close(Duration.ofSeconds(5L));
});
public KafkaStreams get(StreamType type) {
return kafkaStreamsList.get(type);
}
public void cleanAll() {
kafkaStreamsList.forEach((key, value) -> value.close(Duration.ofSeconds(cleanTimeoutSec)));
}
}

View File

@ -4,10 +4,12 @@ package com.rbkmoney.fraudbusters.mg.connector.service;
import com.rbkmoney.damsel.payment_processing.*;
import com.rbkmoney.fraudbusters.mg.connector.domain.InvoicePaymentWrapper;
import com.rbkmoney.fraudbusters.mg.connector.exception.PaymentInfoNotFoundException;
import com.rbkmoney.fraudbusters.mg.connector.exception.PaymentInfoRequestException;
import com.rbkmoney.fraudbusters.mg.connector.factory.EventRangeFactory;
import com.rbkmoney.woody.api.flow.error.WUnavailableResultException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
import org.springframework.stereotype.Service;
@ -59,10 +61,10 @@ public class HgClientService {
throw new PaymentInfoNotFoundException("Not found payment in invoice!");
});
return invoicePaymentWrapper;
} catch (TException e) {
} catch (WUnavailableResultException | TException e) {
log.error("Error when HgClientService getInvoiceInfo invoiceId: {} eventId: {} sequenceId: {} e: ",
invoiceId, eventId, sequenceId, e);
throw new PaymentInfoRequestException(e);
throw new RetriableException(e);
}
}
}

View File

@ -55,6 +55,9 @@ kafka:
stream:
retries-attempts: 300
retries-backoff-ms: 1000
default-api-timeout-ms: 300000
clean-timeout-sec: 20
fixed-rate-timeout-ms: 60000
ssl:
enable: false
keystore-location: src/main/resources/cert/kenny-k.struzhkin.p12

View File

@ -41,11 +41,17 @@ import static org.mockito.Mockito.when;
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = FraudbustersMgConnectorApplication.class, properties = {"stream.withdrawal.debug=false"})
@SpringBootTest(classes = FraudbustersMgConnectorApplication.class,
properties = {
"stream.withdrawal.debug=false",
"kafka.stream.retries-attempts=1",
"kafka.stream.retries-backoff-ms=100",
"kafka.stream.fixed-rate-timeout-ms=100"
})
public class FraudbustersMgConnectorApplicationTest extends KafkaAbstractTest {
public static final String SOURCE_ID = "source_id";
public static final long TIMEOUT = 2000L;
public static final long TIMEOUT = 20000L;
@MockBean
InvoicingSrv.Iface invoicingClient;
@ -142,7 +148,6 @@ public class FraudbustersMgConnectorApplicationTest extends KafkaAbstractTest {
mockPayment(sourceId, 5);
}
private void mockRefund(String sourceId, int sequenceId, String refundId) throws TException, IOException {
when(invoicingClient.get(HgClientService.USER_INFO, sourceId, eventRangeFactory.create(sequenceId)))
.thenReturn(BuildUtils.buildInvoice(MgEventSinkFlowGenerator.PARTY_ID, MgEventSinkFlowGenerator.SHOP_ID,