new kafka config + iam support

This commit is contained in:
Egor Cherniak 2022-07-20 18:12:14 +03:00
parent 38cbd4a691
commit 9a988b35d0
No known key found for this signature in database
GPG Key ID: 26F47333B7BE4ED9
5 changed files with 35 additions and 105 deletions

View File

@@ -6,7 +6,7 @@
<parent>
<groupId>dev.vality</groupId>
<artifactId>service-parent-pom</artifactId>
<version>1.0.9</version>
<version>1.0.18</version>
</parent>
<artifactId>deanonimus</artifactId>
@@ -136,6 +136,10 @@
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.12.1</version>
</dependency>
<dependency>
<groupId>software.amazon.msk</groupId>
<artifactId>aws-msk-iam-auth</artifactId>
</dependency>
<!--test-->
<dependency>

View File

@@ -1,102 +1,57 @@
package dev.vality.deanonimus.config;
import dev.vality.deanonimus.config.properties.KafkaSslProperties;
import dev.vality.deanonimus.kafka.serde.SinkEventDeserializer;
import dev.vality.kafka.common.exception.handler.SeekToCurrentWithSleepBatchErrorHandler;
import dev.vality.kafka.common.exception.handler.ExponentialBackOffDefaultErrorHandler;
import dev.vality.machinegun.eventsink.MachineEvent;
import org.apache.kafka.clients.CommonClientConfigs;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.util.backoff.ExponentialBackOff;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
@RequiredArgsConstructor
@Configuration
@EnableConfigurationProperties(KafkaSslProperties.class)
public class KafkaConfig {
@Value("${kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${kafka.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Value("${kafka.consumer.group-id}")
private String groupId;
private final KafkaProperties kafkaProperties;
@Value("${kafka.topics.party-management.consumer.group-id}")
private String partyConsumerGroup;
@Value("${kafka.client-id}")
private String clientId;
@Value("${kafka.consumer.max-poll-records}")
private int maxPollRecords;
@Value("${kafka.consumer.max-poll-interval-ms}")
private int maxPollIntervalMs;
@Value("${kafka.consumer.session-timeout-ms}")
private int sessionTimeoutMs;
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.consumer.party-management.concurrency}")
private int partyConcurrency;
@Bean
public Map<String, Object> consumerConfigs(KafkaSslProperties kafkaSslProperties) {
return createConsumerConfig(kafkaSslProperties, SinkEventDeserializer.class);
public Map<String, Object> consumerConfigs() {
return createConsumerConfig(SinkEventDeserializer.class);
}
private <T> Map<String, Object> createConsumerConfig(KafkaSslProperties kafkaSslProperties, Class<T> clazz) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
private <T> Map<String, Object> createConsumerConfig(Class<T> clazz) {
Map<String, Object> props = kafkaProperties.buildConsumerProperties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, clazz);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
configureSsl(props, kafkaSslProperties);
return props;
}
private void configureSsl(Map<String, Object> props, KafkaSslProperties kafkaSslProperties) {
if (kafkaSslProperties.isEnabled()) {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name());
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
new File(kafkaSslProperties.getTrustStoreLocation()).getAbsolutePath());
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaSslProperties.getTrustStorePassword());
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaSslProperties.getKeyStoreType());
props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaSslProperties.getTrustStoreType());
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
new File(kafkaSslProperties.getKeyStoreLocation()).getAbsolutePath());
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, kafkaSslProperties.getKeyStorePassword());
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaSslProperties.getKeyPassword());
}
}
@Bean
public ConsumerFactory<String, MachineEvent> consumerFactory(KafkaSslProperties kafkaSslProperties) {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(kafkaSslProperties));
public ConsumerFactory<String, MachineEvent> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory
<ConcurrentMessageListenerContainer<String, MachineEvent>> partyManagementContainerFactory(
KafkaSslProperties kafkaSslProperties) {
Map<String, Object> configs = consumerConfigs(kafkaSslProperties);
<ConcurrentMessageListenerContainer<String, MachineEvent>> partyManagementContainerFactory() {
Map<String, Object> configs = consumerConfigs();
configs.put(ConsumerConfig.GROUP_ID_CONFIG, partyConsumerGroup);
ConsumerFactory<String, MachineEvent> consumerFactory = new DefaultKafkaConsumerFactory<>(configs);
return createConcurrentFactory(consumerFactory, partyConcurrency);
@@ -116,12 +71,12 @@ public class KafkaConfig {
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.setBatchErrorHandler(kafkaErrorHandler());
factory.setCommonErrorHandler(kafkaErrorHandler());
factory.setConcurrency(threadsNumber);
}
public BatchErrorHandler kafkaErrorHandler() {
return new SeekToCurrentWithSleepBatchErrorHandler();
public CommonErrorHandler kafkaErrorHandler() {
return new ExponentialBackOffDefaultErrorHandler(new ExponentialBackOff());
}
}

View File

@@ -1,23 +0,0 @@
package dev.vality.deanonimus.config.properties;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * SSL connection settings for the Kafka consumer, bound from the
 * {@code kafka.ssl.*} configuration prefix. This file is deleted by the
 * commit: SSL/IAM configuration moves to Spring Boot's own
 * {@code spring.kafka} properties (consumed via {@code KafkaProperties}).
 */
@Getter
@Setter
@Component
@ConfigurationProperties(prefix = "kafka.ssl")
public class KafkaSslProperties {
// Password protecting the trust store that holds the broker CA certificates.
private String trustStorePassword;
// Filesystem path to the trust store; resolved to an absolute path by KafkaConfig.
private String trustStoreLocation;
// Password protecting the key store that holds the client certificate.
private String keyStorePassword;
// Password for the private key entry inside the key store.
private String keyPassword;
// Filesystem path to the key store; resolved to an absolute path by KafkaConfig.
private String keyStoreLocation;
// Master switch: KafkaConfig#configureSsl applies the settings above only when true.
private boolean enabled;
// Store formats (e.g. PKCS12 per the default application config).
private String keyStoreType;
private String trustStoreType;
}

View File

@@ -30,6 +30,17 @@ spring:
enabled: always
elasticsearch:
uris: "http://localhost:9200"
kafka:
bootstrap-servers: localhost:9092
client-id: deanonimus
consumer:
group-id: "DeanonimusListener"
enable-auto-commit: false
auto-offset-reset: earliest
max-poll-records: 20
properties:
max.poll.interval.ms: 30000
session.timeout.ms: 30000
info:
version: '@project.version@'
@@ -40,24 +51,7 @@ data:
limit: 25
kafka:
bootstrap-servers: localhost:9092
client-id: deanonimus
ssl:
enabled: false
trust-store-location: "test"
trust-store-password: "test"
key-store-location: "test"
key-store-password: "test"
key-password: "test"
key-store-type: PKCS12
trust-store-type: PKCS12
consumer:
group-id: "DeanonimusListener"
enable-auto-commit: false
auto-offset-reset: earliest
max-poll-records: 20
max-poll-interval-ms: 30000
session-timeout-ms: 30000
party-management:
concurrency: 7
topics:

View File

@@ -30,7 +30,7 @@ public abstract class AbstractIntegrationTest {
@DynamicPropertySource
static void containersProps(DynamicPropertyRegistry registry) {
registry.add("kafka.bootstrap-servers", KafkaContainerExtension.KAFKA::getBootstrapServers);
registry.add("spring.kafka.bootstrap-servers", KafkaContainerExtension.KAFKA::getBootstrapServers);
registry.add("spring.elasticsearch.rest.uris", () -> {
return "http://" +
OpensearchContainerExtension.OPENSEARCH.getHost() +