Merge pull request #14 from valitydev/auth_kafka

TD-395: Add new kafka config
This commit is contained in:
malkoas 2022-09-09 13:46:41 +03:00 committed by GitHub
commit e2b5c4a0f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 59 additions and 158 deletions

View File

@@ -52,4 +52,4 @@ ConditionTemplateRequest, содержащий название шаблона
![handler-trusted-tokens.svg](doc/handler-trusted-tokens.svg)
Протокол взаимодействия описан [тут](https://github.com/rbkmoney/trusted-tokens-proto).
Протокол взаимодействия описан [тут](https://github.com/valitydev/trusted-tokens-proto).

View File

@@ -7,7 +7,7 @@
<parent>
<groupId>dev.vality</groupId>
<artifactId>service-parent-pom</artifactId>
<version>1.0.14</version>
<version>1.0.18</version>
</parent>
<artifactId>trusted-tokens-manager</artifactId>
@@ -26,7 +26,7 @@
<fraudbusters-proto.version>1.108-0800fde</fraudbusters-proto.version>
<machinegun-proto.version>1.21-e4784ab</machinegun-proto.version>
<payout-manager-proto.version>1.35-dbed280</payout-manager-proto.version>
<testcontainers-annotations.version>1.4.0</testcontainers-annotations.version>
<testcontainers-annotations.version>1.4.3</testcontainers-annotations.version>
<woody.version>1.0.0</woody.version>
</properties>
@@ -121,6 +121,10 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.msk</groupId>
<artifactId>aws-msk-iam-auth</artifactId>
</dependency>
<!--test-->
<dependency>

View File

@@ -2,17 +2,14 @@ package dev.vality.trusted.tokens.config;
import dev.vality.damsel.fraudbusters.Payment;
import dev.vality.damsel.fraudbusters.Withdrawal;
import dev.vality.trusted.tokens.config.properties.KafkaSslProperties;
import dev.vality.trusted.tokens.serde.deserializer.PaymentDeserializer;
import dev.vality.trusted.tokens.serde.deserializer.WithdrawalDeserializer;
import org.apache.kafka.clients.CommonClientConfigs;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
@@ -20,31 +17,18 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.clients.consumer.OffsetResetStrategy.EARLIEST;
@Configuration
@EnableConfigurationProperties(KafkaSslProperties.class)
@RequiredArgsConstructor
public class KafkaConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.client-id}")
@Value("${spring.kafka.client-id}")
private String clientId;
@Value("${kafka.consumer.group-id}")
private String groupId;
@Value("${kafka.consumer.max-poll-interval-ms}")
private int maxPollInterval;
@Value("${kafka.consumer.max-session-timeout-ms}")
private int maxSessionTimeout;
@Value("${kafka.topics.payment.consume.max-poll-records}")
private String paymentMaxPollRecords;
@@ -57,30 +41,28 @@ public class KafkaConfig {
@Value("${kafka.topics.withdrawal.consume.concurrency}")
private int withdrawalConcurrency;
private final KafkaProperties kafkaProperties;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Payment> paymentListenerContainerFactory(
KafkaSslProperties kafkaSslProperties) {
public ConcurrentKafkaListenerContainerFactory<String, Payment> paymentListenerContainerFactory() {
var containerFactory = new ConcurrentKafkaListenerContainerFactory<String, Payment>();
configureContainerFactory(
containerFactory,
new PaymentDeserializer(),
clientId + "-payment",
paymentMaxPollRecords,
kafkaSslProperties);
paymentMaxPollRecords);
containerFactory.setConcurrency(paymentConcurrency);
return containerFactory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Withdrawal> withdrawalListenerContainerFactory(
KafkaSslProperties kafkaSslProperties) {
public ConcurrentKafkaListenerContainerFactory<String, Withdrawal> withdrawalListenerContainerFactory() {
var containerFactory = new ConcurrentKafkaListenerContainerFactory<String, Withdrawal>();
configureContainerFactory(
containerFactory,
new WithdrawalDeserializer(),
clientId + "-withdrawal",
withdrawalMaxPollRecords,
kafkaSslProperties);
withdrawalMaxPollRecords);
containerFactory.setConcurrency(withdrawalConcurrency);
return containerFactory;
}
@@ -89,13 +71,11 @@
ConcurrentKafkaListenerContainerFactory<String, T> containerFactory,
Deserializer<T> deserializer,
String clientId,
String maxPollRecords,
KafkaSslProperties kafkaSslProperties) {
String maxPollRecords) {
var consumerFactory = createKafkaConsumerFactory(
deserializer,
clientId,
maxPollRecords,
kafkaSslProperties);
maxPollRecords);
containerFactory.setConsumerFactory(consumerFactory);
containerFactory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
containerFactory.setBatchListener(true);
@@ -105,38 +85,11 @@ public class KafkaConfig {
private <T> DefaultKafkaConsumerFactory<String, T> createKafkaConsumerFactory(
Deserializer<T> deserializer,
String clientId,
String maxPollRecords,
KafkaSslProperties kafkaSslProperties) {
Map<String, Object> properties = defaultProperties(kafkaSslProperties);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
String maxPollRecords) {
Map<String, Object> properties = kafkaProperties.buildConsumerProperties();
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
return new DefaultKafkaConsumerFactory<>(properties, new StringDeserializer(), deserializer);
}
private Map<String, Object> defaultProperties(KafkaSslProperties kafkaSslProperties) {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, EARLIEST.name().toLowerCase());
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, maxSessionTimeout);
configureSsl(properties, kafkaSslProperties);
return properties;
}
private void configureSsl(Map<String, Object> properties, KafkaSslProperties kafkaSslProperties) {
if (kafkaSslProperties.isEnabled()) {
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name());
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
new File(kafkaSslProperties.getTrustStoreLocation()).getAbsolutePath());
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaSslProperties.getTrustStorePassword());
properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaSslProperties.getKeyStoreType());
properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaSslProperties.getTrustStoreType());
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
new File(kafkaSslProperties.getKeyStoreLocation()).getAbsolutePath());
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, kafkaSslProperties.getKeyStorePassword());
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaSslProperties.getKeyPassword());
}
}
}

