Fix quorum (#25)

* Fix quarum
This commit is contained in:
Kostya 2021-06-21 10:56:16 +03:00 committed by GitHub
parent 4e1ef72046
commit e48173b934
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 160 additions and 114 deletions

View File

@ -1,2 +1,3 @@
# wb-list-manager
wb-list-manager.

17
pom.xml
View File

@ -1,12 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.rbkmoney</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE-2</version>
<artifactId>service-parent-pom</artifactId>
<version>1.2.5</version>
</parent>
<artifactId>wb-list-manager</artifactId>
@ -20,8 +20,9 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>11</java.version>
<server.port>8022</server.port>
<exposed.ports>${server.port}</exposed.ports>
<dockerfile.base.service.tag>bc95d0d6dc13c693acd2b274531a7d604b877bf3</dockerfile.base.service.tag>
<management.port>8023</management.port>
<exposed.ports>${server.port} ${management.port}</exposed.ports>
<dockerfile.base.service.tag>c0612d6052ac049496b72a23a04acb142035f249</dockerfile.base.service.tag>
<dockerfile.registry>dr2.rbkmoney.com</dockerfile.registry>
<shared.resources.version>0.3.7</shared.resources.version>
<wb.list.proto.version>1.33-554d59c</wb.list.proto.version>
@ -57,17 +58,15 @@
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>shared-resources</artifactId>
<version>${shared.resources.version}</version>
</dependency>
<dependency>
<groupId>com.rbkmoney.woody</groupId>
<artifactId>woody-thrift</artifactId>
<version>1.1.21</version>
</dependency>
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>spring-boot-starter-metrics-statsd</artifactId>
<version>1.1.2</version>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.rbkmoney</groupId>
@ -162,7 +161,7 @@
</dependencies>
<configuration>
<resourceBundles>
<resourceBundle>com.rbkmoney:shared-resources:${shared.resources.version}</resourceBundle>
<resourceBundle>com.rbkmoney:shared-resources:${shared-resources.version}</resourceBundle>
</resourceBundles>
<attachToMain>false</attachToMain>
<attachToTest>false</attachToTest>

View File

@ -47,7 +47,8 @@ public class KafkaConfig {
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CommandSerde.class);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
props.putAll(sslConfigure());
return props;
}

View File

@ -3,7 +3,6 @@ package com.rbkmoney.wb.list.manager.config;
import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.core.RiakCluster;
import com.basho.riak.client.core.RiakNode;
import com.basho.riak.client.core.netty.PingHealthCheck;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

View File

