diff --git a/pom.xml b/pom.xml index 622b625..be0e1a7 100644 --- a/pom.xml +++ b/pom.xml @@ -13,7 +13,7 @@ jar kafka-common-lib - 0.0.2 + 0.0.3 Kafka common lib Kafka serializers/deserializers diff --git a/src/main/java/dev/vality/kafka/common/exception/handler/ExponentialBackOffDefaultErrorHandler.java b/src/main/java/dev/vality/kafka/common/exception/handler/ExponentialBackOffDefaultErrorHandler.java new file mode 100644 index 0000000..398c048 --- /dev/null +++ b/src/main/java/dev/vality/kafka/common/exception/handler/ExponentialBackOffDefaultErrorHandler.java @@ -0,0 +1,70 @@ +package dev.vality.kafka.common.exception.handler; + +import dev.vality.kafka.common.util.ExponentialBackOffDefaultErrorHandlerFactory; +import dev.vality.kafka.common.util.LogUtil; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.listener.*; +import org.springframework.util.backoff.ExponentialBackOff; + +import java.util.List; + +import static dev.vality.kafka.common.util.LogUtil.toSummaryString; + +/** + * Сейчас при создании {@link ConcurrentKafkaListenerContainerFactory} в качестве обработчика ошибок мы используем + * реализации интерфейса {@link BatchErrorHandler} — {@link SeekToCurrentErrorHandler} + * и {@link SeekToCurrentBatchErrorHandler}. В данной библиотеке эти хендлеры наследуют + * {@link SeekToCurrentWithSleepBatchErrorHandler} и {@link SeekToCurrentWithSleepErrorHandler}. В данный момент эти + * реализации являются устаревшими, а в spring boot 3 эти классы становятся package private, поэтому мы должны + * избавиться от их использования. + *

Вместо них официальная документация spring предлагает использовать {@link CommonErrorHandler} и его реализацию + * {@link DefaultErrorHandler}. + *

Данная библиотека предлагает класс {@link ExponentialBackOffDefaultErrorHandler}, который наследует + * {@link DefaultErrorHandler}. + *

Данный класс ограничивает {@link DefaultErrorHandler} использованием только экспоненциально замедляющегося + * таймаута для перечитывания оффсета, что в свою очередь поможет не взрывать эластик. Также добавляется отключение + * коммита оффсета после обработки хендлером {@link #isAckAfterHandle()}, а также дополнительное логгирование на + * уровне этого хендлера средствами утилит этой библиотеки. + *

Используйте {@link ExponentialBackOffDefaultErrorHandlerFactory} для создания инстанса этого класса + */ +@Slf4j +public class ExponentialBackOffDefaultErrorHandler extends DefaultErrorHandler { + + public ExponentialBackOffDefaultErrorHandler(ExponentialBackOff exponentialBackOff) { + super(exponentialBackOff); + } + + @Override + public boolean isAckAfterHandle() { + return false; + } + + @Override + public void handleRemaining( + Exception thrownException, + List> records, + Consumer consumer, + MessageListenerContainer container) { + log.error( + String.format("Records commit failed, size=%d, %s", records.size(), LogUtil.toString(records)), + thrownException); + super.handleRemaining(thrownException, records, consumer, container); + } + + @Override + public void handleBatch( + Exception thrownException, + ConsumerRecords data, + Consumer consumer, + MessageListenerContainer container, + Runnable invokeListener) { + log.error( + String.format("Records commit failed, size=%d, %s", data.count(), toSummaryString(data)), + thrownException); + super.handleBatch(thrownException, data, consumer, container, invokeListener); + } +} diff --git a/src/main/java/dev/vality/kafka/common/exception/handler/SeekToCurrentWithSleepBatchErrorHandler.java b/src/main/java/dev/vality/kafka/common/exception/handler/SeekToCurrentWithSleepBatchErrorHandler.java index df0ef6a..7059455 100644 --- a/src/main/java/dev/vality/kafka/common/exception/handler/SeekToCurrentWithSleepBatchErrorHandler.java +++ b/src/main/java/dev/vality/kafka/common/exception/handler/SeekToCurrentWithSleepBatchErrorHandler.java @@ -5,6 +5,8 @@ import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.springframework.kafka.listener.CommonErrorHandler; +import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler; @@ -12,7 +14,12 @@ import java.util.concurrent.TimeUnit; import static dev.vality.kafka.common.util.LogUtil.toSummaryString; +/** + * @deprecated in favor of {@link CommonErrorHandler} or {@link DefaultErrorHandler} + or {@link ExponentialBackOffDefaultErrorHandler}. + */ @Slf4j +@Deprecated public class SeekToCurrentWithSleepBatchErrorHandler extends SeekToCurrentBatchErrorHandler { @Setter diff --git a/src/main/java/dev/vality/kafka/common/exception/handler/SeekToCurrentWithSleepErrorHandler.java b/src/main/java/dev/vality/kafka/common/exception/handler/SeekToCurrentWithSleepErrorHandler.java index 1aabe19..0d6a9ab 100644 --- a/src/main/java/dev/vality/kafka/common/exception/handler/SeekToCurrentWithSleepErrorHandler.java +++ b/src/main/java/dev/vality/kafka/common/exception/handler/SeekToCurrentWithSleepErrorHandler.java @@ -3,6 +3,8 @@ package dev.vality.kafka.common.exception.handler; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.listener.CommonErrorHandler; +import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.kafka.listener.SeekToCurrentErrorHandler; import org.springframework.util.backoff.FixedBackOff; @@ -12,7 +14,12 @@ import java.util.concurrent.TimeUnit; import static org.springframework.util.backoff.FixedBackOff.UNLIMITED_ATTEMPTS; +/** + * @deprecated in favor of {@link CommonErrorHandler} or {@link DefaultErrorHandler} + or {@link ExponentialBackOffDefaultErrorHandler}. + */ @Slf4j +@Deprecated public class SeekToCurrentWithSleepErrorHandler extends SeekToCurrentErrorHandler { private final Integer sleepTimeSeconds; diff --git a/src/main/java/dev/vality/kafka/common/util/ExponentialBackOffDefaultErrorHandlerFactory.java b/src/main/java/dev/vality/kafka/common/util/ExponentialBackOffDefaultErrorHandlerFactory.java new file mode 100644 index 0000000..dbc04ed --- /dev/null +++ b/src/main/java/dev/vality/kafka/common/util/ExponentialBackOffDefaultErrorHandlerFactory.java @@ -0,0 +1,41 @@ +package dev.vality.kafka.common.util; + +import dev.vality.kafka.common.exception.handler.ExponentialBackOffDefaultErrorHandler; +import org.springframework.kafka.support.ExponentialBackOffWithMaxRetries; +import org.springframework.util.backoff.ExponentialBackOff; + +public class ExponentialBackOffDefaultErrorHandlerFactory { + + /** + * Создается инстанс хендлера с дефолтными настройками ExponentialBackOff + * + * @see ExponentialBackOff#DEFAULT_INITIAL_INTERVAL + * @see ExponentialBackOff#DEFAULT_MULTIPLIER + * @see ExponentialBackOff#DEFAULT_MAX_INTERVAL + * @see ExponentialBackOff#DEFAULT_MAX_ELAPSED_TIME + */ + public static ExponentialBackOffDefaultErrorHandler create() { + return new ExponentialBackOffDefaultErrorHandler(new ExponentialBackOff()); + } + + public static ExponentialBackOffDefaultErrorHandler create( + long initialInterval, + double multiplier, + long maxInterval) { + var exponentialBackOff = new ExponentialBackOff(initialInterval, multiplier); + exponentialBackOff.setMaxInterval(maxInterval); + return new ExponentialBackOffDefaultErrorHandler(exponentialBackOff); + } + + public static ExponentialBackOffDefaultErrorHandler create( + long initialInterval, + double multiplier, + long maxInterval, + int maxRetries) { + var exponentialBackOff = new ExponentialBackOffWithMaxRetries(maxRetries); + exponentialBackOff.setInitialInterval(initialInterval); + exponentialBackOff.setMultiplier(multiplier); + exponentialBackOff.setMaxInterval(maxInterval); + return new ExponentialBackOffDefaultErrorHandler(exponentialBackOff); + } +}