Merge pull request #12 from rbkmoney/ft/safe-message-consuming

Safe message consuming
This commit is contained in:
Pospolita Nikita 2019-08-02 13:25:33 +03:00 committed by GitHub
commit 46b35ba9de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 87 additions and 10 deletions

28
pom.xml
View File

@ -13,7 +13,7 @@
<packaging>jar</packaging>
<artifactId>kafka-common-lib</artifactId>
<version>0.0.9</version>
<version>0.1.0</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@ -49,9 +49,9 @@
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>com.rbkmoney.woody</groupId>
<artifactId>woody-thrift</artifactId>
<version>${woody.thrift.version}</version>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
@ -60,18 +60,26 @@
</dependency>
<!-- rbk -->
<!-- test -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
<groupId>com.rbkmoney</groupId>
<artifactId>machinegun-proto</artifactId>
<version>1.12-ebae56f</version>
</dependency>
<dependency>
<groupId>com.rbkmoney.woody</groupId>
<artifactId>woody-thrift</artifactId>
<version>${woody.thrift.version}</version>
</dependency>
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>damsel</artifactId>
<version>${damsel.version}</version>
</dependency>
<!-- test -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>

View File

@ -0,0 +1,27 @@
package com.rbkmoney.kafka.common.exception.handler;
import com.rbkmoney.machinegun.eventsink.MachineEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler;
import static com.rbkmoney.kafka.common.util.LogUtil.toSummaryString;
@Slf4j
public class SeekToCurrentWithSleepBatchErrorHandler extends SeekToCurrentBatchErrorHandler {
@Override
public void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer, MessageListenerContainer container) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
log.error(String.format("Records commit failed, size=%d, %s", data.count(),
toSummaryString((ConsumerRecords<String, MachineEvent>) data)), thrownException);
super.handle(thrownException, data, consumer, container);
}
}

View File

@ -1,8 +1,12 @@
package com.rbkmoney.kafka.common.util;
import com.rbkmoney.machinegun.eventsink.MachineEvent;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.time.Instant;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.stream.Collectors;
public class LogUtil {
@ -26,4 +30,42 @@ public class LogUtil {
.collect(Collectors.joining(", "));
}
public static String toSummaryString(List<ConsumerRecord<String, MachineEvent>> records) {
if (records.isEmpty()) {
return "empty";
}
ConsumerRecord firstRecord = records.get(0);
ConsumerRecord lastRecord = records.get(records.size() - 1);
LongSummaryStatistics keySizeSummary = records.stream().mapToLong(ConsumerRecord::serializedKeySize).summaryStatistics();
LongSummaryStatistics valueSizeSummary = records.stream().mapToLong(ConsumerRecord::serializedValueSize).summaryStatistics();
return String.format("topic='%s', partition=%d, offset={%d...%d}, createdAt={%s...%s}, keySize={min=%d, max=%d, avg=%s}, valueSize={min=%d, max=%d, avg=%s}",
firstRecord.topic(), firstRecord.partition(),
firstRecord.offset(), lastRecord.offset(),
Instant.ofEpochMilli(firstRecord.timestamp()), Instant.ofEpochMilli(lastRecord.timestamp()),
keySizeSummary.getMin(), keySizeSummary.getMax(), keySizeSummary.getAverage(),
valueSizeSummary.getMin(), valueSizeSummary.getMax(), valueSizeSummary.getAverage()
);
}
public static String toSummaryString(ConsumerRecords<String, MachineEvent> records) {
if (records.isEmpty()) {
return "empty";
}
StringBuilder stringBuilder = new StringBuilder();
records
.partitions()
.forEach(partition -> stringBuilder.append(toSummaryString(records.records(partition))).append("\n"));
return stringBuilder.toString();
}
public static String toSummaryStringWithValues(List<ConsumerRecord<String, MachineEvent>> records) {
String valueKeysString = records.stream().map(ConsumerRecord::value)
.map(value -> String.format("'%s.%d'", value.getSourceId(), value.getEventId()))
.collect(Collectors.joining(", "));
return String.format("%s, values={%s}", toSummaryString(records), valueKeysString);
}
}