add actual ErrorHandler (#14)

This commit is contained in:
Anatoly Karlov 2022-04-01 13:15:55 +07:00 committed by GitHub
parent a22f032da1
commit 72a2259902
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 126 additions and 1 deletions

View File

@ -13,7 +13,7 @@
<packaging>jar</packaging>
<artifactId>kafka-common-lib</artifactId>
<version>0.0.2</version>
<version>0.0.3</version>
<name>Kafka common lib</name>
<description>Kafka serializers/deserializers</description>

View File

@ -0,0 +1,70 @@
package dev.vality.kafka.common.exception.handler;
import dev.vality.kafka.common.util.ExponentialBackOffDefaultErrorHandlerFactory;
import dev.vality.kafka.common.util.LogUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.*;
import org.springframework.util.backoff.ExponentialBackOff;
import java.util.List;
import static dev.vality.kafka.common.util.LogUtil.toSummaryString;
/**
* Сейчас при создании {@link ConcurrentKafkaListenerContainerFactory} в качестве обработчика ошибок мы используем
* реализации интерфейса {@link BatchErrorHandler} {@link SeekToCurrentErrorHandler}
* и {@link SeekToCurrentBatchErrorHandler}. В данной библиотеке эти хендлеры наследуют
* {@link SeekToCurrentWithSleepBatchErrorHandler} и {@link SeekToCurrentWithSleepErrorHandler}. В данный момент эти
* реализации являются устаревшими, а в spring boot 3 эти классы становятся package private, поэтому мы должны
* избавиться от их использования.
* <p>Вместо них официальная документация spring предлагает использовать {@link CommonErrorHandler} и его реализацию
* {@link DefaultErrorHandler}.
* <p>Данная библиотека предлагает класс {@link ExponentialBackOffDefaultErrorHandler}, который наследует
* {@link DefaultErrorHandler}.
* <p>Данный класс ограничивает {@link DefaultErrorHandler} использованием только экспоненциально замедляющегося
* таймаута для перечитывания оффсета, что в свою очередь поможет не взрывать эластик. Также добавляется отключение
* коммита оффсета после обработки хендлером {@link #isAckAfterHandle()}, а также дополнительное логгирование на
* уровне этого хендлера средствами утилит этой библиотеки.
* <p>Используйте {@link ExponentialBackOffDefaultErrorHandlerFactory} для создания инстанса этого класса
*/
@Slf4j
public class ExponentialBackOffDefaultErrorHandler extends DefaultErrorHandler {
public ExponentialBackOffDefaultErrorHandler(ExponentialBackOff exponentialBackOff) {
super(exponentialBackOff);
}
@Override
public boolean isAckAfterHandle() {
return false;
}
@Override
public void handleRemaining(
Exception thrownException,
List<ConsumerRecord<?, ?>> records,
Consumer<?, ?> consumer,
MessageListenerContainer container) {
log.error(
String.format("Records commit failed, size=%d, %s", records.size(), LogUtil.toString(records)),
thrownException);
super.handleRemaining(thrownException, records, consumer, container);
}
@Override
public void handleBatch(
Exception thrownException,
ConsumerRecords<?, ?> data,
Consumer<?, ?> consumer,
MessageListenerContainer container,
Runnable invokeListener) {
log.error(
String.format("Records commit failed, size=%d, %s", data.count(), toSummaryString(data)),
thrownException);
super.handleBatch(thrownException, data, consumer, container, invokeListener);
}
}

View File

@ -5,6 +5,8 @@ import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler;
@ -12,7 +14,12 @@ import java.util.concurrent.TimeUnit;
import static dev.vality.kafka.common.util.LogUtil.toSummaryString;
/**
* @deprecated in favor of {@link CommonErrorHandler} or {@link DefaultErrorHandler}
or {@link ExponentialBackOffDefaultErrorHandler}.
*/
@Slf4j
@Deprecated
public class SeekToCurrentWithSleepBatchErrorHandler extends SeekToCurrentBatchErrorHandler {
@Setter

View File

@ -3,6 +3,8 @@ package dev.vality.kafka.common.exception.handler;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;
@ -12,7 +14,12 @@ import java.util.concurrent.TimeUnit;
import static org.springframework.util.backoff.FixedBackOff.UNLIMITED_ATTEMPTS;
/**
* @deprecated in favor of {@link CommonErrorHandler} or {@link DefaultErrorHandler}
or {@link ExponentialBackOffDefaultErrorHandler}.
*/
@Slf4j
@Deprecated
public class SeekToCurrentWithSleepErrorHandler extends SeekToCurrentErrorHandler {
private final Integer sleepTimeSeconds;

View File

@ -0,0 +1,41 @@
package dev.vality.kafka.common.util;
import dev.vality.kafka.common.exception.handler.ExponentialBackOffDefaultErrorHandler;
import org.springframework.kafka.support.ExponentialBackOffWithMaxRetries;
import org.springframework.util.backoff.ExponentialBackOff;
public class ExponentialBackOffDefaultErrorHandlerFactory {
/**
* Создается инстанс хендлера с дефолтными настройками ExponentialBackOff
*
* @see ExponentialBackOff#DEFAULT_INITIAL_INTERVAL
* @see ExponentialBackOff#DEFAULT_MULTIPLIER
* @see ExponentialBackOff#DEFAULT_MAX_INTERVAL
* @see ExponentialBackOff#DEFAULT_MAX_ELAPSED_TIME
*/
public static ExponentialBackOffDefaultErrorHandler create() {
return new ExponentialBackOffDefaultErrorHandler(new ExponentialBackOff());
}
public static ExponentialBackOffDefaultErrorHandler create(
long initialInterval,
double multiplier,
long maxInterval) {
var exponentialBackOff = new ExponentialBackOff(initialInterval, multiplier);
exponentialBackOff.setMaxInterval(maxInterval);
return new ExponentialBackOffDefaultErrorHandler(exponentialBackOff);
}
public static ExponentialBackOffDefaultErrorHandler create(
long initialInterval,
double multiplier,
long maxInterval,
int maxRetries) {
var exponentialBackOff = new ExponentialBackOffWithMaxRetries(maxRetries);
exponentialBackOff.setInitialInterval(initialInterval);
exponentialBackOff.setMultiplier(multiplier);
exponentialBackOff.setMaxInterval(maxInterval);
return new ExponentialBackOffDefaultErrorHandler(exponentialBackOff);
}
}