Alternative Kafka configuration example

This commit is contained in:
Egor Cherniak 2022-07-13 15:28:31 +03:00
parent d82e22faf5
commit 82ad21f212
No known key found for this signature in database
GPG Key ID: 26F47333B7BE4ED9
8 changed files with 45 additions and 142 deletions

View File

@ -7,7 +7,7 @@
<parent>
<groupId>dev.vality</groupId>
<artifactId>service-parent-pom</artifactId>
<version>1.0.16</version>
<version>1.0.18</version>
</parent>
<name>fraudbusters</name>
@ -89,7 +89,6 @@
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
<scope>provided</scope>
</dependency>

View File

@ -1,17 +1,16 @@
package dev.vality.fraudbusters.config;
import dev.vality.fraudbusters.config.properties.KafkaSslProperties;
import dev.vality.fraudbusters.serde.CommandSerde;
import dev.vality.fraudbusters.service.ConsumerGroupIdService;
import dev.vality.fraudbusters.util.SslKafkaUtils;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Map;
import java.util.Properties;
@Configuration
@ -20,35 +19,22 @@ public class KafkaStreamConfig {
public static final String SENDER = "sender";
private final ConsumerGroupIdService consumerGroupIdService;
private final KafkaSslProperties kafkaSslProperties;
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;
@Value("${kafka.reconnect-backoff-ms}")
private int reconnectBackoffMs;
@Value("${kafka.reconnect-backoff-max-ms}")
private int reconnectBackoffMaxMs;
@Value("${kafka.retry-backoff-ms}")
private int retryBackoffMs;
@Bean
public Properties rewriteStreamProperties() {
final Properties props = new Properties();
public Properties rewriteStreamProperties(KafkaProperties kafkaProperties) {
final Map<String, Object> props = kafkaProperties.buildStreamsProperties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, consumerGroupIdService.generateGroupId(SENDER));
props.put(StreamsConfig.CLIENT_ID_CONFIG, consumerGroupIdService.generateGroupId(SENDER));
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CommandSerde.class);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG, reconnectBackoffMs);
props.put(StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, reconnectBackoffMaxMs);
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);
props.put(
StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class
);
props.putAll(SslKafkaUtils.sslConfigure(kafkaSslProperties));
return props;
var properties = new Properties();
properties.putAll(props);
return properties;
}
}

View File

@ -8,6 +8,7 @@ import dev.vality.fraudbusters.domain.FraudResult;
import dev.vality.fraudbusters.serde.*;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

View File

@ -1,22 +0,0 @@
package dev.vality.fraudbusters.config.properties;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Getter
@Setter
@Component
@ConfigurationProperties(prefix = "kafka.ssl")
public class KafkaSslProperties {
private String serverPassword;
private String serverKeystoreLocation;
private String keystorePassword;
private String keyPassword;
private String keystoreLocation;
private boolean enable;
}

View File

@ -2,45 +2,35 @@ package dev.vality.fraudbusters.config.service;
import dev.vality.kafka.common.serialization.ThriftSerializer;
import dev.vality.damsel.fraudbusters.ReferenceInfo;
import dev.vality.fraudbusters.config.properties.KafkaSslProperties;
import dev.vality.fraudbusters.util.SslKafkaUtils;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@Service
@RequiredArgsConstructor
public class KafkaTemplateConfigurationService {
private final KafkaSslProperties kafkaSslProperties;
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;
private final KafkaProperties kafkaProperties;
public Map<String, Object> producerJsonConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
Map<String, Object> props = kafkaProperties.buildProducerProperties();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.putAll(SslKafkaUtils.sslConfigure(kafkaSslProperties));
return props;
}
public Map<String, Object> producerThriftConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
Map<String, Object> props = kafkaProperties.buildProducerProperties();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ThriftSerializer.class);
props.putAll(SslKafkaUtils.sslConfigure(kafkaSslProperties));
return props;
}

View File

