Merge pull request #15 from valitydev/ft/kafka-config

new kafka config + iam support
Egor Cherniak 2022-09-07 13:41:01 +03:00 committed by GitHub
commit d578032110
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 44 additions and 111 deletions

.github/settings.yml (vendored, new file, +2)

@@ -0,0 +1,2 @@
+# These settings are synced to GitHub by https://probot.github.io/apps/settings/
+_extends: .github
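(The _extends: .github key tells the Probot settings app to inherit the repository settings defined in the organization's .github repository, so this file only needs the extension hook itself.)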

.github/workflows/… ("Vality basic linters" workflow)

@@ -3,12 +3,7 @@ name: Vality basic linters
 on:
   pull_request:
     branches:
-      - master
-      - main
-  push:
-    branches:
-      - master
-      - main
+      - "*"
 
 jobs:
   lint:

CODEOWNERS (new file, +1)

@@ -0,0 +1 @@
+* @valitydev/java

pom.xml

@@ -6,7 +6,7 @@
     <parent>
         <groupId>dev.vality</groupId>
         <artifactId>service-parent-pom</artifactId>
-        <version>1.0.9</version>
+        <version>1.0.18</version>
     </parent>
 
     <artifactId>deanonimus</artifactId>
@@ -136,6 +136,10 @@
             <artifactId>elasticsearch-rest-high-level-client</artifactId>
             <version>7.12.1</version>
         </dependency>
+        <dependency>
+            <groupId>software.amazon.msk</groupId>
+            <artifactId>aws-msk-iam-auth</artifactId>
+        </dependency>
 
         <!--test-->
         <dependency>
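The new aws-msk-iam-auth dependency (version managed by the parent POM) supplies the AWS_MSK_IAM SASL mechanism for IAM authentication against Amazon MSK. The deployment-side settings are not part of this diff; per the library's documentation, and since the reworked KafkaConfig now builds its consumer map from Spring Boot's KafkaProperties, they would look roughly like:

    spring:
      kafka:
        properties:
          security.protocol: SASL_SSL
          sasl.mechanism: AWS_MSK_IAM
          sasl.jaas.config: software.amazon.msk.auth.iam.IAMLoginModule required;
          sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler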

renovate.json (new file, +6)

@@ -0,0 +1,6 @@
+{
+  "$schema": "https://docs.renovatebot.com/renovate-schema.json",
+  "extends": [
+    "local>valitydev/.github:renovate-config"
+  ]
+}
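(The local> prefix is Renovate's preset syntax for a repository on the same Git host, so this file simply delegates to the shared renovate-config preset in valitydev/.github.)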

src/main/java/dev/vality/deanonimus/config/KafkaConfig.java

@@ -1,102 +1,56 @@
 package dev.vality.deanonimus.config;
 
-import dev.vality.deanonimus.config.properties.KafkaSslProperties;
 import dev.vality.deanonimus.kafka.serde.SinkEventDeserializer;
-import dev.vality.kafka.common.exception.handler.SeekToCurrentWithSleepBatchErrorHandler;
+import dev.vality.kafka.common.util.ExponentialBackOffDefaultErrorHandlerFactory;
 import dev.vality.machinegun.eventsink.MachineEvent;
-import org.apache.kafka.clients.CommonClientConfigs;
+import lombok.RequiredArgsConstructor;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
 import org.springframework.kafka.config.KafkaListenerContainerFactory;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-import org.springframework.kafka.listener.BatchErrorHandler;
+import org.springframework.kafka.listener.CommonErrorHandler;
 import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
 import org.springframework.kafka.listener.ContainerProperties;
 
-import java.io.File;
-import java.util.HashMap;
 import java.util.Map;
 
+@RequiredArgsConstructor
 @Configuration
-@EnableConfigurationProperties(KafkaSslProperties.class)
 public class KafkaConfig {
 
-    @Value("${kafka.consumer.auto-offset-reset}")
-    private String autoOffsetReset;
-    @Value("${kafka.consumer.enable-auto-commit}")
-    private boolean enableAutoCommit;
-    @Value("${kafka.consumer.group-id}")
-    private String groupId;
+    private final KafkaProperties kafkaProperties;
+
     @Value("${kafka.topics.party-management.consumer.group-id}")
     private String partyConsumerGroup;
-    @Value("${kafka.client-id}")
-    private String clientId;
-    @Value("${kafka.consumer.max-poll-records}")
-    private int maxPollRecords;
-    @Value("${kafka.consumer.max-poll-interval-ms}")
-    private int maxPollIntervalMs;
-    @Value("${kafka.consumer.session-timeout-ms}")
-    private int sessionTimeoutMs;
-    @Value("${kafka.bootstrap-servers}")
-    private String bootstrapServers;
     @Value("${kafka.consumer.party-management.concurrency}")
     private int partyConcurrency;
 
     @Bean
-    public Map<String, Object> consumerConfigs(KafkaSslProperties kafkaSslProperties) {
-        return createConsumerConfig(kafkaSslProperties, SinkEventDeserializer.class);
+    public Map<String, Object> consumerConfigs() {
+        return createConsumerConfig(SinkEventDeserializer.class);
     }
 
-    private <T> Map<String, Object> createConsumerConfig(KafkaSslProperties kafkaSslProperties, Class<T> clazz) {
-        Map<String, Object> props = new HashMap<>();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+    private <T> Map<String, Object> createConsumerConfig(Class<T> clazz) {
+        Map<String, Object> props = kafkaProperties.buildConsumerProperties();
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, clazz);
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
-        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
-        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
-        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
-        configureSsl(props, kafkaSslProperties);
         return props;
     }
 
-    private void configureSsl(Map<String, Object> props, KafkaSslProperties kafkaSslProperties) {
-        if (kafkaSslProperties.isEnabled()) {
-            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name());
-            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
-                    new File(kafkaSslProperties.getTrustStoreLocation()).getAbsolutePath());
-            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaSslProperties.getTrustStorePassword());
-            props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaSslProperties.getKeyStoreType());
-            props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaSslProperties.getTrustStoreType());
-            props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
-                    new File(kafkaSslProperties.getKeyStoreLocation()).getAbsolutePath());
-            props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, kafkaSslProperties.getKeyStorePassword());
-            props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaSslProperties.getKeyPassword());
-        }
-    }
-
     @Bean
-    public ConsumerFactory<String, MachineEvent> consumerFactory(KafkaSslProperties kafkaSslProperties) {
-        return new DefaultKafkaConsumerFactory<>(consumerConfigs(kafkaSslProperties));
+    public ConsumerFactory<String, MachineEvent> consumerFactory() {
+        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
     }
 
     @Bean
     public KafkaListenerContainerFactory
-            <ConcurrentMessageListenerContainer<String, MachineEvent>> partyManagementContainerFactory(
-            KafkaSslProperties kafkaSslProperties) {
-        Map<String, Object> configs = consumerConfigs(kafkaSslProperties);
+            <ConcurrentMessageListenerContainer<String, MachineEvent>> partyManagementContainerFactory() {
+        Map<String, Object> configs = consumerConfigs();
         configs.put(ConsumerConfig.GROUP_ID_CONFIG, partyConsumerGroup);
         ConsumerFactory<String, MachineEvent> consumerFactory = new DefaultKafkaConsumerFactory<>(configs);
         return createConcurrentFactory(consumerFactory, partyConcurrency);
@@ -116,12 +70,12 @@ public class KafkaConfig {
         factory.setConsumerFactory(consumerFactory);
         factory.setBatchListener(true);
         factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
-        factory.setBatchErrorHandler(kafkaErrorHandler());
+        factory.setCommonErrorHandler(kafkaErrorHandler());
         factory.setConcurrency(threadsNumber);
     }
 
-    public BatchErrorHandler kafkaErrorHandler() {
-        return new SeekToCurrentWithSleepBatchErrorHandler();
+    public CommonErrorHandler kafkaErrorHandler() {
+        return ExponentialBackOffDefaultErrorHandlerFactory.create();
     }
 }
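Note on the error-handler change: spring-kafka 2.8 replaced the BatchErrorHandler/setBatchErrorHandler pair with the unified CommonErrorHandler and setCommonErrorHandler. ExponentialBackOffDefaultErrorHandlerFactory lives in Vality's kafka-common library and is not shown in this diff; a plausible equivalent, assuming it wraps spring-kafka's DefaultErrorHandler with an exponential BackOff (the intervals below are illustrative, not the library's actual values):

    import org.springframework.kafka.listener.CommonErrorHandler;
    import org.springframework.kafka.listener.DefaultErrorHandler;
    import org.springframework.util.backoff.ExponentialBackOff;

    class ErrorHandlerSketch {
        static CommonErrorHandler create() {
            // Pause 1s, 2s, 4s, ... between redeliveries, capped at 60s.
            ExponentialBackOff backOff = new ExponentialBackOff(1000L, 2.0);
            backOff.setMaxInterval(60_000L);
            // DefaultErrorHandler seeks back to the unprocessed records, so the
            // batch is redelivered after each back-off interval.
            return new DefaultErrorHandler(backOff);
        }
    }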

src/main/java/dev/vality/deanonimus/config/properties/KafkaSslProperties.java (deleted)

@@ -1,23 +0,0 @@
-package dev.vality.deanonimus.config.properties;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.stereotype.Component;
-
-@Getter
-@Setter
-@Component
-@ConfigurationProperties(prefix = "kafka.ssl")
-public class KafkaSslProperties {
-
-    private String trustStorePassword;
-    private String trustStoreLocation;
-    private String keyStorePassword;
-    private String keyPassword;
-    private String keyStoreLocation;
-    private boolean enabled;
-    private String keyStoreType;
-    private String trustStoreType;
-}

src/main/resources/application.yml

@@ -30,6 +30,17 @@ spring:
     enabled: always
   elasticsearch:
     uris: "http://localhost:9200"
+  kafka:
+    bootstrap-servers: localhost:9092
+    client-id: deanonimus
+    consumer:
+      group-id: "DeanonimusListener"
+      enable-auto-commit: false
+      auto-offset-reset: earliest
+      max-poll-records: 20
+      properties:
+        max.poll.interval.ms: 30000
+        session.timeout.ms: 30000
 
 info:
   version: '@project.version@'
@@ -40,24 +51,7 @@ data:
   limit: 25
 
 kafka:
-  bootstrap-servers: localhost:9092
-  client-id: deanonimus
-  ssl:
-    enabled: false
-    trust-store-location: "test"
-    trust-store-password: "test"
-    key-store-location: "test"
-    key-store-password: "test"
-    key-password: "test"
-    key-store-type: PKCS12
-    trust-store-type: PKCS12
   consumer:
-    group-id: "DeanonimusListener"
-    enable-auto-commit: false
-    auto-offset-reset: earliest
-    max-poll-records: 20
-    max-poll-interval-ms: 30000
-    session-timeout-ms: 30000
     party-management:
       concurrency: 7
   topics:
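With the consumer options relocated under spring.kafka, Spring Boot's KafkaProperties binds them automatically and the new KafkaConfig consumes them via buildConsumerProperties(). The deleted custom kafka.ssl block therefore needs no replacement here; if plain SSL (rather than MSK IAM) is wanted, the standard Spring Boot keys cover it. A sketch with placeholder paths and passwords:

    spring:
      kafka:
        security:
          protocol: SSL
        ssl:
          trust-store-location: file:/etc/kafka/truststore.p12
          trust-store-password: changeit
          key-store-location: file:/etc/kafka/keystore.p12
          key-store-password: changeit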

AbstractIntegrationTest.java (test sources)

@@ -30,7 +30,7 @@ public abstract class AbstractIntegrationTest {
 
     @DynamicPropertySource
     static void containersProps(DynamicPropertyRegistry registry) {
-        registry.add("kafka.bootstrap-servers", KafkaContainerExtension.KAFKA::getBootstrapServers);
+        registry.add("spring.kafka.bootstrap-servers", KafkaContainerExtension.KAFKA::getBootstrapServers);
         registry.add("spring.elasticsearch.rest.uris", () -> {
             return "http://" +
                     OpensearchContainerExtension.OPENSEARCH.getHost() +
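The override now targets spring.kafka.bootstrap-servers, matching the switch to Spring Boot's KafkaProperties. KafkaContainerExtension itself is outside this diff; a minimal sketch of such a JUnit 5 extension, assuming Testcontainers' KafkaContainer (the image tag is illustrative):

    import org.junit.jupiter.api.extension.BeforeAllCallback;
    import org.junit.jupiter.api.extension.ExtensionContext;
    import org.testcontainers.containers.KafkaContainer;
    import org.testcontainers.utility.DockerImageName;

    public class KafkaContainerExtension implements BeforeAllCallback {

        // Single broker shared by all tests in the JVM.
        public static final KafkaContainer KAFKA =
                new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.0.1"));

        @Override
        public void beforeAll(ExtensionContext context) {
            // start() is a no-op if the container is already running.
            KAFKA.start();
        }
    }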