JD-734: remove p2p, attempt to stabilize tests (#30)

JD-734: remove p2p, attempt to stabilize tests
This commit is contained in:
mr-impossibru 2021-10-19 17:25:01 +03:00 committed by GitHub
parent 7efb78d6fc
commit 7b8756e6ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 76 additions and 55 deletions

11
pom.xml
View File

@ -6,7 +6,7 @@
<parent>
<groupId>com.rbkmoney</groupId>
<artifactId>service-parent-pom</artifactId>
<version>2.0.8</version>
<version>2.0.9</version>
</parent>
<artifactId>wb-list-manager</artifactId>
@ -18,14 +18,13 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>11</java.version>
<server.port>8022</server.port>
<management.port>8023</management.port>
<exposed.ports>${server.port} ${management.port}</exposed.ports>
<dockerfile.base.service.tag>c0612d6052ac049496b72a23a04acb142035f249</dockerfile.base.service.tag>
<dockerfile.registry>dr2.rbkmoney.com</dockerfile.registry>
<shared.resources.version>0.3.7</shared.resources.version>
<wb.list.proto.version>1.33-554d59c</wb.list.proto.version>
<wb.list.proto.version>1.39-e20ba71</wb.list.proto.version>
<kafka.common.lib.version>0.1.4</kafka.common.lib.version>
<testcontainers.annotations.version>1.3.0</testcontainers.annotations.version>
</properties>
@ -120,6 +119,12 @@
<version>${testcontainers.annotations.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.1.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -1,11 +0,0 @@
package com.rbkmoney.wb.list.manager.constant;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class RowType {
public static final String P_2_P = "p2p";
}

View File

@ -2,7 +2,6 @@ package com.rbkmoney.wb.list.manager.handler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rbkmoney.damsel.wb_list.*;
import com.rbkmoney.wb.list.manager.constant.RowType;
import com.rbkmoney.wb.list.manager.exception.RiakExecutionException;
import com.rbkmoney.wb.list.manager.model.CountInfoModel;
import com.rbkmoney.wb.list.manager.repository.ListRepository;
@ -88,9 +87,7 @@ public class WbListServiceHandler implements WbListServiceSrv.Iface {
return cascadeGetRow(row.getListType(), row.getListName(), row.getValue(), paymentId.getPartyId(),
paymentId.getShopId());
} else if (row.isSetId() && row.getId().isSetP2pId()) {
P2pId p2pId = row.getId().getP2pId();
return cascadeGetRow(row.getListType(), row.getListName(), row.getValue(), RowType.P_2_P,
p2pId.getIdentityId());
throw new IllegalStateException("P2P is not supported. Row: " + row);
}
return cascadeGetRow(row.list_type, row.list_name, row.value, row.getPartyId(), row.getShopId());
}

View File

@ -1,5 +1,6 @@
package com.rbkmoney.wb.list.manager.stream;
import com.rbkmoney.damsel.wb_list.ChangeCommand;
import com.rbkmoney.wb.list.manager.serializer.CommandSerde;
import com.rbkmoney.wb.list.manager.serializer.EventSerde;
import com.rbkmoney.wb.list.manager.service.CommandService;
@ -36,7 +37,7 @@ public class WbListStreamFactory {
try {
StreamsBuilder builder = new StreamsBuilder();
builder.stream(readTopic, Consumed.with(Serdes.String(), commandSerde))
.filter((s, changeCommand) -> changeCommand != null && changeCommand.getCommand() != null)
.filter((s, changeCommand) -> hasChangeCommand(changeCommand) && !isP2P(changeCommand))
.peek((s, changeCommand) -> log.info("Command stream check command: {}", changeCommand))
.mapValues(command ->
retryTemplate.execute(args -> commandService.apply(command)))
@ -48,4 +49,14 @@ public class WbListStreamFactory {
}
}
private boolean hasChangeCommand(ChangeCommand changeCommand) {
return changeCommand != null && changeCommand.getCommand() != null;
}
private boolean isP2P(ChangeCommand changeCommand) {
return changeCommand.getRow() != null
&& changeCommand.getRow().isSetId()
&& changeCommand.getRow().getId().isSetP2pId();
}
}

View File

@ -1,9 +1,7 @@
package com.rbkmoney.wb.list.manager.utils;
import com.rbkmoney.damsel.wb_list.ListType;
import com.rbkmoney.damsel.wb_list.P2pId;
import com.rbkmoney.damsel.wb_list.PaymentId;
import com.rbkmoney.wb.list.manager.constant.RowType;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.springframework.util.StringUtils;
@ -19,9 +17,7 @@ public class KeyGenerator {
return generateKey(row.getListType(), row.getListName(), row.getValue(), paymentId.getPartyId(),
paymentId.getShopId());
} else if (row.isSetId() && row.getId().isSetP2pId()) {
P2pId p2pId = row.getId().getP2pId();
return generateKey(row.getListType(), row.getListName(), row.getValue(), RowType.P_2_P,
p2pId.getIdentityId());
throw new IllegalStateException("P2P is not supported. Row: " + row);
}
return generateKey(row.getListType(), row.getListName(), row.getValue(), row.getPartyId(), row.getShopId());
}