@ -2,15 +2,14 @@ package dev.vality.fraudbusters.config.service;
import dev.vality.kafka.common.exception.handler.SeekToCurrentWithSleepBatchErrorHandler;
import dev.vality.damsel.fraudbusters.Command;
import dev.vality.fraudbusters.config.properties.KafkaSslProperties;
import dev.vality.fraudbusters.serde.CommandDeserializer;
import dev.vality.fraudbusters.service.ConsumerGroupIdService;
import dev.vality.fraudbusters.util.SslKafkaUtils;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@ -23,7 +22,6 @@ import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@Service
@ -33,10 +31,8 @@ public class ListenersConfigurationService {
public static final long THROTTLING_TIMEOUT = 500L;
public static final int MAX_WAIT_FETCH_MS = 7000;
private static final String EARLIEST = "earliest";
private final ConsumerGroupIdService consumerGroupIdService;
private final KafkaSslProperties kafkaSslProperties;
private final KafkaProperties kafkaProperties;
@Value("${kafka.max.poll.records}")
private String maxPollRecords;
@ -44,31 +40,10 @@ public class ListenersConfigurationService {
private int maxRetryAttempts;
@Value("${kafka.backoff.interval}")
private int backoffInterval;
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;
@Value("${kafka.listen.result.concurrency}")
private int listenResultConcurrency;
@Value("${kafka.dgraph.topics.payment.concurrency}")
private int dgraphPaymentConcurrency;
@Value("${kafka.reconnect-backoff-ms}")
private int reconnectBackoffMs;
@Value("${kafka.reconnect-backoff-max-ms}")
private int reconnectBackoffMaxMs;
@Value("${kafka.retry-backoff-ms}")
private int retryBackoffMs;
public Map<String, Object> createDefaultProperties(String value) {
final Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, value);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, EARLIEST);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, reconnectBackoffMs);
props.put(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, reconnectBackoffMaxMs);
props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);
props.putAll(SslKafkaUtils.sslConfigure(kafkaSslProperties));
return props;
}
public ConcurrentKafkaListenerContainerFactory<String, Command> createDefaultFactory(
ConsumerFactory<String, Command> stringCommandConsumerFactory) {
@ -108,7 +83,8 @@ public class ListenersConfigurationService {
Deserializer<T> deserializer,
String groupId) {
String consumerGroup = consumerGroupIdService.generateGroupId(groupId);
final Map<String, Object> props = createDefaultProperties(consumerGroup);
final Map<String, Object> props = kafkaProperties.buildConsumerProperties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
return createFactoryWithProps(deserializer, props);
}
@ -117,7 +93,8 @@ public class ListenersConfigurationService {
String groupId,
Integer fetchMinBytes) {
String consumerGroup = consumerGroupIdService.generateGroupId(groupId);
final Map<String, Object> props = createDefaultProperties(consumerGroup);
final Map<String, Object> props = kafkaProperties.buildConsumerProperties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, fetchMinBytes);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, MAX_WAIT_FETCH_MS);
return createFactoryWithProps(deserializer, props);
@ -128,7 +105,8 @@ public class ListenersConfigurationService {
String groupId,
Integer fetchMinBytes) {
String consumerGroup = consumerGroupIdService.generateGroupId(groupId);
final Map<String, Object> props = createDefaultProperties(consumerGroup);
final Map<String, Object> props = kafkaProperties.buildConsumerProperties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, fetchMinBytes);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, MAX_WAIT_FETCH_MS);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
@ -157,7 +135,8 @@ public class ListenersConfigurationService {
public ConsumerFactory<String, Command> createDefaultConsumerFactory(String groupListReferenceGroupId) {
String value = consumerGroupIdService.generateRandomGroupId(groupListReferenceGroupId);
final Map<String, Object> props = createDefaultProperties(value);
final Map<String, Object> props = kafkaProperties.buildConsumerProperties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, value);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new CommandDeserializer());
}

View File

@ -1,37 +0,0 @@
package dev.vality.fraudbusters.util;
import dev.vality.fraudbusters.config.properties.KafkaSslProperties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
public class SslKafkaUtils {
private static final String PKCS_12 = "PKCS12";
private static final String SSL = "SSL";
public static Map<String, Object> sslConfigure(KafkaSslProperties kafkaSslProperties) {
Map<String, Object> configProps = new HashMap<>();
if (kafkaSslProperties.isEnable()) {
configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SSL);
configProps.put(
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
new File(kafkaSslProperties.getServerKeystoreLocation()).getAbsolutePath()
);
configProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaSslProperties.getServerPassword());
configProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, PKCS_12);
configProps.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, PKCS_12);
configProps.put(
SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
new File(kafkaSslProperties.getKeystoreLocation()).getAbsolutePath()
);
configProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, kafkaSslProperties.getKeystorePassword());
configProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaSslProperties.getKeyPassword());
}
return configProps;
}
}

View File

@ -27,6 +27,27 @@ management:
exposure:
include: health,info,prometheus
spring:
kafka:
bootstrap.servers: "localhost:29092"
properties:
backoff.interval: 1000
reconnect.backoff.ms: 1500
reconnect.backoff.max.ms: 3000
retry.backoff.ms: 1000
consumer:
auto-offset-reset: earliest
enable-auto-commit: true
streams:
state-dir: tmp/state-store/
properties:
commit.interval.ms: 10000
cache.max.bytes.buffering: 0
ssl:
keystore-location: src/main/resources/cert/kenny-k.struzhkin.p12
keystore-password: kenny
key-password: kenny
trust-store-password: kenny12
trust-store-location: src/main/resources/cert/truststore.p12
application:
name: '@name@'
output:
@ -38,11 +59,6 @@ spring:
spec: maximumSize=500,expireAfterAccess=100s
kafka:
bootstrap.servers: "localhost:29092"
backoff.interval: 1000
reconnect-backoff-ms: 1500
reconnect-backoff-max-ms: 3000
retry-backoff-ms: 1000
historical.listener:
enable: false
reply:
@ -97,15 +113,6 @@ kafka:
name: withdrawal
concurrency: 1
enabled: false
ssl:
enable: false
keystore-location: src/main/resources/cert/kenny-k.struzhkin.p12
keystore-password: kenny
key-password: kenny
server-password: kenny12
server-keystore-location: src/main/resources/cert/truststore.p12
state:
dir: tmp/state-store/
aggr.payment.min.bytes: 50000
clickhouse.db: