FB-3: Add dgraph log/pass props. Add kafka backoff (#16)

This commit is contained in:
Baikov Dmitrii 2022-04-22 02:56:52 -07:00 committed by GitHub
parent 43551183e0
commit 32b1bdc3a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 96 additions and 28 deletions

View File

@ -238,7 +238,7 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.15.3</version>
<version>1.17.1</version>
<scope>test</scope>
</dependency>
<dependency>

View File

@ -23,6 +23,12 @@ public class KafkaStreamConfig {
private final KafkaSslProperties kafkaSslProperties;
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;
@Value("${kafka.reconnect-backoff-ms}")
private int reconnectBackoffMs;
@Value("${kafka.reconnect-backoff-max-ms}")
private int reconnectBackoffMaxMs;
@Value("${kafka.retry-backoff-ms}")
private int retryBackoffMs;
@Bean
public Properties rewriteStreamProperties() {
@ -34,6 +40,9 @@ public class KafkaStreamConfig {
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CommandSerde.class);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG, reconnectBackoffMs);
props.put(StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, reconnectBackoffMaxMs);
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);
props.put(
StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class

View File

@ -5,11 +5,11 @@ import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import dev.vality.kafka.common.retry.ConfigurableRetryPolicy;
import dev.vality.damsel.fraudbusters.Chargeback;
import dev.vality.damsel.fraudbusters.FraudPayment;
import dev.vality.damsel.fraudbusters.Refund;
import dev.vality.damsel.fraudbusters.Withdrawal;
import dev.vality.fraudbusters.config.properties.DgraphProperties;
import dev.vality.fraudbusters.constant.DgraphSchemaConstants;
import dev.vality.fraudbusters.converter.PaymentToDgraphPaymentConverter;
import dev.vality.fraudbusters.converter.PaymentToPaymentModelConverter;
@ -17,13 +17,12 @@ import dev.vality.fraudbusters.domain.dgraph.common.*;
import dev.vality.fraudbusters.listener.events.dgraph.*;
import dev.vality.fraudbusters.repository.Repository;
import dev.vality.fraudbusters.stream.impl.FullTemplateVisitorImpl;
import dev.vality.kafka.common.retry.ConfigurableRetryPolicy;
import io.dgraph.DgraphClient;
import io.dgraph.DgraphGrpc;
import io.dgraph.DgraphProto;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@ -70,24 +69,23 @@ public class DgraphConfig {
}
@Bean
public DgraphClient dgraphClient(@Value("${dgraph.host}") String host,
@Value("${dgraph.port}") int port,
@Value("${dgraph.withAuthHeader}") boolean withAuthHeader) {
public DgraphClient dgraphClient(DgraphProperties dgraphProperties) {
log.info("Connecting to the dgraph cluster");
String host = dgraphProperties.getHost();
int port = dgraphProperties.getPort();
log.info("Create dgraph client (host: {}, port: {})", host, port);
DgraphClient dgraphClient = new DgraphClient(createStub(host, port, withAuthHeader));
log.info("Dgraph client was created (host: {}, port: {})", host, port);
dgraphClient.alter(
DgraphProto.Operation.newBuilder()
.setDropAll(true)
.setSchema(DgraphSchemaConstants.SCHEMA)
.build()
);
// duplicate for syntax check
DgraphClient dgraphClient = new DgraphClient(createStub(host, port));
log.info("Dgraph version: {}", dgraphClient.checkVersion());
if (dgraphProperties.isAuth()) {
log.info("Connect to the Dgraph cluster with login and password...");
dgraphClient.login(dgraphProperties.getLogin(), dgraphProperties.getPassword());
}
dgraphClient.alter(
DgraphProto.Operation.newBuilder()
.setSchema(DgraphSchemaConstants.SCHEMA)
.build()
);
log.info("Altering of the schema was completed");
return dgraphClient;
}
@ -145,20 +143,12 @@ public class DgraphConfig {
return new DgraphWithdrawalEventListener(repository, converter);
}
private DgraphGrpc.DgraphStub createStub(String host, int port, boolean withAuthHeader) {
private DgraphGrpc.DgraphStub createStub(String host, int port) {
ManagedChannel channel = ManagedChannelBuilder
.forAddress(host, port)
.usePlaintext()
.build();
DgraphGrpc.DgraphStub stub = DgraphGrpc.newStub(channel);
if (withAuthHeader) {
Metadata metadata = new Metadata();
metadata.put(
Metadata.Key.of("auth-token", Metadata.ASCII_STRING_MARSHALLER), "the-auth-token-value");
stub = MetadataUtils.attachHeaders(stub, metadata);
}
return stub;
return DgraphGrpc.newStub(channel);
}
private static final class RegisterJobFailListener extends RetryListenerSupport {

View File

@ -0,0 +1,18 @@
package dev.vality.fraudbusters.config.properties;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * Connection settings for the Dgraph cluster, bound from the {@code dgraph.*}
 * configuration prefix (e.g. {@code dgraph.host}, {@code dgraph.port}).
 * Lombok {@code @Data} generates accessors, equals/hashCode and toString.
 */
@Data
@Component
@ConfigurationProperties(prefix = "dgraph")
public class DgraphProperties {
    // Dgraph server hostname (e.g. "localhost").
    private String host;
    // Dgraph gRPC port (e.g. 9080).
    private int port;
    // When true, the client performs login(login, password) after connecting.
    private boolean auth;
    // Credentials used only when auth == true.
    private String login;
    private String password;
}

View File

@ -50,6 +50,12 @@ public class ListenersConfigurationService {
private int listenResultConcurrency;
@Value("${kafka.dgraph.topics.payment.concurrency}")
private int dgraphPaymentConcurrency;
@Value("${kafka.reconnect-backoff-ms}")
private int reconnectBackoffMs;
@Value("${kafka.reconnect-backoff-max-ms}")
private int reconnectBackoffMaxMs;
@Value("${kafka.retry-backoff-ms}")
private int retryBackoffMs;
public Map<String, Object> createDefaultProperties(String value) {
final Map<String, Object> props = new HashMap<>();
@ -57,6 +63,9 @@ public class ListenersConfigurationService {
props.put(ConsumerConfig.GROUP_ID_CONFIG, value);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, EARLIEST);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, reconnectBackoffMs);
props.put(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, reconnectBackoffMaxMs);
props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);
props.putAll(SslKafkaUtils.sslConfigure(kafkaSslProperties));
return props;
}

View File

@ -40,6 +40,9 @@ spring:
kafka:
bootstrap.servers: "localhost:29092"
backoff.interval: 1000
reconnect-backoff-ms: 1500
reconnect-backoff-max-ms: 3000
retry-backoff-ms: 1000
historical.listener:
enable: false
reply:
@ -118,7 +121,9 @@ clickhouse.db:
dgraph:
host: localhost
port: 9080
withAuthHeader: false
auth: false
login: "login"
password: "password"
maxAttempts: -1
service:
enabled: false

View File

@ -2,7 +2,6 @@ package dev.vality.fraudbusters.dgraph;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import dev.vality.kafka.common.serialization.ThriftSerializer;
import dev.vality.columbus.ColumbusServiceSrv;
import dev.vality.damsel.wb_list.WbListServiceSrv;
import dev.vality.fraudbusters.FraudBustersApplication;
@ -19,9 +18,13 @@ import dev.vality.fraudbusters.repository.clickhouse.impl.PaymentRepositoryImpl;
import dev.vality.fraudbusters.service.CardPoolManagementService;
import dev.vality.fraudbusters.service.ShopManagementService;
import dev.vality.fraudbusters.util.KeyGenerator;
import dev.vality.kafka.common.serialization.ThriftSerializer;
import dev.vality.trusted.tokens.TrustedTokensSrv;
import io.dgraph.DgraphClient;
import io.dgraph.DgraphGrpc;
import io.dgraph.DgraphProto;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@ -69,6 +72,9 @@ import static org.springframework.boot.test.context.SpringBootTest.WebEnvironmen
properties = {
"kafka.listen.result.concurrency=1",
"dgraph.service.enabled=true",
"kafka.reconnect-backoff-ms=100",
"kafka.reconnect-backoff-max-ms=100",
"kafka.retry-backoff-ms=100",
"kafka.dgraph.topics.payment.enabled=true",
"kafka.dgraph.topics.refund.enabled=true",
"kafka.dgraph.topics.fraud_payment.enabled=true",
@ -119,6 +125,8 @@ public abstract class DgraphAbstractIntegrationTest {
private static GenericContainer dgraphServer;
private static volatile boolean isDgraphStarted;
private static String testHostname = "localhost";
private static final int RETRIES_COUNT = 10;
private static final long TIMEOUT = 5000L;
@DynamicPropertySource
static void connectionConfigs(DynamicPropertyRegistry registry) {
@ -133,6 +141,35 @@ public abstract class DgraphAbstractIntegrationTest {
cleanupBeforeTermination();
isDgraphStarted = true;
}
int count = 0;
while (!clearDb() && count < RETRIES_COUNT) {
count++;
Thread.sleep(TIMEOUT);
}
}
/**
 * Wipes the test Dgraph instance by issuing a drop-all alter operation.
 *
 * @return {@code true} when the drop succeeded, {@code false} when the
 *         cluster rejected the call (caller retries with a delay)
 */
private static boolean clearDb() {
    try {
        DgraphProto.Operation dropAll = DgraphProto.Operation.newBuilder()
                .setDropAll(true)
                .build();
        DgraphClient client = new DgraphClient(createStub(testHostname, 9080));
        client.alter(dropAll);
        return true;
    } catch (RuntimeException ex) {
        // Best-effort cleanup: log and report failure instead of failing the test setup outright.
        log.error("Received error while the service cleaned the database", ex);
        return false;
    }
}
/**
 * Builds a Dgraph gRPC stub over a plaintext channel.
 *
 * @param host Dgraph server hostname
 * @param port Dgraph gRPC port
 * @return an async stub bound to the new channel
 */
private static DgraphGrpc.DgraphStub createStub(String host, int port) {
    // Plaintext (non-TLS) is acceptable here; the target is a local test container.
    ManagedChannel channel =
            ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
    return DgraphGrpc.newStub(channel);
}
private static void cleanupBeforeTermination() {