View File

@@ -1,101 +1,63 @@
package dev.vality.trusted.tokens.config;
import dev.vality.trusted.tokens.config.properties.KafkaSslProperties;
import dev.vality.trusted.tokens.serde.PaymentSerde;
import dev.vality.trusted.tokens.serde.WithdrawalSerde;
import org.apache.kafka.clients.CommonClientConfigs;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@Configuration
@EnableConfigurationProperties(KafkaSslProperties.class)
@RequiredArgsConstructor
public class KafkaStreamsConfig {
public static final String PAYMENT_SUFFIX = "-payment";
public static final String WITHDRAWAL_SUFFIX = "-withdrawal";
private static final String APP_ID = "trusted-tokens";
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.client-id}")
@Value("${spring.kafka.client-id}")
private String clientId;
@Value("${kafka.num-stream-threads}")
private int numStreamThreads;
@Value("${kafka.stream.retries-attempts}")
private int retriesAttempts;
@Value("${kafka.stream.retries-backoff-ms}")
private int retriesBackoffMs;
@Value("${kafka.stream.default-api-timeout-ms}")
private int defaultApiTimeoutMs;
private final KafkaProperties kafkaProperties;
@Bean
public Properties paymentEventStreamProperties(KafkaSslProperties kafkaSslProperties) {
final Properties props = new Properties();
public Properties paymentEventStreamProperties() {
final Map<String, Object> props = kafkaProperties.buildStreamsProperties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + PAYMENT_SUFFIX);
props.put(StreamsConfig.CLIENT_ID_CONFIG, clientId + PAYMENT_SUFFIX);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PaymentSerde.class);
addDefaultStreamsProperties(props);
props.putAll(configureSsl(kafkaSslProperties));
return props;
}
private Map<String, Object> configureSsl(KafkaSslProperties kafkaSslProperties) {
Map<String, Object> properties = new HashMap<>();
if (kafkaSslProperties.isEnabled()) {
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name());
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
new File(kafkaSslProperties.getTrustStoreLocation()).getAbsolutePath());
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaSslProperties.getTrustStorePassword());
properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaSslProperties.getKeyStoreType());
properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaSslProperties.getTrustStoreType());
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
new File(kafkaSslProperties.getKeyStoreLocation()).getAbsolutePath());
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, kafkaSslProperties.getKeyStorePassword());
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaSslProperties.getKeyPassword());
}
var properties = new Properties();
properties.putAll(props);
return properties;
}
@Bean
public Properties withdrawalEventStreamProperties(KafkaSslProperties kafkaSslProperties) {
final Properties props = new Properties();
public Properties withdrawalEventStreamProperties() {
final Map<String, Object> props = kafkaProperties.buildStreamsProperties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + WITHDRAWAL_SUFFIX);
props.put(StreamsConfig.CLIENT_ID_CONFIG, clientId + WITHDRAWAL_SUFFIX);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, WithdrawalSerde.class);
addDefaultStreamsProperties(props);
props.putAll(configureSsl(kafkaSslProperties));
return props;
var properties = new Properties();
properties.putAll(props);
return properties;
}
private void addDefaultStreamsProperties(Properties props) {
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
private void addDefaultStreamsProperties(Map<String, Object> props) {
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads);
props.put(StreamsConfig.RETRIES_CONFIG, retriesAttempts);
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, retriesBackoffMs);
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndFailExceptionHandler.class);
props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, defaultApiTimeoutMs);
}
}

