From d3670bd3f3961fbb5fa4db12db89879b889425ab Mon Sep 17 00:00:00 2001 From: Anatoly Karlov Date: Fri, 1 Apr 2022 17:27:51 +0700 Subject: [PATCH] migrate on java 17 and update kafka config (#34) --- .github/workflows/build.yml | 2 +- .github/workflows/deploy.yml | 2 +- pom.xml | 13 +- .../dev/vality/newway/config/KafkaConfig.java | 201 ++++++++---------- src/main/resources/application.yml | 3 - 5 files changed, 98 insertions(+), 123 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index a654702..a4a5f62 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -7,4 +7,4 @@ on: jobs: build: - uses: valitydev/java-workflow/.github/workflows/maven-service-build.yml@v1 + uses: valitydev/java-workflow/.github/workflows/maven-service-build.yml@v2-beta diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 6a394b0..5326c0d 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -12,7 +12,7 @@ env: jobs: deploy: - uses: valitydev/java-workflow/.github/workflows/maven-service-deploy.yml@v1 + uses: valitydev/java-workflow/.github/workflows/maven-service-deploy.yml@v2-beta secrets: github-token: ${{ secrets.GITHUB_TOKEN }} mm-webhook-url: ${{ secrets.MATTERMOST_WEBHOOK_URL }} diff --git a/pom.xml b/pom.xml index 5f52195..3dc4406 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ dev.vality service-parent-pom - 1.0.13 + 2.0.0-BETA-8 newway @@ -78,11 +78,6 @@ org.jooq jooq - - de.codecentric - spring-boot-admin-starter-client - 2.6.2 - net.logstash.logback logstash-logback-encoder @@ -132,7 +127,7 @@ dev.vality payout-manager-proto - 1.30-cc9875f + 1.35-dbed280 dev.vality @@ -150,7 +145,7 @@ dev.vality machinegun-proto - 1.27-dec518d + 1.31-b43d6fd dev.vality @@ -159,7 +154,7 @@ dev.vality fistful-proto - 1.141-fd4117f + 1.144-ea0fe7a dev.vality diff --git a/src/main/java/dev/vality/newway/config/KafkaConfig.java b/src/main/java/dev/vality/newway/config/KafkaConfig.java index 43a38c2..a26c73b 100644 --- a/src/main/java/dev/vality/newway/config/KafkaConfig.java +++ b/src/main/java/dev/vality/newway/config/KafkaConfig.java @@ -1,6 +1,6 @@ package dev.vality.newway.config; -import dev.vality.kafka.common.exception.handler.SeekToCurrentWithSleepBatchErrorHandler; +import dev.vality.kafka.common.util.ExponentialBackOffDefaultErrorHandlerFactory; import dev.vality.machinegun.eventsink.MachineEvent; import dev.vality.newway.config.properties.KafkaConsumerProperties; import dev.vality.newway.config.properties.KafkaSslProperties; @@ -18,11 +18,8 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; -import org.springframework.kafka.listener.BatchErrorHandler; -import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerProperties; import java.io.File; @@ -32,7 +29,6 @@ import java.util.Map; @Configuration @RequiredArgsConstructor @EnableConfigurationProperties(KafkaSslProperties.class) -@SuppressWarnings("LineLength") public class KafkaConfig { private final KafkaConsumerProperties kafkaConsumerProperties; @@ -43,15 +39,92 @@ public class KafkaConfig { private String clientId; @Value("${kafka.bootstrap-servers}") private String bootstrapServers; - @Value("${retry-policy.maxAttempts}") - int maxAttempts; @Bean - public Map consumerConfigs(KafkaSslProperties kafkaSslProperties) { - return createConsumerConfig(kafkaSslProperties); + public ConsumerFactory consumerFactory(KafkaSslProperties kafkaSslProperties) { + return new DefaultKafkaConsumerFactory<>(consumerConfigs(kafkaSslProperties)); } - private Map createConsumerConfig(KafkaSslProperties kafkaSslProperties) { + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + ConsumerFactory consumerFactory) { + return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getInvoicingConcurrency()); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory recPayToolContainerFactory( + ConsumerFactory consumerFactory) { + return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getRecurrentPaymentToolConcurrency()); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory rateContainerFactory( + ConsumerFactory consumerFactory) { + return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getRateConcurrency()); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory depositContainerFactory( + ConsumerFactory consumerFactory) { + return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getDepositConcurrency()); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory identityContainerFactory( + ConsumerFactory consumerFactory) { + return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getIdentityConcurrency()); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory walletContainerFactory( + ConsumerFactory consumerFactory) { + return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getWalletConcurrency()); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory withdrawalContainerFactory( + ConsumerFactory consumerFactory) { + return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getWithdrawalConcurrency()); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory payoutContainerFactory( + KafkaSslProperties kafkaSslProperties) { + var kafkaConsumerFactory = new DefaultKafkaConsumerFactory(consumerConfigs(kafkaSslProperties)); + kafkaConsumerFactory.setValueDeserializer(new PayoutEventDeserializer()); + var factory = new ConcurrentKafkaListenerContainerFactory(); + initFactory(kafkaConsumerFactory, kafkaConsumerProperties.getPayoutConcurrency(), factory); + return factory; + } + + @Bean + public ConcurrentKafkaListenerContainerFactory sourceContainerFactory( + ConsumerFactory consumerFactory) { + return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getSourceConcurrency()); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory destinationContainerFactory( + ConsumerFactory consumerFactory) { + return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getDestinationConcurrency()); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory withdrawalSessionContainerFactory( + ConsumerFactory consumerFactory) { + return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getWithdrawalSessionConcurrency()); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory partyManagementContainerFactory( + KafkaSslProperties kafkaSslProperties) { + Map configs = consumerConfigs(kafkaSslProperties); + configs.put(ConsumerConfig.GROUP_ID_CONFIG, partyConsumerGroup); + ConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory<>(configs); + return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getPartyManagementConcurrency()); + } + + private Map consumerConfigs(KafkaSslProperties kafkaSslProperties) { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); @@ -82,112 +155,22 @@ public class KafkaConfig { } } - @Bean - public ConsumerFactory consumerFactory(KafkaSslProperties kafkaSslProperties) { - return new DefaultKafkaConsumerFactory<>(consumerConfigs(kafkaSslProperties)); - } - - @Bean - public KafkaListenerContainerFactory> kafkaListenerContainerFactory( - ConsumerFactory consumerFactory) { - return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getInvoicingConcurrency()); - } - - @Bean - public KafkaListenerContainerFactory> recPayToolContainerFactory( - ConsumerFactory consumerFactory) { - return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getRecurrentPaymentToolConcurrency()); - } - - @Bean - public KafkaListenerContainerFactory> rateContainerFactory( - ConsumerFactory consumerFactory) { - return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getRateConcurrency()); - } - - @Bean - public KafkaListenerContainerFactory> depositContainerFactory( - ConsumerFactory consumerFactory) { - return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getDepositConcurrency()); - } - - @Bean - public KafkaListenerContainerFactory> identityContainerFactory( - ConsumerFactory consumerFactory) { - return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getIdentityConcurrency()); - } - - @Bean - public KafkaListenerContainerFactory> walletContainerFactory( - ConsumerFactory consumerFactory) { - return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getWalletConcurrency()); - } - - @Bean - public KafkaListenerContainerFactory> withdrawalContainerFactory( - ConsumerFactory consumerFactory) { - return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getWithdrawalConcurrency()); - } - - @Bean - public KafkaListenerContainerFactory> payoutContainerFactory( - KafkaSslProperties kafkaSslProperties) { - DefaultKafkaConsumerFactory kafkaConsumerFactory = - new DefaultKafkaConsumerFactory<>(createConsumerConfig(kafkaSslProperties)); - kafkaConsumerFactory.setValueDeserializer(new PayoutEventDeserializer()); - ConcurrentKafkaListenerContainerFactory factory = - new ConcurrentKafkaListenerContainerFactory<>(); - initFactory(kafkaConsumerFactory, kafkaConsumerProperties.getPayoutConcurrency(), factory); - return factory; - } - - @Bean - public KafkaListenerContainerFactory> sourceContainerFactory( - ConsumerFactory consumerFactory) { - return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getSourceConcurrency()); - } - - @Bean - public KafkaListenerContainerFactory> destinationContainerFactory( - ConsumerFactory consumerFactory) { - return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getDestinationConcurrency()); - } - - @Bean - public KafkaListenerContainerFactory> withdrawalSessionContainerFactory( - ConsumerFactory consumerFactory) { - return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getWithdrawalSessionConcurrency()); - } - - @Bean - public KafkaListenerContainerFactory> partyManagementContainerFactory( - KafkaSslProperties kafkaSslProperties) { - Map configs = createConsumerConfig(kafkaSslProperties); - configs.put(ConsumerConfig.GROUP_ID_CONFIG, partyConsumerGroup); - ConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory<>(configs); - return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getPartyManagementConcurrency()); - } - - private KafkaListenerContainerFactory> createConcurrentFactory( - ConsumerFactory consumerFactory, int threadsNumber) { - ConcurrentKafkaListenerContainerFactory factory = - new ConcurrentKafkaListenerContainerFactory<>(); + private ConcurrentKafkaListenerContainerFactory createConcurrentFactory( + ConsumerFactory consumerFactory, + int threadsNumber) { + var factory = new ConcurrentKafkaListenerContainerFactory(); initFactory(consumerFactory, threadsNumber, factory); return factory; } - private void initFactory(ConsumerFactory consumerFactory, - int threadsNumber, - ConcurrentKafkaListenerContainerFactory factory) { + private void initFactory( + ConsumerFactory consumerFactory, + int threadsNumber, + ConcurrentKafkaListenerContainerFactory factory) { factory.setConsumerFactory(consumerFactory); factory.setBatchListener(true); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); - factory.setBatchErrorHandler(kafkaErrorHandler()); + factory.setCommonErrorHandler(ExponentialBackOffDefaultErrorHandlerFactory.create()); factory.setConcurrency(threadsNumber); } - - public BatchErrorHandler kafkaErrorHandler() { - return new SeekToCurrentWithSleepBatchErrorHandler(); - } - } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index f2c134c..6c6f1f8 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -41,9 +41,6 @@ management: exposure: include: health,info,prometheus -retry-policy: - maxAttempts: -1 - kafka: bootstrap-servers: localhost:9092 client-id: newway