View File

@ -5,9 +5,11 @@ import com.basho.riak.client.api.commands.kv.FetchValue;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;
import com.basho.riak.client.core.query.RiakObject;
import com.rbkmoney.wb.list.manager.extension.AwaitilityExtension;
import com.rbkmoney.wb.list.manager.extension.RiakTestcontainerExtension;
import com.rbkmoney.wb.list.manager.model.Row;
import com.rbkmoney.wb.list.manager.repository.ListRepository;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
@ -16,11 +18,11 @@ import org.springframework.boot.test.context.SpringBootTest;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static java.lang.Thread.sleep;
import static org.junit.jupiter.api.Assertions.*;
@ExtendWith(RiakTestcontainerExtension.class)
@ExtendWith({RiakTestcontainerExtension.class, AwaitilityExtension.class})
@SpringBootTest
public class RiakTest {
@ -38,12 +40,17 @@ public class RiakTest {
@Test
void riakTest() throws ExecutionException, InterruptedException {
sleep(20000);
Row row = new Row();
row.setKey(KEY);
row.setValue(VALUE);
listRepository.create(row);
Awaitility.await()
.pollDelay(5_000L, TimeUnit.MILLISECONDS)
.atMost(20_000L, TimeUnit.MILLISECONDS)
.ignoreExceptions()
.until(() -> {
listRepository.create(row);
return listRepository.get(KEY).isPresent();
});
Namespace ns = new Namespace(bucketName);
Location location = new Location(ns, KEY);

View File

@ -6,11 +6,14 @@ import com.rbkmoney.testcontainers.annotations.kafka.config.KafkaConsumer;
import com.rbkmoney.testcontainers.annotations.kafka.config.KafkaConsumerConfig;
import com.rbkmoney.testcontainers.annotations.kafka.config.KafkaProducer;
import com.rbkmoney.testcontainers.annotations.kafka.config.KafkaProducerConfig;
import com.rbkmoney.wb.list.manager.extension.AwaitilityExtension;
import com.rbkmoney.wb.list.manager.extension.RiakTestcontainerExtension;
import com.rbkmoney.wb.list.manager.utils.ChangeCommandWrapper;
import com.rbkmoney.woody.api.flow.error.WRuntimeException;
import com.rbkmoney.woody.thrift.impl.http.THClientBuilder;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ -32,7 +35,7 @@ import static org.junit.jupiter.api.Assertions.*;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
import static org.testcontainers.shaded.com.trilead.ssh2.ChannelCondition.TIMEOUT;
@ExtendWith({RiakTestcontainerExtension.class})
@ExtendWith({RiakTestcontainerExtension.class, AwaitilityExtension.class})
@KafkaTestcontainer(topicsKeys = {"kafka.wblist.topic.command", "kafka.wblist.topic.event.sink"})
@SpringBootTest(webEnvironment = RANDOM_PORT)
@ContextConfiguration(
@ -44,7 +47,6 @@ public class WbListManagerApplicationTest {
public static final String IDENTITY_ID = "identityId";
private static final String VALUE = "value";
private static final String KEY = "key";
private static final String SHOP_ID = "shopId";
private static final String PARTY_ID = "partyId";
private static final String LIST_NAME = "listName";
@ -79,15 +81,12 @@ public class WbListManagerApplicationTest {
void kafkaStreamsTest() throws Exception {
Row testRow = TestObjectFactory.testRow();
ChangeCommand changeCommand = produceCreateRow(testRow);
boolean exist = handler.isExist(changeCommand.getRow());
assertTrue(exist);
Awaitility.await()
.until(() -> handler.isExist(changeCommand.getRow()));
produceDeleteRow(changeCommand);
exist = handler.isExist(changeCommand.getRow());
boolean exist = handler.isExist(changeCommand.getRow());
assertFalse(exist);
@ -107,16 +106,13 @@ public class WbListManagerApplicationTest {
ChangeCommand changeCommand = createCommand(row);
row.setShopId(null);
testThriftKafkaProducer.send(topic, changeCommand);
Thread.sleep(1000L);
boolean exist = handler.isExist(row);
assertTrue(exist);
Awaitility.await()
.until(() -> handler.isExist(changeCommand.getRow()));
row.setShopId(SHOP_ID);
exist = handler.isExist(row);
boolean exist = handler.isExist(row);
assertTrue(exist);
@ -153,22 +149,23 @@ public class WbListManagerApplicationTest {
assertFalse(rowInfo.getCountInfo().getStartCountTime().isEmpty());
Row rowP2p = createListRow();
rowP2p.setId(IdInfo.p2p_id(new P2pId().setIdentityId(IDENTITY_ID)));
changeCommand = produceCreateRow(rowP2p);
exist = handler.isExist(changeCommand.getRow());
assertTrue(exist);
produceDeleteRow(changeCommand);
exist = handler.isExist(changeCommand.getRow());
assertFalse(exist);
}
@Test
public void kafkaRowP2PTest() throws InterruptedException {
Row rowP2p = createListRow();
rowP2p.setId(IdInfo.p2p_id(new P2pId().setIdentityId(IDENTITY_ID)));
ChangeCommand p2pChangeCommand = produceCreateRow(rowP2p);
assertThrows(WRuntimeException.class, () -> handler.isExist(p2pChangeCommand.getRow()));
produceDeleteRow(p2pChangeCommand);
assertThrows(WRuntimeException.class, () -> handler.isExist(p2pChangeCommand.getRow()));
}
private Row createRow(String startTimeCount) throws InterruptedException {
ChangeCommand changeCommand;
Row rowWithCountInfo = createRowWithCountInfo(startTimeCount);

View File

@ -45,8 +45,7 @@ public class WbListSafetyApplicationTest {
testThriftKafkaProducer.send(topic, changeCommand);
Thread.sleep(2000L);
verify(listRepository, times(3)).create(any());
verify(listRepository, timeout(2000L).times(3)).create(any());
}
}

View File

@ -0,0 +1,20 @@
package com.rbkmoney.wb.list.manager.extension;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import java.util.concurrent.TimeUnit;
public class AwaitilityExtension implements BeforeAllCallback {
public static final long TIMEOUT = 10_000L;
public static final long POLL_INTERVAL = 500L;
@Override
public void beforeAll(ExtensionContext context) throws Exception {
Awaitility.setDefaultPollInterval(TIMEOUT, TimeUnit.MILLISECONDS);
Awaitility.setDefaultTimeout(POLL_INTERVAL, TimeUnit.MILLISECONDS);
}
}