View File

@@ -1,23 +0,0 @@
package dev.vality.trusted.tokens.config.properties;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Getter
@Setter
@Component
@ConfigurationProperties(prefix = "kafka.ssl")
public class KafkaSslProperties {
private String trustStorePassword;
private String trustStoreLocation;
private String keyStorePassword;
private String keyPassword;
private String keyStoreLocation;
private boolean enabled;
private String keyStoreType;
private String trustStoreType;
}

View File

@@ -12,7 +12,7 @@ import java.util.concurrent.ConcurrentHashMap;
@Component
public class EventStreamsPool {
@Value("${kafka.stream.clean-timeout-sec}")
@Value("${spring.kafka.streams.properties.clean.timeout.sec}")
private Long cleanTimeoutSec;
private final Map<StreamType, KafkaStreams> kafkaStreamsList = new ConcurrentHashMap<>();

View File

@@ -22,7 +22,7 @@ public class KafkaStreamsStartupInitializer {
private final List<EventFactory> eventFactories;
private final EventStreamsPool eventStreamsPool;
@Value("${kafka.stream.clean-timeout-sec}")
@Value("${spring.kafka.streams.properties.clean.timeout.sec}")
private Long cleanTimeoutSec;
@EventListener(value = ContextRefreshedEvent.class)

View File

@@ -22,7 +22,7 @@ public class PaymentKafkaListener {
private final PaymentService paymentService;
@Value("${kafka.consumer.throttling-timeout-ms}")
@Value("${spring.kafka.consumer.properties.throttling.timeout.ms}")
private int throttlingTimeout;
@KafkaListener(

View File

@@ -22,7 +22,7 @@ public class WithdrawalKafkaListener {
private final WithdrawalService withdrawalService;
@Value("${kafka.consumer.throttling-timeout-ms}")
@Value("${spring.kafka.consumer.properties.throttling.timeout.ms}")
private int throttlingTimeout;
@KafkaListener(

View File

@@ -31,6 +31,25 @@ spring:
output:
ansi:
enabled: always
kafka:
bootstrap-servers: kenny-kafka1.bst1.rbkmoney.net:9092
client-id: trusted-tokens-manager
streams:
properties:
retries: 300
retry.backoff.ms: 1000
num.stream.threads: 7
default.api.timeout.ms: 300000
clean.timeout.sec: 20
consumer:
group-id: trusted-tokens-group-1
enable-auto-commit: false
auto-offset-reset: earliest
properties:
max.poll.interval.ms: 60000
session.timeout.ms: 60000
throttling.timeout.ms: 1000
info:
version: '@project.version@'
stage: dev
@@ -43,20 +62,6 @@ riak-config:
template: template
kafka:
bootstrap-servers: kenny-kafka1.bst1.rbkmoney.net:9092
client-id: trusted-tokens-manager
num-stream-threads: 7
stream:
retries-attempts: 300
retries-backoff-ms: 1000
default-api-timeout-ms: 300000
clean-timeout-sec: 20
fixed-rate-timeout-ms: 60000
consumer:
group-id: trusted-tokens-group-1
max-poll-interval-ms: 60000
max-session-timeout-ms: 60000
throttling-timeout-ms: 1000
topics:
payment:
id: payment_event