Add threads

This commit is contained in:
k.struzhkin 2020-08-24 10:25:22 +03:00
parent 3b13a1442a
commit b909709349
2 changed files with 4 additions and 1 deletions

View File

@ -5,7 +5,6 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler; import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
@ -37,6 +36,8 @@ public class KafkaConfig {
private String clientStoreCertPath; private String clientStoreCertPath;
@Value("${kafka.ssl.enable}") @Value("${kafka.ssl.enable}")
private boolean kafkaSslEnable; private boolean kafkaSslEnable;
@Value("${kafka.num-stream-threads}")
private int numStreamThreads;
@Bean @Bean
public Properties mgEventStreamProperties() { public Properties mgEventStreamProperties() {
@ -48,6 +49,7 @@ public class KafkaConfig {
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MachineEventSerde.class); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MachineEventSerde.class);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads);
props.put(StreamsConfig.RETRIES_CONFIG, 5); props.put(StreamsConfig.RETRIES_CONFIG, 5);
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 1000); props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndFailExceptionHandler.class); props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndFailExceptionHandler.class);

View File

@ -28,6 +28,7 @@ service:
kafka: kafka:
bootstrap.servers: "localhost:29092" bootstrap.servers: "localhost:29092"
num-stream-threads: 7
ssl: ssl:
enable: false enable: false
keystore-location: src/main/resources/cert/kenny-k.struzhkin.p12 keystore-location: src/main/resources/cert/kenny-k.struzhkin.p12