From 7b8756e6ca9fc9a5eab35d09b587654ff87fcc1d Mon Sep 17 00:00:00 2001
From: mr-impossibru <64555470+mr-impossibru@users.noreply.github.com>
Date: Tue, 19 Oct 2021 17:25:01 +0300
Subject: [PATCH] JD-734: remove p2p, attempt to stabilize tests (#30)
JD-734: remove p2p, attempt to stabilize tests
---
pom.xml | 11 +++--
.../wb/list/manager/constant/RowType.java | 11 -----
.../manager/handler/WbListServiceHandler.java | 5 +--
.../manager/stream/WbListStreamFactory.java | 13 +++++-
.../wb/list/manager/utils/KeyGenerator.java | 6 +--
.../rbkmoney/wb/list/manager/RiakTest.java | 17 ++++---
.../manager/WbListManagerApplicationTest.java | 45 +++++++++----------
.../manager/WbListSafetyApplicationTest.java | 3 +-
.../extension/AwaitilityExtension.java | 20 +++++++++
9 files changed, 76 insertions(+), 55 deletions(-)
delete mode 100644 src/main/java/com/rbkmoney/wb/list/manager/constant/RowType.java
create mode 100644 src/test/java/com/rbkmoney/wb/list/manager/extension/AwaitilityExtension.java
diff --git a/pom.xml b/pom.xml
index 2c0a44e..7a93cdf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.rbkmoney
service-parent-pom
- 2.0.8
+ 2.0.9
wb-list-manager
@@ -18,14 +18,13 @@
UTF-8
UTF-8
- 11
8022
8023
${server.port} ${management.port}
c0612d6052ac049496b72a23a04acb142035f249
dr2.rbkmoney.com
0.3.7
- 1.33-554d59c
+ 1.39-e20ba71
0.1.4
1.3.0
@@ -120,6 +119,12 @@
${testcontainers.annotations.version}
test
+
+ org.awaitility
+ awaitility
+ 4.1.0
+ test
+
diff --git a/src/main/java/com/rbkmoney/wb/list/manager/constant/RowType.java b/src/main/java/com/rbkmoney/wb/list/manager/constant/RowType.java
deleted file mode 100644
index 4cd055a..0000000
--- a/src/main/java/com/rbkmoney/wb/list/manager/constant/RowType.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package com.rbkmoney.wb.list.manager.constant;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public class RowType {
-
- public static final String P_2_P = "p2p";
-
-}
diff --git a/src/main/java/com/rbkmoney/wb/list/manager/handler/WbListServiceHandler.java b/src/main/java/com/rbkmoney/wb/list/manager/handler/WbListServiceHandler.java
index b08fb8c..83b3723 100644
--- a/src/main/java/com/rbkmoney/wb/list/manager/handler/WbListServiceHandler.java
+++ b/src/main/java/com/rbkmoney/wb/list/manager/handler/WbListServiceHandler.java
@@ -2,7 +2,6 @@ package com.rbkmoney.wb.list.manager.handler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rbkmoney.damsel.wb_list.*;
-import com.rbkmoney.wb.list.manager.constant.RowType;
import com.rbkmoney.wb.list.manager.exception.RiakExecutionException;
import com.rbkmoney.wb.list.manager.model.CountInfoModel;
import com.rbkmoney.wb.list.manager.repository.ListRepository;
@@ -88,9 +87,7 @@ public class WbListServiceHandler implements WbListServiceSrv.Iface {
return cascadeGetRow(row.getListType(), row.getListName(), row.getValue(), paymentId.getPartyId(),
paymentId.getShopId());
} else if (row.isSetId() && row.getId().isSetP2pId()) {
- P2pId p2pId = row.getId().getP2pId();
- return cascadeGetRow(row.getListType(), row.getListName(), row.getValue(), RowType.P_2_P,
- p2pId.getIdentityId());
+ throw new IllegalStateException("P2P is not supported. Row: " + row);
}
return cascadeGetRow(row.list_type, row.list_name, row.value, row.getPartyId(), row.getShopId());
}
diff --git a/src/main/java/com/rbkmoney/wb/list/manager/stream/WbListStreamFactory.java b/src/main/java/com/rbkmoney/wb/list/manager/stream/WbListStreamFactory.java
index a589505..2790a33 100644
--- a/src/main/java/com/rbkmoney/wb/list/manager/stream/WbListStreamFactory.java
+++ b/src/main/java/com/rbkmoney/wb/list/manager/stream/WbListStreamFactory.java
@@ -1,5 +1,6 @@
package com.rbkmoney.wb.list.manager.stream;
+import com.rbkmoney.damsel.wb_list.ChangeCommand;
import com.rbkmoney.wb.list.manager.serializer.CommandSerde;
import com.rbkmoney.wb.list.manager.serializer.EventSerde;
import com.rbkmoney.wb.list.manager.service.CommandService;
@@ -36,7 +37,7 @@ public class WbListStreamFactory {
try {
StreamsBuilder builder = new StreamsBuilder();
builder.stream(readTopic, Consumed.with(Serdes.String(), commandSerde))
- .filter((s, changeCommand) -> changeCommand != null && changeCommand.getCommand() != null)
+ .filter((s, changeCommand) -> hasChangeCommand(changeCommand) && !isP2P(changeCommand))
.peek((s, changeCommand) -> log.info("Command stream check command: {}", changeCommand))
.mapValues(command ->
retryTemplate.execute(args -> commandService.apply(command)))
@@ -48,4 +49,14 @@ public class WbListStreamFactory {
}
}
+ private boolean hasChangeCommand(ChangeCommand changeCommand) {
+ return changeCommand != null && changeCommand.getCommand() != null;
+ }
+
+ private boolean isP2P(ChangeCommand changeCommand) {
+ return changeCommand.getRow() != null
+ && changeCommand.getRow().isSetId()
+ && changeCommand.getRow().getId().isSetP2pId();
+ }
+
}
diff --git a/src/main/java/com/rbkmoney/wb/list/manager/utils/KeyGenerator.java b/src/main/java/com/rbkmoney/wb/list/manager/utils/KeyGenerator.java
index 93d6471..19d68f6 100644
--- a/src/main/java/com/rbkmoney/wb/list/manager/utils/KeyGenerator.java
+++ b/src/main/java/com/rbkmoney/wb/list/manager/utils/KeyGenerator.java
@@ -1,9 +1,7 @@
package com.rbkmoney.wb.list.manager.utils;
import com.rbkmoney.damsel.wb_list.ListType;
-import com.rbkmoney.damsel.wb_list.P2pId;
import com.rbkmoney.damsel.wb_list.PaymentId;
-import com.rbkmoney.wb.list.manager.constant.RowType;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.springframework.util.StringUtils;
@@ -19,9 +17,7 @@ public class KeyGenerator {
return generateKey(row.getListType(), row.getListName(), row.getValue(), paymentId.getPartyId(),
paymentId.getShopId());
} else if (row.isSetId() && row.getId().isSetP2pId()) {
- P2pId p2pId = row.getId().getP2pId();
- return generateKey(row.getListType(), row.getListName(), row.getValue(), RowType.P_2_P,
- p2pId.getIdentityId());
+ throw new IllegalStateException("P2P is not supported. Row: " + row);
}
return generateKey(row.getListType(), row.getListName(), row.getValue(), row.getPartyId(), row.getShopId());
}
diff --git a/src/test/java/com/rbkmoney/wb/list/manager/RiakTest.java b/src/test/java/com/rbkmoney/wb/list/manager/RiakTest.java
index a35e956..1c68fd1 100644
--- a/src/test/java/com/rbkmoney/wb/list/manager/RiakTest.java
+++ b/src/test/java/com/rbkmoney/wb/list/manager/RiakTest.java
@@ -5,9 +5,11 @@ import com.basho.riak.client.api.commands.kv.FetchValue;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;
import com.basho.riak.client.core.query.RiakObject;
+import com.rbkmoney.wb.list.manager.extension.AwaitilityExtension;
import com.rbkmoney.wb.list.manager.extension.RiakTestcontainerExtension;
import com.rbkmoney.wb.list.manager.model.Row;
import com.rbkmoney.wb.list.manager.repository.ListRepository;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
@@ -16,11 +18,11 @@ import org.springframework.boot.test.context.SpringBootTest;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
-import static java.lang.Thread.sleep;
import static org.junit.jupiter.api.Assertions.*;
-@ExtendWith(RiakTestcontainerExtension.class)
+@ExtendWith({RiakTestcontainerExtension.class, AwaitilityExtension.class})
@SpringBootTest
public class RiakTest {
@@ -38,12 +40,17 @@ public class RiakTest {
@Test
void riakTest() throws ExecutionException, InterruptedException {
- sleep(20000);
-
Row row = new Row();
row.setKey(KEY);
row.setValue(VALUE);
- listRepository.create(row);
+ Awaitility.await()
+ .pollDelay(5_000L, TimeUnit.MILLISECONDS)
+ .atMost(20_000L, TimeUnit.MILLISECONDS)
+ .ignoreExceptions()
+ .until(() -> {
+ listRepository.create(row);
+ return listRepository.get(KEY).isPresent();
+ });
Namespace ns = new Namespace(bucketName);
Location location = new Location(ns, KEY);
diff --git a/src/test/java/com/rbkmoney/wb/list/manager/WbListManagerApplicationTest.java b/src/test/java/com/rbkmoney/wb/list/manager/WbListManagerApplicationTest.java
index 1b7bc1f..fabfe88 100644
--- a/src/test/java/com/rbkmoney/wb/list/manager/WbListManagerApplicationTest.java
+++ b/src/test/java/com/rbkmoney/wb/list/manager/WbListManagerApplicationTest.java
@@ -6,11 +6,14 @@ import com.rbkmoney.testcontainers.annotations.kafka.config.KafkaConsumer;
import com.rbkmoney.testcontainers.annotations.kafka.config.KafkaConsumerConfig;
import com.rbkmoney.testcontainers.annotations.kafka.config.KafkaProducer;
import com.rbkmoney.testcontainers.annotations.kafka.config.KafkaProducerConfig;
+import com.rbkmoney.wb.list.manager.extension.AwaitilityExtension;
import com.rbkmoney.wb.list.manager.extension.RiakTestcontainerExtension;
import com.rbkmoney.wb.list.manager.utils.ChangeCommandWrapper;
+import com.rbkmoney.woody.api.flow.error.WRuntimeException;
import com.rbkmoney.woody.thrift.impl.http.THClientBuilder;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -32,7 +35,7 @@ import static org.junit.jupiter.api.Assertions.*;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
import static org.testcontainers.shaded.com.trilead.ssh2.ChannelCondition.TIMEOUT;
-@ExtendWith({RiakTestcontainerExtension.class})
+@ExtendWith({RiakTestcontainerExtension.class, AwaitilityExtension.class})
@KafkaTestcontainer(topicsKeys = {"kafka.wblist.topic.command", "kafka.wblist.topic.event.sink"})
@SpringBootTest(webEnvironment = RANDOM_PORT)
@ContextConfiguration(
@@ -44,7 +47,6 @@ public class WbListManagerApplicationTest {
public static final String IDENTITY_ID = "identityId";
private static final String VALUE = "value";
- private static final String KEY = "key";
private static final String SHOP_ID = "shopId";
private static final String PARTY_ID = "partyId";
private static final String LIST_NAME = "listName";
@@ -79,15 +81,12 @@ public class WbListManagerApplicationTest {
void kafkaStreamsTest() throws Exception {
Row testRow = TestObjectFactory.testRow();
ChangeCommand changeCommand = produceCreateRow(testRow);
-
- boolean exist = handler.isExist(changeCommand.getRow());
-
- assertTrue(exist);
-
+ Awaitility.await()
+ .until(() -> handler.isExist(changeCommand.getRow()));
produceDeleteRow(changeCommand);
- exist = handler.isExist(changeCommand.getRow());
+ boolean exist = handler.isExist(changeCommand.getRow());
assertFalse(exist);
@@ -107,16 +106,13 @@ public class WbListManagerApplicationTest {
ChangeCommand changeCommand = createCommand(row);
row.setShopId(null);
testThriftKafkaProducer.send(topic, changeCommand);
- Thread.sleep(1000L);
-
- boolean exist = handler.isExist(row);
-
- assertTrue(exist);
+ Awaitility.await()
+ .until(() -> handler.isExist(changeCommand.getRow()));
row.setShopId(SHOP_ID);
- exist = handler.isExist(row);
+ boolean exist = handler.isExist(row);
assertTrue(exist);
@@ -153,22 +149,23 @@ public class WbListManagerApplicationTest {
assertFalse(rowInfo.getCountInfo().getStartCountTime().isEmpty());
- Row rowP2p = createListRow();
- rowP2p.setId(IdInfo.p2p_id(new P2pId().setIdentityId(IDENTITY_ID)));
- changeCommand = produceCreateRow(rowP2p);
-
- exist = handler.isExist(changeCommand.getRow());
-
- assertTrue(exist);
-
-
produceDeleteRow(changeCommand);
-
exist = handler.isExist(changeCommand.getRow());
assertFalse(exist);
}
+ @Test
+ public void kafkaRowP2PTest() throws InterruptedException {
+ Row rowP2p = createListRow();
+ rowP2p.setId(IdInfo.p2p_id(new P2pId().setIdentityId(IDENTITY_ID)));
+ ChangeCommand p2pChangeCommand = produceCreateRow(rowP2p);
+ assertThrows(WRuntimeException.class, () -> handler.isExist(p2pChangeCommand.getRow()));
+
+ produceDeleteRow(p2pChangeCommand);
+ assertThrows(WRuntimeException.class, () -> handler.isExist(p2pChangeCommand.getRow()));
+ }
+
private Row createRow(String startTimeCount) throws InterruptedException {
ChangeCommand changeCommand;
Row rowWithCountInfo = createRowWithCountInfo(startTimeCount);
diff --git a/src/test/java/com/rbkmoney/wb/list/manager/WbListSafetyApplicationTest.java b/src/test/java/com/rbkmoney/wb/list/manager/WbListSafetyApplicationTest.java
index bba74e0..dcb90e7 100644
--- a/src/test/java/com/rbkmoney/wb/list/manager/WbListSafetyApplicationTest.java
+++ b/src/test/java/com/rbkmoney/wb/list/manager/WbListSafetyApplicationTest.java
@@ -45,8 +45,7 @@ public class WbListSafetyApplicationTest {
testThriftKafkaProducer.send(topic, changeCommand);
- Thread.sleep(2000L);
- verify(listRepository, times(3)).create(any());
+ verify(listRepository, timeout(2000L).times(3)).create(any());
}
}
diff --git a/src/test/java/com/rbkmoney/wb/list/manager/extension/AwaitilityExtension.java b/src/test/java/com/rbkmoney/wb/list/manager/extension/AwaitilityExtension.java
new file mode 100644
index 0000000..7da9bc3
--- /dev/null
+++ b/src/test/java/com/rbkmoney/wb/list/manager/extension/AwaitilityExtension.java
@@ -0,0 +1,20 @@
+package com.rbkmoney.wb.list.manager.extension;
+
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import java.util.concurrent.TimeUnit;
+
+public class AwaitilityExtension implements BeforeAllCallback {
+
+ public static final long TIMEOUT = 10_000L;
+ public static final long POLL_INTERVAL = 500L;
+
+ @Override
+ public void beforeAll(ExtensionContext context) throws Exception {
+ Awaitility.setDefaultPollInterval(TIMEOUT, TimeUnit.MILLISECONDS);
+ Awaitility.setDefaultTimeout(POLL_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+
+}