@ -17,7 +17,10 @@ public class RiakExecutionException extends RuntimeException {
super(cause);
}
public RiakExecutionException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
public RiakExecutionException(String message,
Throwable cause,
boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -17,7 +17,10 @@ public class UnknownRowTypeException extends RuntimeException {
super(cause);
}
public UnknownRowTypeException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
public UnknownRowTypeException(String message,
Throwable cause,
boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -86,19 +86,26 @@ public class WbListServiceHandler implements WbListServiceSrv.Iface {
private Optional<com.rbkmoney.wb.list.manager.model.Row> getCascadeRow(Row row) {
if (row.isSetId() && row.getId().isSetPaymentId()) {
PaymentId paymentId = row.getId().getPaymentId();
return cascadeGetRow(row.getListType(), row.getListName(), row.getValue(), paymentId.getPartyId(), paymentId.getShopId());
return cascadeGetRow(row.getListType(), row.getListName(), row.getValue(), paymentId.getPartyId(),
paymentId.getShopId());
} else if (row.isSetId() && row.getId().isSetP2pId()) {
P2pId p2pId = row.getId().getP2pId();
return cascadeGetRow(row.getListType(), row.getListName(), row.getValue(), RowType.P_2_P, p2pId.getIdentityId());
return cascadeGetRow(row.getListType(), row.getListName(), row.getValue(), RowType.P_2_P,
p2pId.getIdentityId());
}
return cascadeGetRow(row.list_type, row.list_name, row.value, row.getPartyId(), row.getShopId());
}
private Optional<com.rbkmoney.wb.list.manager.model.Row> cascadeGetRow(ListType list_type, String list_name, String value, String partyId, String shopId) {
private Optional<com.rbkmoney.wb.list.manager.model.Row> cascadeGetRow(ListType listType,
String listName,
String value,
String partyId,
String shopId) {
return Optional.ofNullable(
listRepository.get(KeyGenerator.generateKey(list_type, list_name, value))
.orElse(listRepository.get(KeyGenerator.generateKey(list_type, list_name, value, partyId))
.orElse(listRepository.get(KeyGenerator.generateKey(list_type, list_name, value, partyId, shopId))
listRepository.get(KeyGenerator.generateKey(listType, listName, value))
.orElse(listRepository.get(KeyGenerator.generateKey(listType, listName, value, partyId))
.orElse(listRepository
.get(KeyGenerator.generateKey(listType, listName, value, partyId, shopId))
.orElse(null))));
}

View File

@ -1,7 +1,6 @@
package com.rbkmoney.wb.list.manager.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

View File

@ -76,12 +76,13 @@ public class ListRepository implements CrudRepository<Row, String> {
log.info("ListRepository get bucket: {} key: {}", bucket, key);
Location quoteObjectLocation = createLocation(bucket, key);
FetchValue fetch = new FetchValue.Builder(quoteObjectLocation)
.withOption(FetchValue.Option.R, new Quorum(3))
.withOption(FetchValue.Option.R, Quorum.quorumQuorum())
.build();
FetchValue.Response response = client.execute(fetch);
RiakObject obj = response.getValue(RiakObject.class);
return obj != null && obj.getValue() != null ?
Optional.of(new Row(key, obj.getValue().toString())) : Optional.empty();
return obj != null && obj.getValue() != null
? Optional.of(new Row(key, obj.getValue().toString()))
: Optional.empty();
} catch (InterruptedException e) {
log.error("InterruptedException in ListRepository when get e: ", e);
Thread.currentThread().interrupt();

View File

@ -6,15 +6,15 @@ import lombok.RequiredArgsConstructor;
import javax.servlet.*;
import javax.servlet.annotation.WebServlet;
import java.io.IOException;
@WebServlet("/wb_list/v1")
@RequiredArgsConstructor
public class FraudInspectorServlet extends GenericServlet {
private Servlet thriftServlet;
private final WbListServiceSrv.Iface fraudInspectorHandler;
private Servlet thriftServlet;
@Override
public void init(ServletConfig config) throws ServletException {

View File

@ -21,16 +21,17 @@ import java.util.Properties;
@RequiredArgsConstructor
public class WbListStreamFactory {
@Value("${kafka.wblist.topic.command}")
private String readTopic;
@Value("${kafka.wblist.topic.event.sink}")
private String resultTopic;
private final CommandSerde commandSerde = new CommandSerde();
private final EventSerde eventSerde = new EventSerde();
private final CommandService commandService;
private final RetryTemplate retryTemplate;
@Value("${kafka.wblist.topic.command}")
private String readTopic;
@Value("${kafka.wblist.topic.event.sink}")
private String resultTopic;
public KafkaStreams create(final Properties streamsConfiguration) {
try {
StreamsBuilder builder = new StreamsBuilder();

View File

@ -16,10 +16,12 @@ public class KeyGenerator {
public static String generateKey(com.rbkmoney.damsel.wb_list.Row row) {
if (row.isSetId() && row.getId().isSetPaymentId()) {
PaymentId paymentId = row.getId().getPaymentId();
return generateKey(row.getListType(), row.getListName(), row.getValue(), paymentId.getPartyId(), paymentId.getShopId());
return generateKey(row.getListType(), row.getListName(), row.getValue(), paymentId.getPartyId(),
paymentId.getShopId());
} else if (row.isSetId() && row.getId().isSetP2pId()) {
P2pId p2pId = row.getId().getP2pId();
return generateKey(row.getListType(), row.getListName(), row.getValue(), RowType.P_2_P, p2pId.getIdentityId());
return generateKey(row.getListType(), row.getListName(), row.getValue(), RowType.P_2_P,
p2pId.getIdentityId());
}
return generateKey(row.getListType(), row.getListName(), row.getValue(), row.getPartyId(), row.getShopId());
}

View File

@ -1,20 +1,38 @@
server:
port: @server.port@
port: '@server.port@'
rest:
port: '@server.rest.port@'
endpoint: ch-manager
management:
security:
flag: false
server:
port: '@management.port@'
metrics:
export:
statsd:
flavor: etsy
enabled: false
prometheus:
enabled: false
endpoint:
health:
show-details: always
metrics:
enabled: true
prometheus:
enabled: true
endpoints:
web:
exposure:
include: health,info,prometheus
---
spring:
application:
name: @name@
name: '@project.name@'
output:
ansi:
enabled: always
riak:
address: localhost
port: 8087

View File

@ -9,8 +9,6 @@ import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
import java.time.Duration;

View File

@ -43,6 +43,25 @@ public abstract class KafkaAbstractTest extends AbstractRiakIntegrationTest {
.withEmbeddedZookeeper()
.withStartupTimeout(Duration.ofMinutes(2));
public static <T> Consumer<String, T> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EventDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new KafkaConsumer<>(props);
}
public static <T> Producer<String, T> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ProducerConfig.CLIENT_ID_CONFIG, "CLIENT");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ThriftSerializer.class);
return new KafkaProducer<>(props);
}
public static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
@Override
public void initialize(ConfigurableApplicationContext configurableApplicationContext) {
@ -67,23 +86,4 @@ public abstract class KafkaAbstractTest extends AbstractRiakIntegrationTest {
}
}
public static <T> Consumer<String, T> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EventDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new KafkaConsumer<>(props);
}
public static <T> Producer<String, T> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ProducerConfig.CLIENT_ID_CONFIG, "CLIENT");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ThriftSerializer.class);
return new KafkaProducer<>(props);
}
}

View File

@ -18,6 +18,8 @@ import org.springframework.test.context.ContextConfiguration;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import static java.lang.Thread.sleep;
@SpringBootTest
@ContextConfiguration(classes = {ListRepository.class, RiakConfig.class})
public class RiakTest extends KafkaAbstractTest {
@ -26,7 +28,7 @@ public class RiakTest extends KafkaAbstractTest {
private static final String KEY = "key";
@Value("${riak.bucket}")
private String BUCKET_NAME;
private String bucketName;
@Autowired
private ListRepository listRepository;
@ -36,12 +38,14 @@ public class RiakTest extends KafkaAbstractTest {
@Test
public void riakTest() throws ExecutionException, InterruptedException {
sleep(10000);
Row row = new Row();
row.setKey(KEY);
row.setValue(VALUE);
listRepository.create(row);
Namespace ns = new Namespace(BUCKET_NAME);
Namespace ns = new Namespace(bucketName);
Location location = new Location(ns, KEY);
FetchValue fv = new FetchValue.Builder(location).build();
FetchValue.Response response = client.execute(fv);

View File

@ -39,27 +39,37 @@ import static org.springframework.boot.test.context.SpringBootTest.WebEnvironmen
@ContextConfiguration(classes = WbListManagerApplication.class)
public class WbListManagerApplicationTest extends KafkaAbstractTest {
public static final String IDENTITY_ID = "identityId";
private static final String VALUE = "value";
private static final String KEY = "key";
private static final String SHOP_ID = "shopId";
private static final String PARTY_ID = "partyId";
private static final String LIST_NAME = "listName";
public static final String IDENTITY_ID = "identityId";
@LocalServerPort
int serverPort;
private static String SERVICE_URL = "http://localhost:%s/wb_list/v1";
@Value("${kafka.wblist.topic.command}")
public String topic;
@Value("${riak.bucket}")
private String BUCKET_NAME;
@Value("${kafka.wblist.topic.event.sink}")
public String topicEventSink;
@LocalServerPort
int serverPort;
@Value("${riak.bucket}")
private String bucketName;
public static Consumer<String, Event> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "CLIENT");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EventDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return new KafkaConsumer<>(props);
}
@Test
public void kafkaRowTest() throws Exception {
THClientBuilder clientBuilder = new THClientBuilder()
@ -85,7 +95,8 @@ public class WbListManagerApplicationTest extends KafkaAbstractTest {
consumer.poll(Duration.ofSeconds(1));
consumerRecords.forEach(record -> {
log.info("poll message: {}", record.value());
eventList.add(record.value());});
eventList.add(record.value());
});
consumer.close();
assertEquals(2, eventList.size());
@ -95,7 +106,8 @@ public class WbListManagerApplicationTest extends KafkaAbstractTest {
changeCommand = createCommand(row);
row.setShopId(null);
ProducerRecord<String, ChangeCommand> producerRecord = new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand);
ProducerRecord<String, ChangeCommand> producerRecord =
new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand);
producer.send(producerRecord).get();
producer.close();
Thread.sleep(1000L);
@ -146,32 +158,38 @@ public class WbListManagerApplicationTest extends KafkaAbstractTest {
assertFalse(exist);
}
private void produceDeleteRow(ChangeCommand changeCommand) throws InterruptedException, java.util.concurrent.ExecutionException {
private void produceDeleteRow(ChangeCommand changeCommand)
throws InterruptedException, java.util.concurrent.ExecutionException {
Producer<String, ChangeCommand> producer = createProducer();
changeCommand.setCommand(Command.DELETE);
ProducerRecord<String, ChangeCommand> producerRecord = new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand);
ProducerRecord<String, ChangeCommand> producerRecord =
new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand);
producer.send(producerRecord).get();
producer.close();
Thread.sleep(1000L);
}
private ChangeCommand produceCreateRow(com.rbkmoney.damsel.wb_list.Row row) throws InterruptedException, java.util.concurrent.ExecutionException {
private ChangeCommand produceCreateRow(com.rbkmoney.damsel.wb_list.Row row)
throws InterruptedException, java.util.concurrent.ExecutionException {
Producer<String, ChangeCommand> producer = createProducer();
ChangeCommand changeCommand = createCommand(row);
ProducerRecord<String, ChangeCommand> producerRecord = new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand);
ProducerRecord<String, ChangeCommand> producerRecord =
new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand);
producer.send(producerRecord).get();
producer.close();
Thread.sleep(1000L);
return changeCommand;
}
private RowInfo checkCreateWithCountInfo(WbListServiceSrv.Iface iface, String startTimeCount, String partyId, String shopId) throws InterruptedException, java.util.concurrent.ExecutionException, TException {
private RowInfo checkCreateWithCountInfo(WbListServiceSrv.Iface iface, String startTimeCount, String partyId,
String shopId)
throws InterruptedException, java.util.concurrent.ExecutionException, TException {
Row rowWithCountInfo = createRow(startTimeCount, partyId, shopId);
return iface.getRowInfo(rowWithCountInfo).getRowInfo();
}
private Row createRow(String startTimeCount, String partyId, String shopId) throws InterruptedException, java.util.concurrent.ExecutionException {
private Row createRow(String startTimeCount, String partyId, String shopId)
throws InterruptedException, java.util.concurrent.ExecutionException {
Producer<String, ChangeCommand> producer;
ChangeCommand changeCommand;
ProducerRecord<String, ChangeCommand> producerRecord;
@ -186,14 +204,6 @@ public class WbListManagerApplicationTest extends KafkaAbstractTest {
return rowWithCountInfo;
}
@NotNull
private ChangeCommandWrapper createCommand(com.rbkmoney.damsel.wb_list.Row row) {
ChangeCommandWrapper changeCommand = new ChangeCommandWrapper();
changeCommand.setCommand(Command.CREATE);
changeCommand.setRow(row);
return changeCommand;
}
@NotNull
private com.rbkmoney.damsel.wb_list.Row createRow() {
Row row = createListRow();
@ -204,6 +214,15 @@ public class WbListManagerApplicationTest extends KafkaAbstractTest {
return row;
}
@NotNull
private ChangeCommandWrapper createCommand(com.rbkmoney.damsel.wb_list.Row row) {
ChangeCommandWrapper changeCommand = new ChangeCommandWrapper();
changeCommand.setCommand(Command.CREATE);
changeCommand.setRow(row);
return changeCommand;
}
@NotNull
private com.rbkmoney.damsel.wb_list.Row createRowOld() {
Row row = createListRow()
@ -222,7 +241,8 @@ public class WbListManagerApplicationTest extends KafkaAbstractTest {
}
@NotNull
private com.rbkmoney.damsel.wb_list.Row createRowWithCountInfo(String startTimeCount, String partyId, String shopId) {
private com.rbkmoney.damsel.wb_list.Row createRowWithCountInfo(String startTimeCount, String partyId,
String shopId) {
com.rbkmoney.damsel.wb_list.Row row = new com.rbkmoney.damsel.wb_list.Row();
row.setId(IdInfo.payment_id(new PaymentId()
.setShopId(SHOP_ID)
@ -240,15 +260,4 @@ public class WbListManagerApplicationTest extends KafkaAbstractTest {
return row;
}
public static Consumer<String, Event> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "CLIENT");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EventDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return new KafkaConsumer<>(props);
}
}

View File

@ -46,18 +46,18 @@ public class WbListSafetyApplicationTest extends KafkaAbstractTest {
private static final String PARTY_ID = "partyId";
private static final String LIST_NAME = "listName";
@MockBean
private ListRepository listRepository;
@Value("${kafka.wblist.topic.command}")
public String topic;
@Value("${riak.bucket}")
private String BUCKET_NAME;
@Value("${kafka.wblist.topic.event.sink}")
public String topicEventSink;
@MockBean
private ListRepository listRepository;
@Value("${riak.bucket}")
private String bucketName;
public static <T> Producer<String, T> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
@ -67,6 +67,16 @@ public class WbListSafetyApplicationTest extends KafkaAbstractTest {
return new KafkaProducer<>(props);
}
public static <T> Consumer<String, T> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EventDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new KafkaConsumer<>(props);
}
@Test
public void kafkaRowTestException() throws Exception {
doThrow(new RiakExecutionException(),
@ -77,7 +87,8 @@ public class WbListSafetyApplicationTest extends KafkaAbstractTest {
Producer<String, ChangeCommand> producerNew = createProducer();
ChangeCommand changeCommand = createCommand();
changeCommand.setCommand(Command.CREATE);
ProducerRecord<String, ChangeCommand> producerRecordCommand = new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand);
ProducerRecord<String, ChangeCommand> producerRecordCommand =
new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand);
producerNew.send(producerRecordCommand).get();
producerNew.close();
@ -108,14 +119,4 @@ public class WbListSafetyApplicationTest extends KafkaAbstractTest {
return row;
}
public static <T> Consumer<String, T> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EventDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new KafkaConsumer<>(props);
}
}