migrate on java 17 and update kafka config (#34)

This commit is contained in:
Anatoly Karlov 2022-04-01 17:27:51 +07:00 committed by GitHub
parent 7051360868
commit d3670bd3f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 98 additions and 123 deletions

View File

@ -7,4 +7,4 @@ on:
jobs: jobs:
build: build:
uses: valitydev/java-workflow/.github/workflows/maven-service-build.yml@v1 uses: valitydev/java-workflow/.github/workflows/maven-service-build.yml@v2-beta

View File

@ -12,7 +12,7 @@ env:
jobs: jobs:
deploy: deploy:
uses: valitydev/java-workflow/.github/workflows/maven-service-deploy.yml@v1 uses: valitydev/java-workflow/.github/workflows/maven-service-deploy.yml@v2-beta
secrets: secrets:
github-token: ${{ secrets.GITHUB_TOKEN }} github-token: ${{ secrets.GITHUB_TOKEN }}
mm-webhook-url: ${{ secrets.MATTERMOST_WEBHOOK_URL }} mm-webhook-url: ${{ secrets.MATTERMOST_WEBHOOK_URL }}

13
pom.xml
View File

@ -6,7 +6,7 @@
<parent> <parent>
<groupId>dev.vality</groupId> <groupId>dev.vality</groupId>
<artifactId>service-parent-pom</artifactId> <artifactId>service-parent-pom</artifactId>
<version>1.0.13</version> <version>2.0.0-BETA-8</version>
</parent> </parent>
<artifactId>newway</artifactId> <artifactId>newway</artifactId>
@ -78,11 +78,6 @@
<groupId>org.jooq</groupId> <groupId>org.jooq</groupId>
<artifactId>jooq</artifactId> <artifactId>jooq</artifactId>
</dependency> </dependency>
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-starter-client</artifactId>
<version>2.6.2</version>
</dependency>
<dependency> <dependency>
<groupId>net.logstash.logback</groupId> <groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId> <artifactId>logstash-logback-encoder</artifactId>
@ -132,7 +127,7 @@
<dependency> <dependency>
<groupId>dev.vality</groupId> <groupId>dev.vality</groupId>
<artifactId>payout-manager-proto</artifactId> <artifactId>payout-manager-proto</artifactId>
<version>1.30-cc9875f</version> <version>1.35-dbed280</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>dev.vality</groupId> <groupId>dev.vality</groupId>
@ -150,7 +145,7 @@
<dependency> <dependency>
<groupId>dev.vality</groupId> <groupId>dev.vality</groupId>
<artifactId>machinegun-proto</artifactId> <artifactId>machinegun-proto</artifactId>
<version>1.27-dec518d</version> <version>1.31-b43d6fd</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>dev.vality</groupId> <groupId>dev.vality</groupId>
@ -159,7 +154,7 @@
<dependency> <dependency>
<groupId>dev.vality</groupId> <groupId>dev.vality</groupId>
<artifactId>fistful-proto</artifactId> <artifactId>fistful-proto</artifactId>
<version>1.141-fd4117f</version> <version>1.144-ea0fe7a</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>dev.vality</groupId> <groupId>dev.vality</groupId>

View File

@ -1,6 +1,6 @@
package dev.vality.newway.config; package dev.vality.newway.config;
import dev.vality.kafka.common.exception.handler.SeekToCurrentWithSleepBatchErrorHandler; import dev.vality.kafka.common.util.ExponentialBackOffDefaultErrorHandlerFactory;
import dev.vality.machinegun.eventsink.MachineEvent; import dev.vality.machinegun.eventsink.MachineEvent;
import dev.vality.newway.config.properties.KafkaConsumerProperties; import dev.vality.newway.config.properties.KafkaConsumerProperties;
import dev.vality.newway.config.properties.KafkaSslProperties; import dev.vality.newway.config.properties.KafkaSslProperties;
@ -18,11 +18,8 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties;
import java.io.File; import java.io.File;
@ -32,7 +29,6 @@ import java.util.Map;
@Configuration @Configuration
@RequiredArgsConstructor @RequiredArgsConstructor
@EnableConfigurationProperties(KafkaSslProperties.class) @EnableConfigurationProperties(KafkaSslProperties.class)
@SuppressWarnings("LineLength")
public class KafkaConfig { public class KafkaConfig {
private final KafkaConsumerProperties kafkaConsumerProperties; private final KafkaConsumerProperties kafkaConsumerProperties;
@ -43,15 +39,92 @@ public class KafkaConfig {
private String clientId; private String clientId;
@Value("${kafka.bootstrap-servers}") @Value("${kafka.bootstrap-servers}")
private String bootstrapServers; private String bootstrapServers;
@Value("${retry-policy.maxAttempts}")
int maxAttempts;
@Bean @Bean
public Map<String, Object> consumerConfigs(KafkaSslProperties kafkaSslProperties) { public ConsumerFactory<String, MachineEvent> consumerFactory(KafkaSslProperties kafkaSslProperties) {
return createConsumerConfig(kafkaSslProperties); return new DefaultKafkaConsumerFactory<>(consumerConfigs(kafkaSslProperties));
} }
private Map<String, Object> createConsumerConfig(KafkaSslProperties kafkaSslProperties) { @Bean
public ConcurrentKafkaListenerContainerFactory<String, MachineEvent> kafkaListenerContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getInvoicingConcurrency());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MachineEvent> recPayToolContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getRecurrentPaymentToolConcurrency());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MachineEvent> rateContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getRateConcurrency());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MachineEvent> depositContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getDepositConcurrency());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MachineEvent> identityContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getIdentityConcurrency());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MachineEvent> walletContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getWalletConcurrency());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MachineEvent> withdrawalContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getWithdrawalConcurrency());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Event> payoutContainerFactory(
KafkaSslProperties kafkaSslProperties) {
var kafkaConsumerFactory = new DefaultKafkaConsumerFactory<String, Event>(consumerConfigs(kafkaSslProperties));
kafkaConsumerFactory.setValueDeserializer(new PayoutEventDeserializer());
var factory = new ConcurrentKafkaListenerContainerFactory<String, Event>();
initFactory(kafkaConsumerFactory, kafkaConsumerProperties.getPayoutConcurrency(), factory);
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MachineEvent> sourceContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getSourceConcurrency());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MachineEvent> destinationContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getDestinationConcurrency());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MachineEvent> withdrawalSessionContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getWithdrawalSessionConcurrency());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MachineEvent> partyManagementContainerFactory(
KafkaSslProperties kafkaSslProperties) {
Map<String, Object> configs = consumerConfigs(kafkaSslProperties);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, partyConsumerGroup);
ConsumerFactory<String, MachineEvent> consumerFactory = new DefaultKafkaConsumerFactory<>(configs);
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getPartyManagementConcurrency());
}
private Map<String, Object> consumerConfigs(KafkaSslProperties kafkaSslProperties) {
Map<String, Object> props = new HashMap<>(); Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
@ -82,112 +155,22 @@ public class KafkaConfig {
} }
} }
@Bean private ConcurrentKafkaListenerContainerFactory<String, MachineEvent> createConcurrentFactory(
public ConsumerFactory<String, MachineEvent> consumerFactory(KafkaSslProperties kafkaSslProperties) { ConsumerFactory<String, MachineEvent> consumerFactory,
return new DefaultKafkaConsumerFactory<>(consumerConfigs(kafkaSslProperties)); int threadsNumber) {
} var factory = new ConcurrentKafkaListenerContainerFactory<String, MachineEvent>();
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> kafkaListenerContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getInvoicingConcurrency());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> recPayToolContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getRecurrentPaymentToolConcurrency());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> rateContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getRateConcurrency());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> depositContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getDepositConcurrency());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> identityContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getIdentityConcurrency());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> walletContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getWalletConcurrency());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> withdrawalContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getWithdrawalConcurrency());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Event>> payoutContainerFactory(
KafkaSslProperties kafkaSslProperties) {
DefaultKafkaConsumerFactory<String, Event> kafkaConsumerFactory =
new DefaultKafkaConsumerFactory<>(createConsumerConfig(kafkaSslProperties));
kafkaConsumerFactory.setValueDeserializer(new PayoutEventDeserializer());
ConcurrentKafkaListenerContainerFactory<String, Event> factory =
new ConcurrentKafkaListenerContainerFactory<>();
initFactory(kafkaConsumerFactory, kafkaConsumerProperties.getPayoutConcurrency(), factory);
return factory;
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> sourceContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getSourceConcurrency());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> destinationContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getDestinationConcurrency());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> withdrawalSessionContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getWithdrawalSessionConcurrency());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> partyManagementContainerFactory(
KafkaSslProperties kafkaSslProperties) {
Map<String, Object> configs = createConsumerConfig(kafkaSslProperties);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, partyConsumerGroup);
ConsumerFactory<String, MachineEvent> consumerFactory = new DefaultKafkaConsumerFactory<>(configs);
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getPartyManagementConcurrency());
}
private KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> createConcurrentFactory(
ConsumerFactory<String, MachineEvent> consumerFactory, int threadsNumber) {
ConcurrentKafkaListenerContainerFactory<String, MachineEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
initFactory(consumerFactory, threadsNumber, factory); initFactory(consumerFactory, threadsNumber, factory);
return factory; return factory;
} }
private <T> void initFactory(ConsumerFactory<String, T> consumerFactory, private <T> void initFactory(
int threadsNumber, ConsumerFactory<String, T> consumerFactory,
ConcurrentKafkaListenerContainerFactory<String, T> factory) { int threadsNumber,
ConcurrentKafkaListenerContainerFactory<String, T> factory) {
factory.setConsumerFactory(consumerFactory); factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true); factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.setBatchErrorHandler(kafkaErrorHandler()); factory.setCommonErrorHandler(ExponentialBackOffDefaultErrorHandlerFactory.create());
factory.setConcurrency(threadsNumber); factory.setConcurrency(threadsNumber);
} }
public BatchErrorHandler kafkaErrorHandler() {
return new SeekToCurrentWithSleepBatchErrorHandler();
}
} }

View File

@ -41,9 +41,6 @@ management:
exposure: exposure:
include: health,info,prometheus include: health,info,prometheus
retry-policy:
maxAttempts: -1
kafka: kafka:
bootstrap-servers: localhost:9092 bootstrap-servers: localhost:9092
client-id: newway client-id: newway