From e48173b934e7020bfa95799ca655ff598a48e820 Mon Sep 17 00:00:00 2001 From: Kostya Date: Mon, 21 Jun 2021 10:56:16 +0300 Subject: [PATCH] Fix quorum (#25) * Fix quarum --- Jenkinsfile | 2 +- README.md | 1 + pom.xml | 17 ++-- .../wb/list/manager/config/KafkaConfig.java | 3 +- .../wb/list/manager/config/RiakConfig.java | 1 - .../exception/RiakExecutionException.java | 5 +- .../exception/UnknownRowTypeException.java | 5 +- .../manager/handler/WbListServiceHandler.java | 19 +++-- .../wb/list/manager/model/CountInfoModel.java | 1 - .../manager/repository/ListRepository.java | 7 +- .../resource/FraudInspectorServlet.java | 4 +- .../manager/stream/WbListStreamFactory.java | 11 +-- .../wb/list/manager/utils/KeyGenerator.java | 6 +- src/main/resources/application.yml | 26 +++++- .../manager/AbstractRiakIntegrationTest.java | 2 - .../wb/list/manager/KafkaAbstractTest.java | 38 ++++----- .../rbkmoney/wb/list/manager/RiakTest.java | 8 +- .../manager/WbListManagerApplicationTest.java | 83 ++++++++++--------- .../manager/WbListSafetyApplicationTest.java | 35 ++++---- 19 files changed, 160 insertions(+), 114 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index fff612e..af88a0f 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -13,4 +13,4 @@ build('wb-list-manager', 'java-maven') { def useJava11 = true javaServicePipeline(serviceName, useJava11, mvnArgs) -} \ No newline at end of file +} diff --git a/README.md b/README.md index bb29bd1..8b0bb43 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,3 @@ # wb-list-manager + wb-list-manager. diff --git a/pom.xml b/pom.xml index db26336..35906c8 100644 --- a/pom.xml +++ b/pom.xml @@ -1,12 +1,12 @@ - 4.0.0 com.rbkmoney - spring-boot-starter-parent - 2.3.0.RELEASE-2 + service-parent-pom + 1.2.5 wb-list-manager @@ -20,8 +20,9 @@ UTF-8 11 8022 - ${server.port} - bc95d0d6dc13c693acd2b274531a7d604b877bf3 + 8023 + ${server.port} ${management.port} + c0612d6052ac049496b72a23a04acb142035f249 dr2.rbkmoney.com 0.3.7 1.33-554d59c @@ -57,17 +58,15 @@ com.rbkmoney shared-resources - ${shared.resources.version} com.rbkmoney.woody woody-thrift - 1.1.21 com.rbkmoney spring-boot-starter-metrics-statsd - 1.1.2 + 1.1.0 com.rbkmoney @@ -162,7 +161,7 @@ - com.rbkmoney:shared-resources:${shared.resources.version} + com.rbkmoney:shared-resources:${shared-resources.version} false false diff --git a/src/main/java/com/rbkmoney/wb/list/manager/config/KafkaConfig.java b/src/main/java/com/rbkmoney/wb/list/manager/config/KafkaConfig.java index 83943da..6848540 100644 --- a/src/main/java/com/rbkmoney/wb/list/manager/config/KafkaConfig.java +++ b/src/main/java/com/rbkmoney/wb/list/manager/config/KafkaConfig.java @@ -47,7 +47,8 @@ public class KafkaConfig { props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CommandSerde.class); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); + props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, + LogAndContinueExceptionHandler.class); props.putAll(sslConfigure()); return props; } diff --git a/src/main/java/com/rbkmoney/wb/list/manager/config/RiakConfig.java b/src/main/java/com/rbkmoney/wb/list/manager/config/RiakConfig.java index afd4577..b1b55ff 100644 --- a/src/main/java/com/rbkmoney/wb/list/manager/config/RiakConfig.java +++ b/src/main/java/com/rbkmoney/wb/list/manager/config/RiakConfig.java @@ -3,7 +3,6 @@ package com.rbkmoney.wb.list.manager.config; import com.basho.riak.client.api.RiakClient; import com.basho.riak.client.core.RiakCluster; import com.basho.riak.client.core.RiakNode; -import com.basho.riak.client.core.netty.PingHealthCheck; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; diff --git a/src/main/java/com/rbkmoney/wb/list/manager/exception/RiakExecutionException.java b/src/main/java/com/rbkmoney/wb/list/manager/exception/RiakExecutionException.java index 1652cde..8645972 100644 --- a/src/main/java/com/rbkmoney/wb/list/manager/exception/RiakExecutionException.java +++ b/src/main/java/com/rbkmoney/wb/list/manager/exception/RiakExecutionException.java @@ -17,7 +17,10 @@ public class RiakExecutionException extends RuntimeException { super(cause); } - public RiakExecutionException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + public RiakExecutionException(String message, + Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); } } diff --git a/src/main/java/com/rbkmoney/wb/list/manager/exception/UnknownRowTypeException.java b/src/main/java/com/rbkmoney/wb/list/manager/exception/UnknownRowTypeException.java index f5a8ed4..2605d2f 100644 --- a/src/main/java/com/rbkmoney/wb/list/manager/exception/UnknownRowTypeException.java +++ b/src/main/java/com/rbkmoney/wb/list/manager/exception/UnknownRowTypeException.java @@ -17,7 +17,10 @@ public class UnknownRowTypeException extends RuntimeException { super(cause); } - public UnknownRowTypeException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + public UnknownRowTypeException(String message, + Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); } } diff --git a/src/main/java/com/rbkmoney/wb/list/manager/handler/WbListServiceHandler.java b/src/main/java/com/rbkmoney/wb/list/manager/handler/WbListServiceHandler.java index 0bbe772..9849e26 100644 --- a/src/main/java/com/rbkmoney/wb/list/manager/handler/WbListServiceHandler.java +++ b/src/main/java/com/rbkmoney/wb/list/manager/handler/WbListServiceHandler.java @@ -86,19 +86,26 @@ public class WbListServiceHandler implements WbListServiceSrv.Iface { private Optional getCascadeRow(Row row) { if (row.isSetId() && row.getId().isSetPaymentId()) { PaymentId paymentId = row.getId().getPaymentId(); - return cascadeGetRow(row.getListType(), row.getListName(), row.getValue(), paymentId.getPartyId(), paymentId.getShopId()); + return cascadeGetRow(row.getListType(), row.getListName(), row.getValue(), paymentId.getPartyId(), + paymentId.getShopId()); } else if (row.isSetId() && row.getId().isSetP2pId()) { P2pId p2pId = row.getId().getP2pId(); - return cascadeGetRow(row.getListType(), row.getListName(), row.getValue(), RowType.P_2_P, p2pId.getIdentityId()); + return cascadeGetRow(row.getListType(), row.getListName(), row.getValue(), RowType.P_2_P, + p2pId.getIdentityId()); } return cascadeGetRow(row.list_type, row.list_name, row.value, row.getPartyId(), row.getShopId()); } - private Optional cascadeGetRow(ListType list_type, String list_name, String value, String partyId, String shopId) { + private Optional cascadeGetRow(ListType listType, + String listName, + String value, + String partyId, + String shopId) { return Optional.ofNullable( - listRepository.get(KeyGenerator.generateKey(list_type, list_name, value)) - .orElse(listRepository.get(KeyGenerator.generateKey(list_type, list_name, value, partyId)) - .orElse(listRepository.get(KeyGenerator.generateKey(list_type, list_name, value, partyId, shopId)) + listRepository.get(KeyGenerator.generateKey(listType, listName, value)) + .orElse(listRepository.get(KeyGenerator.generateKey(listType, listName, value, partyId)) + .orElse(listRepository + .get(KeyGenerator.generateKey(listType, listName, value, partyId, shopId)) .orElse(null)))); } diff --git a/src/main/java/com/rbkmoney/wb/list/manager/model/CountInfoModel.java b/src/main/java/com/rbkmoney/wb/list/manager/model/CountInfoModel.java index 0d4bf21..847e4cf 100644 --- a/src/main/java/com/rbkmoney/wb/list/manager/model/CountInfoModel.java +++ b/src/main/java/com/rbkmoney/wb/list/manager/model/CountInfoModel.java @@ -1,7 +1,6 @@ package com.rbkmoney.wb.list.manager.model; import lombok.AllArgsConstructor; -import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; diff --git a/src/main/java/com/rbkmoney/wb/list/manager/repository/ListRepository.java b/src/main/java/com/rbkmoney/wb/list/manager/repository/ListRepository.java index 843f31e..be808ee 100644 --- a/src/main/java/com/rbkmoney/wb/list/manager/repository/ListRepository.java +++ b/src/main/java/com/rbkmoney/wb/list/manager/repository/ListRepository.java @@ -76,12 +76,13 @@ public class ListRepository implements CrudRepository { log.info("ListRepository get bucket: {} key: {}", bucket, key); Location quoteObjectLocation = createLocation(bucket, key); FetchValue fetch = new FetchValue.Builder(quoteObjectLocation) - .withOption(FetchValue.Option.R, new Quorum(3)) + .withOption(FetchValue.Option.R, Quorum.quorumQuorum()) .build(); FetchValue.Response response = client.execute(fetch); RiakObject obj = response.getValue(RiakObject.class); - return obj != null && obj.getValue() != null ? - Optional.of(new Row(key, obj.getValue().toString())) : Optional.empty(); + return obj != null && obj.getValue() != null + ? Optional.of(new Row(key, obj.getValue().toString())) + : Optional.empty(); } catch (InterruptedException e) { log.error("InterruptedException in ListRepository when get e: ", e); Thread.currentThread().interrupt(); diff --git a/src/main/java/com/rbkmoney/wb/list/manager/resource/FraudInspectorServlet.java b/src/main/java/com/rbkmoney/wb/list/manager/resource/FraudInspectorServlet.java index c39c2ae..05f177a 100644 --- a/src/main/java/com/rbkmoney/wb/list/manager/resource/FraudInspectorServlet.java +++ b/src/main/java/com/rbkmoney/wb/list/manager/resource/FraudInspectorServlet.java @@ -6,15 +6,15 @@ import lombok.RequiredArgsConstructor; import javax.servlet.*; import javax.servlet.annotation.WebServlet; + import java.io.IOException; @WebServlet("/wb_list/v1") @RequiredArgsConstructor public class FraudInspectorServlet extends GenericServlet { - private Servlet thriftServlet; - private final WbListServiceSrv.Iface fraudInspectorHandler; + private Servlet thriftServlet; @Override public void init(ServletConfig config) throws ServletException { diff --git a/src/main/java/com/rbkmoney/wb/list/manager/stream/WbListStreamFactory.java b/src/main/java/com/rbkmoney/wb/list/manager/stream/WbListStreamFactory.java index 548e8f2..a589505 100644 --- a/src/main/java/com/rbkmoney/wb/list/manager/stream/WbListStreamFactory.java +++ b/src/main/java/com/rbkmoney/wb/list/manager/stream/WbListStreamFactory.java @@ -21,16 +21,17 @@ import java.util.Properties; @RequiredArgsConstructor public class WbListStreamFactory { - @Value("${kafka.wblist.topic.command}") - private String readTopic; - @Value("${kafka.wblist.topic.event.sink}") - private String resultTopic; - private final CommandSerde commandSerde = new CommandSerde(); private final EventSerde eventSerde = new EventSerde(); private final CommandService commandService; private final RetryTemplate retryTemplate; + @Value("${kafka.wblist.topic.command}") + private String readTopic; + + @Value("${kafka.wblist.topic.event.sink}") + private String resultTopic; + public KafkaStreams create(final Properties streamsConfiguration) { try { StreamsBuilder builder = new StreamsBuilder(); diff --git a/src/main/java/com/rbkmoney/wb/list/manager/utils/KeyGenerator.java b/src/main/java/com/rbkmoney/wb/list/manager/utils/KeyGenerator.java index 210550d..210c27f 100644 --- a/src/main/java/com/rbkmoney/wb/list/manager/utils/KeyGenerator.java +++ b/src/main/java/com/rbkmoney/wb/list/manager/utils/KeyGenerator.java @@ -16,10 +16,12 @@ public class KeyGenerator { public static String generateKey(com.rbkmoney.damsel.wb_list.Row row) { if (row.isSetId() && row.getId().isSetPaymentId()) { PaymentId paymentId = row.getId().getPaymentId(); - return generateKey(row.getListType(), row.getListName(), row.getValue(), paymentId.getPartyId(), paymentId.getShopId()); + return generateKey(row.getListType(), row.getListName(), row.getValue(), paymentId.getPartyId(), + paymentId.getShopId()); } else if (row.isSetId() && row.getId().isSetP2pId()) { P2pId p2pId = row.getId().getP2pId(); - return generateKey(row.getListType(), row.getListName(), row.getValue(), RowType.P_2_P, p2pId.getIdentityId()); + return generateKey(row.getListType(), row.getListName(), row.getValue(), RowType.P_2_P, + p2pId.getIdentityId()); } return generateKey(row.getListType(), row.getListName(), row.getValue(), row.getPartyId(), row.getShopId()); } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 2c1d2d5..7c7c945 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,20 +1,38 @@ server: - port: @server.port@ + port: '@server.port@' + rest: + port: '@server.rest.port@' + endpoint: ch-manager management: security: flag: false + server: + port: '@management.port@' metrics: export: statsd: flavor: etsy + enabled: false + prometheus: + enabled: false + endpoint: + health: + show-details: always + metrics: + enabled: true + prometheus: + enabled: true + endpoints: + web: + exposure: + include: health,info,prometheus --- spring: application: - name: @name@ + name: '@project.name@' output: ansi: enabled: always - riak: address: localhost port: 8087 @@ -35,4 +53,4 @@ kafka: keystore-password: kenny key-password: kenny server-password: kenny12 - server-keystore-location: src/main/resources/cert/truststore.p12 \ No newline at end of file + server-keystore-location: src/main/resources/cert/truststore.p12 diff --git a/src/test/java/com/rbkmoney/wb/list/manager/AbstractRiakIntegrationTest.java b/src/test/java/com/rbkmoney/wb/list/manager/AbstractRiakIntegrationTest.java index f4bdcfe..2747599 100644 --- a/src/test/java/com/rbkmoney/wb/list/manager/AbstractRiakIntegrationTest.java +++ b/src/test/java/com/rbkmoney/wb/list/manager/AbstractRiakIntegrationTest.java @@ -9,8 +9,6 @@ import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy; -import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; import org.testcontainers.containers.wait.strategy.WaitAllStrategy; import java.time.Duration; diff --git a/src/test/java/com/rbkmoney/wb/list/manager/KafkaAbstractTest.java b/src/test/java/com/rbkmoney/wb/list/manager/KafkaAbstractTest.java index 4a1ea60..02c1be0 100644 --- a/src/test/java/com/rbkmoney/wb/list/manager/KafkaAbstractTest.java +++ b/src/test/java/com/rbkmoney/wb/list/manager/KafkaAbstractTest.java @@ -43,6 +43,25 @@ public abstract class KafkaAbstractTest extends AbstractRiakIntegrationTest { .withEmbeddedZookeeper() .withStartupTimeout(Duration.ofMinutes(2)); + public static Consumer createConsumer() { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EventDeserializer.class); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return new KafkaConsumer<>(props); + } + + public static Producer createProducer() { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + props.put(ProducerConfig.CLIENT_ID_CONFIG, "CLIENT"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ThriftSerializer.class); + return new KafkaProducer<>(props); + } + public static class Initializer implements ApplicationContextInitializer { @Override public void initialize(ConfigurableApplicationContext configurableApplicationContext) { @@ -67,23 +86,4 @@ public abstract class KafkaAbstractTest extends AbstractRiakIntegrationTest { } } - public static Consumer createConsumer() { - Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EventDeserializer.class); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - return new KafkaConsumer<>(props); - } - - public static Producer createProducer() { - Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); - props.put(ProducerConfig.CLIENT_ID_CONFIG, "CLIENT"); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ThriftSerializer.class); - return new KafkaProducer<>(props); - } - } diff --git a/src/test/java/com/rbkmoney/wb/list/manager/RiakTest.java b/src/test/java/com/rbkmoney/wb/list/manager/RiakTest.java index e74ea04..7b24fe1 100644 --- a/src/test/java/com/rbkmoney/wb/list/manager/RiakTest.java +++ b/src/test/java/com/rbkmoney/wb/list/manager/RiakTest.java @@ -18,6 +18,8 @@ import org.springframework.test.context.ContextConfiguration; import java.util.Optional; import java.util.concurrent.ExecutionException; +import static java.lang.Thread.sleep; + @SpringBootTest @ContextConfiguration(classes = {ListRepository.class, RiakConfig.class}) public class RiakTest extends KafkaAbstractTest { @@ -26,7 +28,7 @@ public class RiakTest extends KafkaAbstractTest { private static final String KEY = "key"; @Value("${riak.bucket}") - private String BUCKET_NAME; + private String bucketName; @Autowired private ListRepository listRepository; @@ -36,12 +38,14 @@ public class RiakTest extends KafkaAbstractTest { @Test public void riakTest() throws ExecutionException, InterruptedException { + sleep(10000); + Row row = new Row(); row.setKey(KEY); row.setValue(VALUE); listRepository.create(row); - Namespace ns = new Namespace(BUCKET_NAME); + Namespace ns = new Namespace(bucketName); Location location = new Location(ns, KEY); FetchValue fv = new FetchValue.Builder(location).build(); FetchValue.Response response = client.execute(fv); diff --git a/src/test/java/com/rbkmoney/wb/list/manager/WbListManagerApplicationTest.java b/src/test/java/com/rbkmoney/wb/list/manager/WbListManagerApplicationTest.java index 195f451..3fb92af 100644 --- a/src/test/java/com/rbkmoney/wb/list/manager/WbListManagerApplicationTest.java +++ b/src/test/java/com/rbkmoney/wb/list/manager/WbListManagerApplicationTest.java @@ -39,27 +39,37 @@ import static org.springframework.boot.test.context.SpringBootTest.WebEnvironmen @ContextConfiguration(classes = WbListManagerApplication.class) public class WbListManagerApplicationTest extends KafkaAbstractTest { + public static final String IDENTITY_ID = "identityId"; private static final String VALUE = "value"; private static final String KEY = "key"; private static final String SHOP_ID = "shopId"; private static final String PARTY_ID = "partyId"; private static final String LIST_NAME = "listName"; - public static final String IDENTITY_ID = "identityId"; - - @LocalServerPort - int serverPort; - private static String SERVICE_URL = "http://localhost:%s/wb_list/v1"; @Value("${kafka.wblist.topic.command}") public String topic; - - @Value("${riak.bucket}") - private String BUCKET_NAME; @Value("${kafka.wblist.topic.event.sink}") public String topicEventSink; + @LocalServerPort + int serverPort; + + @Value("${riak.bucket}") + private String bucketName; + + public static Consumer createConsumer() { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "CLIENT"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EventDeserializer.class); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); + return new KafkaConsumer<>(props); + } + @Test public void kafkaRowTest() throws Exception { THClientBuilder clientBuilder = new THClientBuilder() @@ -85,7 +95,8 @@ public class WbListManagerApplicationTest extends KafkaAbstractTest { consumer.poll(Duration.ofSeconds(1)); consumerRecords.forEach(record -> { log.info("poll message: {}", record.value()); - eventList.add(record.value());}); + eventList.add(record.value()); + }); consumer.close(); assertEquals(2, eventList.size()); @@ -95,7 +106,8 @@ public class WbListManagerApplicationTest extends KafkaAbstractTest { changeCommand = createCommand(row); row.setShopId(null); - ProducerRecord producerRecord = new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand); + ProducerRecord producerRecord = + new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand); producer.send(producerRecord).get(); producer.close(); Thread.sleep(1000L); @@ -146,32 +158,38 @@ public class WbListManagerApplicationTest extends KafkaAbstractTest { assertFalse(exist); } - private void produceDeleteRow(ChangeCommand changeCommand) throws InterruptedException, java.util.concurrent.ExecutionException { + private void produceDeleteRow(ChangeCommand changeCommand) + throws InterruptedException, java.util.concurrent.ExecutionException { Producer producer = createProducer(); changeCommand.setCommand(Command.DELETE); - ProducerRecord producerRecord = new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand); + ProducerRecord producerRecord = + new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand); producer.send(producerRecord).get(); producer.close(); Thread.sleep(1000L); } - private ChangeCommand produceCreateRow(com.rbkmoney.damsel.wb_list.Row row) throws InterruptedException, java.util.concurrent.ExecutionException { + private ChangeCommand produceCreateRow(com.rbkmoney.damsel.wb_list.Row row) + throws InterruptedException, java.util.concurrent.ExecutionException { Producer producer = createProducer(); ChangeCommand changeCommand = createCommand(row); - ProducerRecord producerRecord = new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand); + ProducerRecord producerRecord = + new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand); producer.send(producerRecord).get(); producer.close(); Thread.sleep(1000L); return changeCommand; } - - private RowInfo checkCreateWithCountInfo(WbListServiceSrv.Iface iface, String startTimeCount, String partyId, String shopId) throws InterruptedException, java.util.concurrent.ExecutionException, TException { + private RowInfo checkCreateWithCountInfo(WbListServiceSrv.Iface iface, String startTimeCount, String partyId, + String shopId) + throws InterruptedException, java.util.concurrent.ExecutionException, TException { Row rowWithCountInfo = createRow(startTimeCount, partyId, shopId); return iface.getRowInfo(rowWithCountInfo).getRowInfo(); } - private Row createRow(String startTimeCount, String partyId, String shopId) throws InterruptedException, java.util.concurrent.ExecutionException { + private Row createRow(String startTimeCount, String partyId, String shopId) + throws InterruptedException, java.util.concurrent.ExecutionException { Producer producer; ChangeCommand changeCommand; ProducerRecord producerRecord; @@ -186,14 +204,6 @@ public class WbListManagerApplicationTest extends KafkaAbstractTest { return rowWithCountInfo; } - @NotNull - private ChangeCommandWrapper createCommand(com.rbkmoney.damsel.wb_list.Row row) { - ChangeCommandWrapper changeCommand = new ChangeCommandWrapper(); - changeCommand.setCommand(Command.CREATE); - changeCommand.setRow(row); - return changeCommand; - } - @NotNull private com.rbkmoney.damsel.wb_list.Row createRow() { Row row = createListRow(); @@ -204,6 +214,15 @@ public class WbListManagerApplicationTest extends KafkaAbstractTest { return row; } + @NotNull + private ChangeCommandWrapper createCommand(com.rbkmoney.damsel.wb_list.Row row) { + ChangeCommandWrapper changeCommand = new ChangeCommandWrapper(); + changeCommand.setCommand(Command.CREATE); + changeCommand.setRow(row); + return changeCommand; + } + + @NotNull private com.rbkmoney.damsel.wb_list.Row createRowOld() { Row row = createListRow() @@ -222,7 +241,8 @@ public class WbListManagerApplicationTest extends KafkaAbstractTest { } @NotNull - private com.rbkmoney.damsel.wb_list.Row createRowWithCountInfo(String startTimeCount, String partyId, String shopId) { + private com.rbkmoney.damsel.wb_list.Row createRowWithCountInfo(String startTimeCount, String partyId, + String shopId) { com.rbkmoney.damsel.wb_list.Row row = new com.rbkmoney.damsel.wb_list.Row(); row.setId(IdInfo.payment_id(new PaymentId() .setShopId(SHOP_ID) @@ -240,15 +260,4 @@ public class WbListManagerApplicationTest extends KafkaAbstractTest { return row; } - public static Consumer createConsumer() { - Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "CLIENT"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EventDeserializer.class); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); - return new KafkaConsumer<>(props); - } - } diff --git a/src/test/java/com/rbkmoney/wb/list/manager/WbListSafetyApplicationTest.java b/src/test/java/com/rbkmoney/wb/list/manager/WbListSafetyApplicationTest.java index 90ba9e6..6819672 100644 --- a/src/test/java/com/rbkmoney/wb/list/manager/WbListSafetyApplicationTest.java +++ b/src/test/java/com/rbkmoney/wb/list/manager/WbListSafetyApplicationTest.java @@ -46,18 +46,18 @@ public class WbListSafetyApplicationTest extends KafkaAbstractTest { private static final String PARTY_ID = "partyId"; private static final String LIST_NAME = "listName"; - @MockBean - private ListRepository listRepository; - @Value("${kafka.wblist.topic.command}") public String topic; - - @Value("${riak.bucket}") - private String BUCKET_NAME; @Value("${kafka.wblist.topic.event.sink}") public String topicEventSink; + @MockBean + private ListRepository listRepository; + + @Value("${riak.bucket}") + private String bucketName; + public static Producer createProducer() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); @@ -67,6 +67,16 @@ public class WbListSafetyApplicationTest extends KafkaAbstractTest { return new KafkaProducer<>(props); } + public static Consumer createConsumer() { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EventDeserializer.class); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return new KafkaConsumer<>(props); + } + @Test public void kafkaRowTestException() throws Exception { doThrow(new RiakExecutionException(), @@ -77,7 +87,8 @@ public class WbListSafetyApplicationTest extends KafkaAbstractTest { Producer producerNew = createProducer(); ChangeCommand changeCommand = createCommand(); changeCommand.setCommand(Command.CREATE); - ProducerRecord producerRecordCommand = new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand); + ProducerRecord producerRecordCommand = + new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand); producerNew.send(producerRecordCommand).get(); producerNew.close(); @@ -108,14 +119,4 @@ public class WbListSafetyApplicationTest extends KafkaAbstractTest { return row; } - public static Consumer createConsumer() { - Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EventDeserializer.class); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - return new KafkaConsumer<>(props); - } - }