add iam auth /TD-391 (#28)

* add iam auth /TD-391

* feedback fixes /TD-391
This commit is contained in:
Ivan Martynyuk 2022-09-09 11:55:47 +03:00 committed by GitHub
parent 77ddc04ee7
commit caf725b6b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 22 additions and 96 deletions

View File

@ -131,7 +131,12 @@
<artifactId>webhook-dispatcher-proto</artifactId>
<version>1.12-49f714e</version>
</dependency>
<!-- Third-party libs -->
<dependency>
<groupId>software.amazon.msk</groupId>
<artifactId>aws-msk-iam-auth</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>

View File

@ -1,7 +1,6 @@
package dev.vality.hooker.configuration;
import dev.vality.damsel.payment_processing.EventPayload;
import dev.vality.hooker.configuration.properties.KafkaSslProperties;
import dev.vality.hooker.serde.SinkEventDeserializer;
import dev.vality.kafka.common.exception.handler.SeekToCurrentWithSleepBatchErrorHandler;
import dev.vality.kafka.common.serialization.ThriftSerializer;
@ -13,15 +12,12 @@ import dev.vality.sink.common.serialization.impl.PaymentEventPayloadDeserializer
import dev.vality.webhook.dispatcher.WebhookMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
@ -32,67 +28,29 @@ import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Configuration
@EnableConfigurationProperties(KafkaSslProperties.class)
@RequiredArgsConstructor
public class KafkaConfig {
private final KafkaSslProperties kafkaSslProperties;
@Value("${kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${kafka.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Value("${kafka.consumer.group-id}")
private String groupId;
@Value("${kafka.client-id}")
private String clientId;
@Value("${kafka.consumer.max-poll-records}")
private int maxPollRecords;
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.topics.invoice.concurrency}")
private int invoicingConcurrency;
@Value("${kafka.topics.customer.concurrency}")
private int customerConcurrency;
private final KafkaProperties kafkaProperties;
/**
 * Builds the Kafka consumer configuration.
 *
 * <p>Base properties (bootstrap servers, group id, client id, auto-commit,
 * offset reset, max poll records, security/IAM settings) come from the Spring
 * Boot managed {@code spring.kafka.*} configuration via
 * {@link KafkaProperties#buildConsumerProperties()}; only the project-specific
 * deserializers are added here.
 *
 * @return mutable map of consumer properties for the consumer factory
 */
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = kafkaProperties.buildConsumerProperties();
    // NOTE(review): group-id, client-id, enable-auto-commit, auto-offset-reset
    // and max-poll-records are expected in spring.kafka.consumer.* config —
    // verify the deployment config was migrated (see application.yml change).
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SinkEventDeserializer.class);
    return props;
}
/**
 * Applies SSL client settings to the given Kafka properties map.
 *
 * <p>No-op unless SSL is enabled in {@code kafkaSslProperties}. When enabled,
 * switches the security protocol to SSL and wires the trust-store and
 * key-store locations (resolved to absolute paths), passwords and types.
 *
 * @param props Kafka client properties to enrich in place
 */
private void configureSsl(Map<String, Object> props) {
    if (!kafkaSslProperties.isEnabled()) {
        return;
    }
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name());
    props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaSslProperties.getTrustStoreType());
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
            new File(kafkaSslProperties.getTrustStoreLocation()).getAbsolutePath());
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaSslProperties.getTrustStorePassword());
    props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaSslProperties.getKeyStoreType());
    props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
            new File(kafkaSslProperties.getKeyStoreLocation()).getAbsolutePath());
    props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, kafkaSslProperties.getKeyStorePassword());
    props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaSslProperties.getKeyPassword());
}
@Bean
public ConsumerFactory<String, MachineEvent> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
@ -150,11 +108,9 @@ public class KafkaConfig {
}
/**
 * Builds the producer factory used to publish {@link WebhookMessage}s.
 *
 * <p>Base properties (bootstrap servers, client id, security/IAM settings)
 * come from the Spring Boot managed {@code spring.kafka.*} configuration via
 * {@link KafkaProperties#buildProducerProperties()}; only the serializers
 * specific to this producer are added here.
 *
 * @return producer factory keyed by String with Thrift-serialized payloads
 */
private ProducerFactory<String, WebhookMessage> producerFactory() {
    Map<String, Object> config = kafkaProperties.buildProducerProperties();
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ThriftSerializer.class);
    return new DefaultKafkaProducerFactory<>(config);
}
}

View File

@ -1,23 +0,0 @@
package dev.vality.hooker.configuration.properties;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * SSL connection settings for Kafka clients, bound from the
 * {@code kafka.ssl.*} configuration properties.
 *
 * <p>Consumed by the Kafka configuration to set up the security protocol,
 * trust store and key store when {@link #enabled} is {@code true}.
 * Accessors are generated by Lombok.
 */
@Getter
@Setter
@Component
@ConfigurationProperties(prefix = "kafka.ssl")
public class KafkaSslProperties {
// Trust store used to verify the broker certificate.
private String trustStorePassword;
private String trustStoreLocation;
// Key store holding the client certificate/key (mutual TLS).
private String keyStorePassword;
private String keyPassword;
private String keyStoreLocation;
// Master switch: when false, no SSL settings are applied at all.
private boolean enabled;
// Store formats, e.g. PKCS12 or JKS (see application.yml defaults).
private String keyStoreType;
private String trustStoreType;
}

View File

@ -27,24 +27,15 @@ spring:
flyway:
schemas: hook
table: schema_version
kafka:
bootstrap-servers: enny-kafka1.bst1.rbkmoney.net:9092
client-id: hooker
consumer:
group-id: "HookerListener"
enable-auto-commit: false
auto-offset-reset: earliest
max-poll-records: 20
kafka:
bootstrap-servers: kenny-kafka1.bst1.rbkmoney.net:9092
client-id: hooker
ssl:
enabled: false
trust-store-location: "test"
trust-store-password: "test"
key-store-location: "test"
key-store-password: "test"
key-password: "test"
key-store-type: PKCS12
trust-store-type: PKCS12
consumer:
group-id: "HookerListener"
enable-auto-commit: false
auto-offset-reset: earliest
max-poll-records: 20
topics:
invoice:
id: mg-invoice-100-2

View File

@ -49,13 +49,10 @@ public abstract class AbstractKafkaIntegrationTest {
"flyway.url=" + postgres.getJdbcUrl(),
"flyway.user=" + postgres.getUsername(),
"flyway.password=" + postgres.getPassword()
).and("kafka.bootstrap-servers=" + kafka.getBootstrapServers(),
"kafka.ssl.enabled=false",
"kafka.consumer.group-id=TestListener",
"kafka.consumer.enable-auto-commit=false",
"kafka.consumer.auto-offset-reset=earliest",
"kafka.consumer.client-id=test",
"kafka.client-id=test",
).and("spring.kafka.bootstrap-servers=" + kafka.getBootstrapServers(),
"spring.kafka.consumer.group-id=TestListener",
"spring.kafka.consumer.client-id=test",
"spring.kafka.client-id=test",
"kafka.topics.invoicing.enabled=true",
"kafka.topics.customer.enabled=true")
.applyTo(configurableApplicationContext);