revert KafkaConfig changes (#43)

This commit is contained in:
Anatoly Karlov 2022-04-08 15:18:58 +07:00 committed by GitHub
parent ad9a772d2f
commit 4726fbc5fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 109 additions and 97 deletions

View File

@ -7,4 +7,4 @@ on:
jobs:
build:
uses: valitydev/java-workflow/.github/workflows/maven-service-build.yml@v2-beta
uses: valitydev/java-workflow/.github/workflows/maven-service-build.yml@v1

View File

@ -12,7 +12,7 @@ env:
jobs:
deploy:
uses: valitydev/java-workflow/.github/workflows/maven-service-deploy.yml@v2-beta
uses: valitydev/java-workflow/.github/workflows/maven-service-deploy.yml@v1
secrets:
github-token: ${{ secrets.GITHUB_TOKEN }}
mm-webhook-url: ${{ secrets.MATTERMOST_WEBHOOK_URL }}

View File

@ -6,7 +6,7 @@
<parent>
<groupId>dev.vality</groupId>
<artifactId>service-parent-pom</artifactId>
<version>2.0.0-BETA-8</version>
<version>1.0.14</version>
</parent>
<artifactId>newway</artifactId>
@ -62,6 +62,11 @@
</dependency>
<!--Thrirdparty libs-->
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-starter-client</artifactId>
<version>2.6.2</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>

View File

@ -1,6 +1,6 @@
package dev.vality.newway.config;
import dev.vality.kafka.common.exception.handler.SeekToCurrentWithSleepBatchErrorHandler;
import dev.vality.kafka.common.util.ExponentialBackOffDefaultErrorHandlerFactory;
import dev.vality.machinegun.eventsink.MachineEvent;
import dev.vality.newway.config.properties.KafkaConsumerProperties;
import dev.vality.newway.config.properties.KafkaSslProperties;
@ -18,8 +18,10 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import java.io.File;
@ -29,6 +31,7 @@ import java.util.Map;
@Configuration
@RequiredArgsConstructor
@EnableConfigurationProperties(KafkaSslProperties.class)
@SuppressWarnings("LineLength")
public class KafkaConfig {
private final KafkaConsumerProperties kafkaConsumerProperties;
@ -41,90 +44,11 @@ public class KafkaConfig {
private String bootstrapServers;
@Bean
public ConsumerFactory<String, MachineEvent> consumerFactory(KafkaSslProperties kafkaSslProperties) {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(kafkaSslProperties));
public Map<String, Object> consumerConfigs(KafkaSslProperties kafkaSslProperties) {
return createConsumerConfig(kafkaSslProperties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MachineEvent> kafkaListenerContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getInvoicingConcurrency());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MachineEvent> recPayToolContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getRecurrentPaymentToolConcurrency());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MachineEvent> rateContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getRateConcurrency());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MachineEvent> depositContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getDepositConcurrency());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MachineEvent> identityContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getIdentityConcurrency());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MachineEvent> walletContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getWalletConcurrency());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MachineEvent> withdrawalContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getWithdrawalConcurrency());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Event> payoutContainerFactory(
KafkaSslProperties kafkaSslProperties) {
var kafkaConsumerFactory = new DefaultKafkaConsumerFactory<String, Event>(consumerConfigs(kafkaSslProperties));
kafkaConsumerFactory.setValueDeserializer(new PayoutEventDeserializer());
var factory = new ConcurrentKafkaListenerContainerFactory<String, Event>();
initFactory(kafkaConsumerFactory, kafkaConsumerProperties.getPayoutConcurrency(), factory);
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MachineEvent> sourceContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getSourceConcurrency());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MachineEvent> destinationContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getDestinationConcurrency());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MachineEvent> withdrawalSessionContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getWithdrawalSessionConcurrency());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MachineEvent> partyManagementContainerFactory(
KafkaSslProperties kafkaSslProperties) {
Map<String, Object> configs = consumerConfigs(kafkaSslProperties);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, partyConsumerGroup);
ConsumerFactory<String, MachineEvent> consumerFactory = new DefaultKafkaConsumerFactory<>(configs);
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getPartyManagementConcurrency());
}
private Map<String, Object> consumerConfigs(KafkaSslProperties kafkaSslProperties) {
private Map<String, Object> createConsumerConfig(KafkaSslProperties kafkaSslProperties) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
@ -155,24 +79,107 @@ public class KafkaConfig {
}
}
private ConcurrentKafkaListenerContainerFactory<String, MachineEvent> createConcurrentFactory(
ConsumerFactory<String, MachineEvent> consumerFactory,
int threadsNumber) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, MachineEvent>();
@Bean
public ConsumerFactory<String, MachineEvent> consumerFactory(KafkaSslProperties kafkaSslProperties) {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(kafkaSslProperties));
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> kafkaListenerContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getInvoicingConcurrency());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> recPayToolContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getRecurrentPaymentToolConcurrency());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> rateContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getRateConcurrency());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> depositContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getDepositConcurrency());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> identityContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getIdentityConcurrency());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> walletContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getWalletConcurrency());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> withdrawalContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getWithdrawalConcurrency());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Event>> payoutContainerFactory(
KafkaSslProperties kafkaSslProperties) {
DefaultKafkaConsumerFactory<String, Event> kafkaConsumerFactory =
new DefaultKafkaConsumerFactory<>(createConsumerConfig(kafkaSslProperties));
kafkaConsumerFactory.setValueDeserializer(new PayoutEventDeserializer());
ConcurrentKafkaListenerContainerFactory<String, Event> factory =
new ConcurrentKafkaListenerContainerFactory<>();
initFactory(kafkaConsumerFactory, kafkaConsumerProperties.getPayoutConcurrency(), factory);
return factory;
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> sourceContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getSourceConcurrency());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> destinationContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getDestinationConcurrency());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> withdrawalSessionContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getWithdrawalSessionConcurrency());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> partyManagementContainerFactory(
KafkaSslProperties kafkaSslProperties) {
Map<String, Object> configs = createConsumerConfig(kafkaSslProperties);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, partyConsumerGroup);
ConsumerFactory<String, MachineEvent> consumerFactory = new DefaultKafkaConsumerFactory<>(configs);
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getPartyManagementConcurrency());
}
private KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> createConcurrentFactory(
ConsumerFactory<String, MachineEvent> consumerFactory, int threadsNumber) {
ConcurrentKafkaListenerContainerFactory<String, MachineEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
initFactory(consumerFactory, threadsNumber, factory);
return factory;
}
private <T> void initFactory(
ConsumerFactory<String, T> consumerFactory,
private <T> void initFactory(ConsumerFactory<String, T> consumerFactory,
int threadsNumber,
ConcurrentKafkaListenerContainerFactory<String, T> factory) {
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
// todo timeout at deploy when replace BatchErrorHandler on CommonErrorHandler
// factory.setCommonErrorHandler(ExponentialBackOffDefaultErrorHandlerFactory.create());
factory.setBatchErrorHandler(new SeekToCurrentWithSleepBatchErrorHandler());
factory.setCommonErrorHandler(ExponentialBackOffDefaultErrorHandlerFactory.create());
factory.setConcurrency(threadsNumber);
}
}