Fix stream (#26)

This commit is contained in:
struga 2023-01-26 18:01:23 +07:00 committed by GitHub
parent 9c8ac08b05
commit e9b6d5b7be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -35,10 +35,12 @@ public class StartupListener implements ApplicationListener<ContextRefreshedEven
log.info("StartupListener start stream kafkaStreams: {}", kafkaStreams.metadataForAllStreamsClients()); log.info("StartupListener start stream kafkaStreams: {}", kafkaStreams.metadataForAllStreamsClients());
if (wbListCorrectionStreamProperties.getEnabled()) { if (wbListCorrectionStreamProperties.getEnabled()) {
wbListStreamProperties.put("application.id", wbListCorrectionStreamProperties.getApplicationId()); Properties properties = new Properties();
wbListStreamProperties.put("client.id", wbListCorrectionStreamProperties.getClientId()); properties.putAll(wbListStreamProperties);
properties.put("application.id", wbListCorrectionStreamProperties.getApplicationId());
properties.put("client.id", wbListCorrectionStreamProperties.getClientId());
kafkaStreamsWbListErrorRowsCorrection = kafkaStreamsWbListErrorRowsCorrection =
wbListErrorRowsCorrectionStreamFactory.create(wbListStreamProperties); wbListErrorRowsCorrectionStreamFactory.create(properties);
kafkaStreamsWbListErrorRowsCorrection.start(); kafkaStreamsWbListErrorRowsCorrection.start();
log.info("StartupListener start stream kafkaStreamsWbListErrorRowsCorrection: {}", log.info("StartupListener start stream kafkaStreamsWbListErrorRowsCorrection: {}",
kafkaStreamsWbListErrorRowsCorrection.metadataForAllStreamsClients()); kafkaStreamsWbListErrorRowsCorrection.metadataForAllStreamsClients());