mirror of
https://github.com/valitydev/wb-list-manager.git
synced 2024-11-06 01:35:17 +00:00
commit
7efb78d6fc
5
Jenkinsfile
vendored
5
Jenkinsfile
vendored
@ -5,12 +5,11 @@ build('wb-list-manager', 'java-maven') {
|
||||
|
||||
def javaServicePipeline
|
||||
runStage('load JavaService pipeline') {
|
||||
javaServicePipeline = load("build_utils/jenkins_lib/pipeJavaService.groovy")
|
||||
javaServicePipeline = load("build_utils/jenkins_lib/pipeJavaServiceInsideDocker.groovy")
|
||||
}
|
||||
|
||||
def serviceName = env.REPO_NAME
|
||||
def mvnArgs = '-DjvmArgs="-Xmx256m"'
|
||||
def useJava11 = true
|
||||
|
||||
javaServicePipeline(serviceName, useJava11, mvnArgs)
|
||||
javaServicePipeline(serviceName, mvnArgs)
|
||||
}
|
||||
|
13
pom.xml
13
pom.xml
@ -6,11 +6,11 @@
|
||||
<parent>
|
||||
<groupId>com.rbkmoney</groupId>
|
||||
<artifactId>service-parent-pom</artifactId>
|
||||
<version>1.2.5</version>
|
||||
<version>2.0.8</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>wb-list-manager</artifactId>
|
||||
<version>0.0.3-SNAPSHOT</version>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<name>wb-list-manager</name>
|
||||
@ -27,6 +27,7 @@
|
||||
<shared.resources.version>0.3.7</shared.resources.version>
|
||||
<wb.list.proto.version>1.33-554d59c</wb.list.proto.version>
|
||||
<kafka.common.lib.version>0.1.4</kafka.common.lib.version>
|
||||
<testcontainers.annotations.version>1.3.0</testcontainers.annotations.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
@ -114,9 +115,9 @@
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.testcontainers</groupId>
|
||||
<artifactId>kafka</artifactId>
|
||||
<version>1.14.3</version>
|
||||
<groupId>com.rbkmoney</groupId>
|
||||
<artifactId>testcontainers-annotations</artifactId>
|
||||
<version>${testcontainers.annotations.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
@ -178,7 +179,7 @@
|
||||
<plugin>
|
||||
<groupId>org.jacoco</groupId>
|
||||
<artifactId>jacoco-maven-plugin</artifactId>
|
||||
<version>0.8.2</version>
|
||||
<version>0.8.7</version>
|
||||
<configuration>
|
||||
<destFile>${sonar.jacoco.reportPath}</destFile>
|
||||
<append>true</append>
|
||||
|
@ -22,7 +22,7 @@ public class KafkaConfig {
|
||||
private static final String CLIENT_ID = "wb-list-client";
|
||||
private static final String PKCS_12 = "PKCS12";
|
||||
|
||||
@Value("${kafka.bootstrap.servers}")
|
||||
@Value("${kafka.bootstrap-servers}")
|
||||
private String bootstrapServers;
|
||||
@Value("${kafka.ssl.server-password}")
|
||||
private String serverStorePassword;
|
||||
|
@ -22,10 +22,8 @@ public class RiakConfig {
|
||||
.withRemoteAddress(riakAddress)
|
||||
.withRemotePort(riakPort)
|
||||
.build();
|
||||
RiakCluster cluster = new RiakCluster.Builder(node)
|
||||
return new RiakCluster.Builder(node)
|
||||
.build();
|
||||
cluster.start();
|
||||
return cluster;
|
||||
}
|
||||
|
||||
@Bean
|
||||
|
@ -0,0 +1,24 @@
|
||||
package com.rbkmoney.wb.list.manager.config;
|
||||
|
||||
import com.basho.riak.client.core.RiakCluster;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class RiakInitializer {
|
||||
|
||||
private final RiakCluster riakCluster;
|
||||
|
||||
@EventListener(value = ApplicationReadyEvent.class)
|
||||
@Order
|
||||
public void onStartup() {
|
||||
log.info("Starting RiakCluster");
|
||||
riakCluster.start();
|
||||
}
|
||||
}
|
@ -1,5 +1,9 @@
|
||||
package com.rbkmoney.wb.list.manager.constant;
|
||||
|
||||
import lombok.AccessLevel;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@NoArgsConstructor(access = AccessLevel.PRIVATE)
|
||||
public class RowType {
|
||||
|
||||
public static final String P_2_P = "p2p";
|
||||
|
@ -5,22 +5,7 @@ public class RiakExecutionException extends RuntimeException {
|
||||
public RiakExecutionException() {
|
||||
}
|
||||
|
||||
public RiakExecutionException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public RiakExecutionException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public RiakExecutionException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
public RiakExecutionException(String message,
|
||||
Throwable cause,
|
||||
boolean enableSuppression,
|
||||
boolean writableStackTrace) {
|
||||
super(message, cause, enableSuppression, writableStackTrace);
|
||||
}
|
||||
}
|
||||
|
@ -1,26 +0,0 @@
|
||||
package com.rbkmoney.wb.list.manager.exception;
|
||||
|
||||
public class UnknownRowTypeException extends RuntimeException {
|
||||
|
||||
public UnknownRowTypeException() {
|
||||
}
|
||||
|
||||
public UnknownRowTypeException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public UnknownRowTypeException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public UnknownRowTypeException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
public UnknownRowTypeException(String message,
|
||||
Throwable cause,
|
||||
boolean enableSuppression,
|
||||
boolean writableStackTrace) {
|
||||
super(message, cause, enableSuppression, writableStackTrace);
|
||||
}
|
||||
}
|
@ -4,7 +4,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.rbkmoney.damsel.wb_list.*;
|
||||
import com.rbkmoney.wb.list.manager.constant.RowType;
|
||||
import com.rbkmoney.wb.list.manager.exception.RiakExecutionException;
|
||||
import com.rbkmoney.wb.list.manager.exception.UnknownRowTypeException;
|
||||
import com.rbkmoney.wb.list.manager.model.CountInfoModel;
|
||||
import com.rbkmoney.wb.list.manager.repository.ListRepository;
|
||||
import com.rbkmoney.wb.list.manager.utils.KeyGenerator;
|
||||
@ -27,7 +26,7 @@ public class WbListServiceHandler implements WbListServiceSrv.Iface {
|
||||
public boolean isExist(Row row) throws TException {
|
||||
try {
|
||||
return getCascadeRow(row).isPresent();
|
||||
} catch (RiakExecutionException | UnknownRowTypeException e) {
|
||||
} catch (RiakExecutionException e) {
|
||||
log.error("WbListServiceHandler error when isExist row: {} e: ", row, e);
|
||||
throw new TException(e);
|
||||
}
|
||||
@ -63,7 +62,7 @@ public class WbListServiceHandler implements WbListServiceSrv.Iface {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result getRowInfo(Row row) throws ListNotFound, TException {
|
||||
public Result getRowInfo(Row row) {
|
||||
log.info("WbListServiceHandler getRowInfo row: {}", row);
|
||||
Optional<com.rbkmoney.wb.list.manager.model.Row> result = getCascadeRow(row);
|
||||
if (result.isPresent() && row.getListType() == ListType.grey) {
|
||||
|
@ -11,16 +11,16 @@ import java.io.IOException;
|
||||
|
||||
@WebServlet("/wb_list/v1")
|
||||
@RequiredArgsConstructor
|
||||
public class FraudInspectorServlet extends GenericServlet {
|
||||
public class WbListServlet extends GenericServlet {
|
||||
|
||||
private final WbListServiceSrv.Iface fraudInspectorHandler;
|
||||
private final WbListServiceSrv.Iface wbListHandler;
|
||||
private Servlet thriftServlet;
|
||||
|
||||
@Override
|
||||
public void init(ServletConfig config) throws ServletException {
|
||||
super.init(config);
|
||||
thriftServlet = new THServiceBuilder()
|
||||
.build(WbListServiceSrv.Iface.class, fraudInspectorHandler);
|
||||
.build(WbListServiceSrv.Iface.class, wbListHandler);
|
||||
}
|
||||
|
||||
@Override
|
@ -20,25 +20,29 @@ public class CommandServiceImpl implements CommandService {
|
||||
|
||||
@Override
|
||||
public Event apply(ChangeCommand command) {
|
||||
log.info("WbListStreamFactory apply command: {}", command);
|
||||
Event event = new Event();
|
||||
log.info("CommandService apply command: {}", command);
|
||||
Row row = commandToRowConverter.convert(command);
|
||||
log.info("WbListStreamFactory apply row: {}", row);
|
||||
switch (command.command) {
|
||||
case CREATE:
|
||||
listRepository.create(row);
|
||||
event.setEventType(EventType.CREATED);
|
||||
break;
|
||||
case DELETE:
|
||||
listRepository.remove(row);
|
||||
event.setEventType(EventType.DELETED);
|
||||
break;
|
||||
default:
|
||||
log.warn("WbListStreamFactory command for list not found! command: {}", command);
|
||||
throw new RuntimeException("WbListStreamFactory command for list not found!");
|
||||
}
|
||||
log.info("CommandService apply row: {}", row);
|
||||
Event event = applyCommandAndGetEvent(command, row);
|
||||
event.setRow(command.getRow());
|
||||
return event;
|
||||
}
|
||||
|
||||
private Event applyCommandAndGetEvent(ChangeCommand command, Row row) {
|
||||
return switch (command.getCommand()) {
|
||||
case CREATE -> {
|
||||
listRepository.create(row);
|
||||
yield new Event().setEventType(EventType.CREATED);
|
||||
}
|
||||
case DELETE -> {
|
||||
listRepository.remove(row);
|
||||
yield new Event().setEventType(EventType.DELETED);
|
||||
}
|
||||
default -> {
|
||||
log.warn("CommandService command for list not found! command: {}", command);
|
||||
throw new RuntimeException("WbListStreamFactory command for list not found!");
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -28,7 +28,7 @@ public class KeyGenerator {
|
||||
|
||||
public static String generateKey(ListType listType, String listName, String value, String... params) {
|
||||
StringBuilder stringBuilder = new StringBuilder();
|
||||
if (params != null) {
|
||||
if (params != null && params.length != 0) {
|
||||
for (String param : params) {
|
||||
addIfExist(param, stringBuilder);
|
||||
}
|
||||
@ -42,11 +42,11 @@ public class KeyGenerator {
|
||||
.toString();
|
||||
}
|
||||
|
||||
private static StringBuilder addIfExist(String id, StringBuilder stringBuilder) {
|
||||
if (!StringUtils.isEmpty(id)) {
|
||||
stringBuilder.append(id)
|
||||
private static void addIfExist(String param, StringBuilder stringBuilder) {
|
||||
if (StringUtils.hasLength(param)) {
|
||||
stringBuilder
|
||||
.append(param)
|
||||
.append(DELIMITER);
|
||||
}
|
||||
return stringBuilder;
|
||||
}
|
||||
}
|
||||
|
@ -26,7 +26,6 @@ management:
|
||||
web:
|
||||
exposure:
|
||||
include: health,info,prometheus
|
||||
---
|
||||
spring:
|
||||
application:
|
||||
name: '@project.name@'
|
||||
@ -43,10 +42,11 @@ retry:
|
||||
max.attempts: 3
|
||||
|
||||
kafka:
|
||||
bootstrap.servers: "localhost:29092"
|
||||
wblist.topic:
|
||||
command: "wb-list-command"
|
||||
event.sink: "wb-list-event-sink"
|
||||
bootstrap-servers: "localhost:29092"
|
||||
wblist:
|
||||
topic:
|
||||
command: "wb-list-command"
|
||||
event.sink: "wb-list-event-sink"
|
||||
ssl:
|
||||
enable: false
|
||||
keystore-location: src/main/resources/cert/kenny-k.struzhkin.p12
|
||||
|
@ -1,38 +0,0 @@
|
||||
package com.rbkmoney.wb.list.manager;
|
||||
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.util.TestPropertyValues;
|
||||
import org.springframework.context.ApplicationContextInitializer;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@ContextConfiguration(initializers = AbstractRiakIntegrationTest.Initializer.class)
|
||||
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
|
||||
public abstract class AbstractRiakIntegrationTest {
|
||||
|
||||
@ClassRule
|
||||
public static GenericContainer riak = new GenericContainer("basho/riak-kv")
|
||||
.withExposedPorts(8098, 8087)
|
||||
.withPrivilegedMode(true)
|
||||
.waitingFor(new WaitAllStrategy()
|
||||
.withStartupTimeout(Duration.ofMinutes(2)));
|
||||
|
||||
public static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
|
||||
@Override
|
||||
public void initialize(ConfigurableApplicationContext configurableApplicationContext) {
|
||||
TestPropertyValues
|
||||
.of("riak.port=" + riak.getMappedPort(8087))
|
||||
.applyTo(configurableApplicationContext.getEnvironment());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -1,89 +0,0 @@
|
||||
package com.rbkmoney.wb.list.manager;
|
||||
|
||||
import com.rbkmoney.kafka.common.serialization.ThriftSerializer;
|
||||
import com.rbkmoney.wb.list.manager.serializer.EventDeserializer;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.Producer;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.test.util.TestPropertyValues;
|
||||
import org.springframework.context.ApplicationContextInitializer;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
import org.testcontainers.containers.KafkaContainer;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
|
||||
|
||||
@Slf4j
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(webEnvironment = RANDOM_PORT)
|
||||
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
|
||||
@ContextConfiguration(initializers = KafkaAbstractTest.Initializer.class)
|
||||
public abstract class KafkaAbstractTest extends AbstractRiakIntegrationTest {
|
||||
|
||||
public static final String KAFKA_DOCKER_VERSION = "5.0.1";
|
||||
|
||||
@ClassRule
|
||||
public static KafkaContainer kafka = new KafkaContainer(KAFKA_DOCKER_VERSION)
|
||||
.withEmbeddedZookeeper()
|
||||
.withStartupTimeout(Duration.ofMinutes(2));
|
||||
|
||||
public static <T> Consumer<String, T> createConsumer() {
|
||||
Properties props = new Properties();
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EventDeserializer.class);
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
|
||||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
return new KafkaConsumer<>(props);
|
||||
}
|
||||
|
||||
public static <T> Producer<String, T> createProducer() {
|
||||
Properties props = new Properties();
|
||||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
|
||||
props.put(ProducerConfig.CLIENT_ID_CONFIG, "CLIENT");
|
||||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ThriftSerializer.class);
|
||||
return new KafkaProducer<>(props);
|
||||
}
|
||||
|
||||
public static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
|
||||
@Override
|
||||
public void initialize(ConfigurableApplicationContext configurableApplicationContext) {
|
||||
TestPropertyValues
|
||||
.of("kafka.bootstrap.servers=" + kafka.getBootstrapServers())
|
||||
.applyTo(configurableApplicationContext.getEnvironment());
|
||||
initTopic("wb-list-command");
|
||||
initTopic("wb-list-event-sink");
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private <T> Consumer<String, T> initTopic(String topicName) {
|
||||
Consumer<String, T> consumer = createConsumer();
|
||||
try {
|
||||
consumer.subscribe(Collections.singletonList(topicName));
|
||||
consumer.poll(Duration.ofSeconds(1));
|
||||
} catch (Exception e) {
|
||||
log.error("KafkaAbstractTest initialize e: ", e);
|
||||
}
|
||||
consumer.close();
|
||||
return consumer;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -5,24 +5,24 @@ import com.basho.riak.client.api.commands.kv.FetchValue;
|
||||
import com.basho.riak.client.core.query.Location;
|
||||
import com.basho.riak.client.core.query.Namespace;
|
||||
import com.basho.riak.client.core.query.RiakObject;
|
||||
import com.rbkmoney.wb.list.manager.config.RiakConfig;
|
||||
import com.rbkmoney.wb.list.manager.extension.RiakTestcontainerExtension;
|
||||
import com.rbkmoney.wb.list.manager.model.Row;
|
||||
import com.rbkmoney.wb.list.manager.repository.ListRepository;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import static java.lang.Thread.sleep;
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
@ExtendWith(RiakTestcontainerExtension.class)
|
||||
@SpringBootTest
|
||||
@ContextConfiguration(classes = {ListRepository.class, RiakConfig.class})
|
||||
public class RiakTest extends KafkaAbstractTest {
|
||||
public class RiakTest {
|
||||
|
||||
private static final String VALUE = "value";
|
||||
private static final String KEY = "key";
|
||||
@ -37,8 +37,8 @@ public class RiakTest extends KafkaAbstractTest {
|
||||
private RiakClient client;
|
||||
|
||||
@Test
|
||||
public void riakTest() throws ExecutionException, InterruptedException {
|
||||
sleep(10000);
|
||||
void riakTest() throws ExecutionException, InterruptedException {
|
||||
sleep(20000);
|
||||
|
||||
Row row = new Row();
|
||||
row.setKey(KEY);
|
||||
@ -52,17 +52,15 @@ public class RiakTest extends KafkaAbstractTest {
|
||||
RiakObject obj = response.getValue(RiakObject.class);
|
||||
|
||||
String result = obj.getValue().toString();
|
||||
Assert.assertEquals(VALUE, result);
|
||||
assertEquals(VALUE, result);
|
||||
|
||||
Optional<Row> resultGet = listRepository.get(KEY);
|
||||
Assert.assertFalse(resultGet.isEmpty());
|
||||
Assert.assertEquals(VALUE, resultGet.get().getValue());
|
||||
assertFalse(resultGet.isEmpty());
|
||||
assertEquals(VALUE, resultGet.get().getValue());
|
||||
|
||||
listRepository.remove(row);
|
||||
response = client.execute(fv);
|
||||
obj = response.getValue(RiakObject.class);
|
||||
Assert.assertNull(obj);
|
||||
|
||||
assertNull(obj);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,47 @@
|
||||
package com.rbkmoney.wb.list.manager;
|
||||
|
||||
import com.rbkmoney.damsel.wb_list.*;
|
||||
import com.rbkmoney.wb.list.manager.utils.ChangeCommandWrapper;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.UUID;
|
||||
|
||||
public abstract class TestObjectFactory {
|
||||
|
||||
public static com.rbkmoney.damsel.wb_list.Row testRow() {
|
||||
com.rbkmoney.damsel.wb_list.Row row = new com.rbkmoney.damsel.wb_list.Row();
|
||||
row.setId(IdInfo.payment_id(new PaymentId()
|
||||
.setShopId(randomString())
|
||||
.setPartyId(randomString())
|
||||
));
|
||||
row.setListType(ListType.black);
|
||||
row.setListName(randomString());
|
||||
row.setValue(randomString());
|
||||
row.setPartyId(row.getId().getPaymentId().getPartyId());
|
||||
row.setShopId(row.getId().getPaymentId().getShopId());
|
||||
return row;
|
||||
}
|
||||
|
||||
public static String randomString() {
|
||||
return UUID.randomUUID().toString();
|
||||
}
|
||||
|
||||
public static ChangeCommandWrapper testCommand() {
|
||||
ChangeCommandWrapper changeCommand = new ChangeCommandWrapper();
|
||||
changeCommand.setCommand(Command.CREATE);
|
||||
com.rbkmoney.damsel.wb_list.Row row = testRow();
|
||||
changeCommand.setRow(row);
|
||||
return changeCommand;
|
||||
}
|
||||
|
||||
public static Row testRowWithCountInfo(String startTimeCount) {
|
||||
Row row = testRow();
|
||||
row.setRowInfo(RowInfo.count_info(
|
||||
new CountInfo()
|
||||
.setCount(5L)
|
||||
.setStartCountTime(startTimeCount)
|
||||
.setTimeToLive(Instant.now().plusSeconds(6000L).toString()))
|
||||
);
|
||||
return row;
|
||||
}
|
||||
}
|
@ -1,43 +1,46 @@
|
||||
package com.rbkmoney.wb.list.manager;
|
||||
|
||||
import com.rbkmoney.damsel.wb_list.*;
|
||||
import com.rbkmoney.wb.list.manager.serializer.EventDeserializer;
|
||||
import com.rbkmoney.testcontainers.annotations.kafka.KafkaTestcontainer;
|
||||
import com.rbkmoney.testcontainers.annotations.kafka.config.KafkaConsumer;
|
||||
import com.rbkmoney.testcontainers.annotations.kafka.config.KafkaConsumerConfig;
|
||||
import com.rbkmoney.testcontainers.annotations.kafka.config.KafkaProducer;
|
||||
import com.rbkmoney.testcontainers.annotations.kafka.config.KafkaProducerConfig;
|
||||
import com.rbkmoney.wb.list.manager.extension.RiakTestcontainerExtension;
|
||||
import com.rbkmoney.wb.list.manager.utils.ChangeCommandWrapper;
|
||||
import com.rbkmoney.woody.thrift.impl.http.THClientBuilder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.clients.producer.Producer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.apache.thrift.TBase;
|
||||
import org.apache.thrift.TException;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.rnorth.ducttape.unreliables.Unreliables;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.web.server.LocalServerPort;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import java.net.URI;
|
||||
import java.time.Duration;
|
||||
import java.net.URISyntaxException;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
|
||||
import static org.testcontainers.shaded.com.trilead.ssh2.ChannelCondition.TIMEOUT;
|
||||
|
||||
@Slf4j
|
||||
@RunWith(SpringRunner.class)
|
||||
@ExtendWith({RiakTestcontainerExtension.class})
|
||||
@KafkaTestcontainer(topicsKeys = {"kafka.wblist.topic.command", "kafka.wblist.topic.event.sink"})
|
||||
@SpringBootTest(webEnvironment = RANDOM_PORT)
|
||||
@ContextConfiguration(classes = WbListManagerApplication.class)
|
||||
public class WbListManagerApplicationTest extends KafkaAbstractTest {
|
||||
@ContextConfiguration(
|
||||
classes = {
|
||||
WbListManagerApplication.class,
|
||||
KafkaProducerConfig.class,
|
||||
KafkaConsumerConfig.class})
|
||||
public class WbListManagerApplicationTest {
|
||||
|
||||
public static final String IDENTITY_ID = "identityId";
|
||||
private static final String VALUE = "value";
|
||||
@ -56,194 +59,155 @@ public class WbListManagerApplicationTest extends KafkaAbstractTest {
|
||||
@LocalServerPort
|
||||
int serverPort;
|
||||
|
||||
@Value("${riak.bucket}")
|
||||
private String bucketName;
|
||||
@Autowired
|
||||
private KafkaProducer<TBase<?, ?>> testThriftKafkaProducer;
|
||||
|
||||
public static Consumer<String, Event> createConsumer() {
|
||||
Properties props = new Properties();
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, "CLIENT");
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EventDeserializer.class);
|
||||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
|
||||
return new KafkaConsumer<>(props);
|
||||
}
|
||||
@Autowired
|
||||
private KafkaConsumer<Event> testCommandKafkaConsumer;
|
||||
|
||||
@Test
|
||||
public void kafkaRowTest() throws Exception {
|
||||
private WbListServiceSrv.Iface handler;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() throws URISyntaxException {
|
||||
THClientBuilder clientBuilder = new THClientBuilder()
|
||||
.withAddress(new URI(String.format(SERVICE_URL, serverPort)))
|
||||
.withNetworkTimeout(300000);
|
||||
WbListServiceSrv.Iface iface = clientBuilder.build(WbListServiceSrv.Iface.class);
|
||||
handler = clientBuilder.build(WbListServiceSrv.Iface.class);
|
||||
}
|
||||
|
||||
ChangeCommand changeCommand = produceCreateRow(createRow());
|
||||
@Test
|
||||
void kafkaStreamsTest() throws Exception {
|
||||
Row testRow = TestObjectFactory.testRow();
|
||||
ChangeCommand changeCommand = produceCreateRow(testRow);
|
||||
|
||||
boolean exist = handler.isExist(changeCommand.getRow());
|
||||
|
||||
boolean exist = iface.isExist(changeCommand.getRow());
|
||||
assertTrue(exist);
|
||||
|
||||
|
||||
produceDeleteRow(changeCommand);
|
||||
|
||||
exist = iface.isExist(changeCommand.getRow());
|
||||
exist = handler.isExist(changeCommand.getRow());
|
||||
|
||||
assertFalse(exist);
|
||||
|
||||
Consumer<String, Event> consumer = createConsumer();
|
||||
consumer.subscribe(Collections.singletonList(topicEventSink));
|
||||
|
||||
List<Event> eventList = new ArrayList<>();
|
||||
ConsumerRecords<String, Event> consumerRecords =
|
||||
consumer.poll(Duration.ofSeconds(1));
|
||||
consumerRecords.forEach(record -> {
|
||||
log.info("poll message: {}", record.value());
|
||||
eventList.add(record.value());
|
||||
});
|
||||
consumer.close();
|
||||
testCommandKafkaConsumer.read(topicEventSink, data -> eventList.add(data.value()));
|
||||
Unreliables.retryUntilTrue(TIMEOUT, TimeUnit.SECONDS, () -> eventList.size() == 2);
|
||||
|
||||
assertEquals(2, eventList.size());
|
||||
assertTrue(eventList.stream()
|
||||
.map(Event::getRow)
|
||||
.anyMatch(row -> row.getPartyId().equals(testRow.getPartyId())));
|
||||
}
|
||||
|
||||
Producer<String, ChangeCommand> producer = createProducer();
|
||||
@Test
|
||||
void kafkaRowTest() throws Exception { // TODO refactoring
|
||||
Row row = createRowOld();
|
||||
changeCommand = createCommand(row);
|
||||
ChangeCommand changeCommand = createCommand(row);
|
||||
row.setShopId(null);
|
||||
|
||||
ProducerRecord<String, ChangeCommand> producerRecord =
|
||||
new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand);
|
||||
producer.send(producerRecord).get();
|
||||
producer.close();
|
||||
testThriftKafkaProducer.send(topic, changeCommand);
|
||||
Thread.sleep(1000L);
|
||||
|
||||
exist = iface.isExist(row);
|
||||
boolean exist = handler.isExist(row);
|
||||
|
||||
assertTrue(exist);
|
||||
|
||||
|
||||
row.setShopId(SHOP_ID);
|
||||
|
||||
exist = iface.isExist(row);
|
||||
exist = handler.isExist(row);
|
||||
|
||||
assertTrue(exist);
|
||||
|
||||
Result info = iface.getRowInfo(row);
|
||||
|
||||
Result info = handler.getRowInfo(row);
|
||||
|
||||
assertFalse(info.isSetRowInfo());
|
||||
|
||||
row.setListType(ListType.grey);
|
||||
|
||||
row.setListType(ListType.grey);
|
||||
//check without partyId and shop id
|
||||
createRow(Instant.now().toString(), null, null);
|
||||
RowInfo rowInfo = iface.getRowInfo(row).getRowInfo();
|
||||
Row testRow = createRow(Instant.now().toString());
|
||||
RowInfo rowInfo = handler.getRowInfo(testRow).getRowInfo();
|
||||
|
||||
assertEquals(5, rowInfo.getCountInfo().getCount());
|
||||
|
||||
|
||||
//check without partyId
|
||||
createRow(Instant.now().toString(), null, SHOP_ID);
|
||||
rowInfo = iface.getRowInfo(row).getRowInfo();
|
||||
createRow(Instant.now().toString());
|
||||
rowInfo = handler.getRowInfo(row).getRowInfo();
|
||||
|
||||
assertEquals(5, rowInfo.getCountInfo().getCount());
|
||||
|
||||
|
||||
//check full key field
|
||||
createRow(Instant.now().toString(), PARTY_ID, SHOP_ID);
|
||||
rowInfo = iface.getRowInfo(row).getRowInfo();
|
||||
createRow(Instant.now().toString());
|
||||
rowInfo = handler.getRowInfo(row).getRowInfo();
|
||||
|
||||
assertEquals(5, rowInfo.getCountInfo().getCount());
|
||||
|
||||
rowInfo = checkCreateWithCountInfo(iface, Instant.now().toString(), PARTY_ID, SHOP_ID);
|
||||
|
||||
rowInfo = checkCreateWithCountInfo(handler, Instant.now().toString());
|
||||
|
||||
assertFalse(rowInfo.getCountInfo().getStartCountTime().isEmpty());
|
||||
|
||||
|
||||
Row rowP2p = createListRow();
|
||||
rowP2p.setId(IdInfo.p2p_id(new P2pId().setIdentityId(IDENTITY_ID)));
|
||||
|
||||
changeCommand = produceCreateRow(rowP2p);
|
||||
|
||||
exist = iface.isExist(changeCommand.getRow());
|
||||
exist = handler.isExist(changeCommand.getRow());
|
||||
|
||||
assertTrue(exist);
|
||||
|
||||
|
||||
produceDeleteRow(changeCommand);
|
||||
|
||||
exist = iface.isExist(changeCommand.getRow());
|
||||
exist = handler.isExist(changeCommand.getRow());
|
||||
|
||||
assertFalse(exist);
|
||||
}
|
||||
|
||||
private void produceDeleteRow(ChangeCommand changeCommand)
|
||||
throws InterruptedException, java.util.concurrent.ExecutionException {
|
||||
Producer<String, ChangeCommand> producer = createProducer();
|
||||
changeCommand.setCommand(Command.DELETE);
|
||||
ProducerRecord<String, ChangeCommand> producerRecord =
|
||||
new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand);
|
||||
producer.send(producerRecord).get();
|
||||
producer.close();
|
||||
private Row createRow(String startTimeCount) throws InterruptedException {
|
||||
ChangeCommand changeCommand;
|
||||
Row rowWithCountInfo = createRowWithCountInfo(startTimeCount);
|
||||
changeCommand = createCommand(rowWithCountInfo);
|
||||
testThriftKafkaProducer.send(topic, changeCommand);
|
||||
Thread.sleep(1000L);
|
||||
return rowWithCountInfo;
|
||||
}
|
||||
|
||||
private ChangeCommand produceCreateRow(com.rbkmoney.damsel.wb_list.Row row)
|
||||
throws InterruptedException, java.util.concurrent.ExecutionException {
|
||||
Producer<String, ChangeCommand> producer = createProducer();
|
||||
private ChangeCommand produceCreateRow(Row row)
|
||||
throws InterruptedException {
|
||||
ChangeCommand changeCommand = createCommand(row);
|
||||
ProducerRecord<String, ChangeCommand> producerRecord =
|
||||
new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand);
|
||||
producer.send(producerRecord).get();
|
||||
producer.close();
|
||||
testThriftKafkaProducer.send(topic, changeCommand);
|
||||
Thread.sleep(1000L);
|
||||
return changeCommand;
|
||||
}
|
||||
|
||||
private RowInfo checkCreateWithCountInfo(WbListServiceSrv.Iface iface, String startTimeCount, String partyId,
|
||||
String shopId)
|
||||
throws InterruptedException, java.util.concurrent.ExecutionException, TException {
|
||||
Row rowWithCountInfo = createRow(startTimeCount, partyId, shopId);
|
||||
return iface.getRowInfo(rowWithCountInfo).getRowInfo();
|
||||
}
|
||||
|
||||
private Row createRow(String startTimeCount, String partyId, String shopId)
|
||||
throws InterruptedException, java.util.concurrent.ExecutionException {
|
||||
Producer<String, ChangeCommand> producer;
|
||||
ChangeCommand changeCommand;
|
||||
ProducerRecord<String, ChangeCommand> producerRecord;
|
||||
producer = createProducer();
|
||||
Row rowWithCountInfo = createRowWithCountInfo(startTimeCount, partyId, shopId);
|
||||
changeCommand = createCommand(rowWithCountInfo);
|
||||
producerRecord = new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand);
|
||||
producer.send(producerRecord).get();
|
||||
producer.close();
|
||||
private void produceDeleteRow(ChangeCommand changeCommand)
|
||||
throws InterruptedException {
|
||||
changeCommand.setCommand(Command.DELETE);
|
||||
testThriftKafkaProducer.send(topic, changeCommand);
|
||||
Thread.sleep(1000L);
|
||||
|
||||
return rowWithCountInfo;
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private com.rbkmoney.damsel.wb_list.Row createRow() {
|
||||
Row row = createListRow();
|
||||
row.setId(IdInfo.payment_id(new PaymentId()
|
||||
private Row createRowOld() {
|
||||
return createListRow()
|
||||
.setShopId(SHOP_ID)
|
||||
.setPartyId(PARTY_ID)
|
||||
));
|
||||
return row;
|
||||
.setPartyId(PARTY_ID);
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private ChangeCommandWrapper createCommand(com.rbkmoney.damsel.wb_list.Row row) {
|
||||
private ChangeCommandWrapper createCommand(Row row) {
|
||||
ChangeCommandWrapper changeCommand = new ChangeCommandWrapper();
|
||||
changeCommand.setCommand(Command.CREATE);
|
||||
changeCommand.setRow(row);
|
||||
return changeCommand;
|
||||
}
|
||||
|
||||
|
||||
@NotNull
|
||||
private com.rbkmoney.damsel.wb_list.Row createRowOld() {
|
||||
Row row = createListRow()
|
||||
.setShopId(SHOP_ID)
|
||||
.setPartyId(PARTY_ID);
|
||||
return row;
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private Row createListRow() {
|
||||
private Row createRowWithCountInfo(String startTimeCount) {
|
||||
Row row = new Row();
|
||||
row.setListName(LIST_NAME);
|
||||
row.setListType(ListType.black);
|
||||
row.setValue(VALUE);
|
||||
return row;
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private com.rbkmoney.damsel.wb_list.Row createRowWithCountInfo(String startTimeCount, String partyId,
|
||||
String shopId) {
|
||||
com.rbkmoney.damsel.wb_list.Row row = new com.rbkmoney.damsel.wb_list.Row();
|
||||
row.setId(IdInfo.payment_id(new PaymentId()
|
||||
.setShopId(SHOP_ID)
|
||||
.setPartyId(PARTY_ID)
|
||||
@ -260,4 +224,18 @@ public class WbListManagerApplicationTest extends KafkaAbstractTest {
|
||||
return row;
|
||||
}
|
||||
|
||||
private RowInfo checkCreateWithCountInfo(WbListServiceSrv.Iface iface, String startTimeCount)
|
||||
throws InterruptedException, TException {
|
||||
Row rowWithCountInfo = createRow(startTimeCount);
|
||||
return iface.getRowInfo(rowWithCountInfo).getRowInfo();
|
||||
}
|
||||
|
||||
private Row createListRow() {
|
||||
Row row = new Row();
|
||||
row.setListName(LIST_NAME);
|
||||
row.setListType(ListType.black);
|
||||
row.setValue(VALUE);
|
||||
return row;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,122 +1,52 @@
|
||||
package com.rbkmoney.wb.list.manager;
|
||||
|
||||
import com.rbkmoney.damsel.wb_list.*;
|
||||
import com.rbkmoney.kafka.common.serialization.ThriftSerializer;
|
||||
import com.rbkmoney.damsel.wb_list.ChangeCommand;
|
||||
import com.rbkmoney.damsel.wb_list.Command;
|
||||
import com.rbkmoney.testcontainers.annotations.KafkaSpringBootTest;
|
||||
import com.rbkmoney.testcontainers.annotations.kafka.KafkaTestcontainer;
|
||||
import com.rbkmoney.testcontainers.annotations.kafka.config.KafkaProducer;
|
||||
import com.rbkmoney.wb.list.manager.config.MockedStartupInitializers;
|
||||
import com.rbkmoney.wb.list.manager.exception.RiakExecutionException;
|
||||
import com.rbkmoney.wb.list.manager.repository.ListRepository;
|
||||
import com.rbkmoney.wb.list.manager.serializer.EventDeserializer;
|
||||
import com.rbkmoney.wb.list.manager.utils.ChangeCommandWrapper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.Producer;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mockito;
|
||||
import org.apache.thrift.TBase;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.test.mock.mockito.MockBean;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.context.annotation.Import;
|
||||
import org.springframework.test.context.TestPropertySource;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
@Slf4j
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(webEnvironment = RANDOM_PORT)
|
||||
@KafkaSpringBootTest
|
||||
@TestPropertySource(properties = {"retry.timeout=100"})
|
||||
@ContextConfiguration(classes = WbListManagerApplication.class)
|
||||
public class WbListSafetyApplicationTest extends KafkaAbstractTest {
|
||||
|
||||
private static final String VALUE = "value";
|
||||
private static final String SHOP_ID = "shopId";
|
||||
private static final String PARTY_ID = "partyId";
|
||||
private static final String LIST_NAME = "listName";
|
||||
@KafkaTestcontainer(topicsKeys = {"kafka.wblist.topic.command", "kafka.wblist.topic.event.sink"})
|
||||
@Import(MockedStartupInitializers.class)
|
||||
public class WbListSafetyApplicationTest {
|
||||
|
||||
@Value("${kafka.wblist.topic.command}")
|
||||
public String topic;
|
||||
|
||||
@Value("${kafka.wblist.topic.event.sink}")
|
||||
public String topicEventSink;
|
||||
@Autowired
|
||||
private KafkaProducer<TBase<?, ?>> testThriftKafkaProducer;
|
||||
|
||||
@MockBean
|
||||
private ListRepository listRepository;
|
||||
|
||||
@Value("${riak.bucket}")
|
||||
private String bucketName;
|
||||
|
||||
public static <T> Producer<String, T> createProducer() {
|
||||
Properties props = new Properties();
|
||||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
|
||||
props.put(ProducerConfig.CLIENT_ID_CONFIG, "CLIENT");
|
||||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ThriftSerializer.class);
|
||||
return new KafkaProducer<>(props);
|
||||
}
|
||||
|
||||
public static <T> Consumer<String, T> createConsumer() {
|
||||
Properties props = new Properties();
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EventDeserializer.class);
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
|
||||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
return new KafkaConsumer<>(props);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void kafkaRowTestException() throws Exception {
|
||||
void kafkaRowTestException() throws Exception {
|
||||
doThrow(new RiakExecutionException(),
|
||||
new RiakExecutionException())
|
||||
.doNothing()
|
||||
.when(listRepository).create(any());
|
||||
|
||||
Producer<String, ChangeCommand> producerNew = createProducer();
|
||||
ChangeCommand changeCommand = createCommand();
|
||||
ChangeCommand changeCommand = TestObjectFactory.testCommand();
|
||||
changeCommand.setCommand(Command.CREATE);
|
||||
ProducerRecord<String, ChangeCommand> producerRecordCommand =
|
||||
new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand);
|
||||
producerNew.send(producerRecordCommand).get();
|
||||
producerNew.close();
|
||||
|
||||
testThriftKafkaProducer.send(topic, changeCommand);
|
||||
|
||||
Thread.sleep(2000L);
|
||||
|
||||
Mockito.verify(listRepository, times(3)).create(any());
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private ChangeCommandWrapper createCommand() {
|
||||
ChangeCommandWrapper changeCommand = new ChangeCommandWrapper();
|
||||
changeCommand.setCommand(Command.CREATE);
|
||||
com.rbkmoney.damsel.wb_list.Row row = createRow();
|
||||
changeCommand.setRow(row);
|
||||
return changeCommand;
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private com.rbkmoney.damsel.wb_list.Row createRow() {
|
||||
com.rbkmoney.damsel.wb_list.Row row = new com.rbkmoney.damsel.wb_list.Row();
|
||||
row.setId(IdInfo.payment_id(new PaymentId()
|
||||
.setShopId(SHOP_ID)
|
||||
.setPartyId(PARTY_ID))
|
||||
);
|
||||
row.setListName(LIST_NAME);
|
||||
row.setListType(ListType.black);
|
||||
row.setValue(VALUE);
|
||||
return row;
|
||||
verify(listRepository, times(3)).create(any());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,20 @@
|
||||
package com.rbkmoney.wb.list.manager.config;
|
||||
|
||||
import com.rbkmoney.damsel.wb_list.Event;
|
||||
import com.rbkmoney.testcontainers.annotations.kafka.config.KafkaConsumer;
|
||||
import com.rbkmoney.wb.list.manager.serializer.EventDeserializer;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
public class ConsumerConfig {
|
||||
|
||||
@Value("${kafka.bootstrap-servers}")
|
||||
private String bootstrapServers;
|
||||
|
||||
@Bean
|
||||
public KafkaConsumer<Event> testEventKafkaConsumer() {
|
||||
return new KafkaConsumer<>(bootstrapServers, new EventDeserializer());
|
||||
}
|
||||
}
|
@ -0,0 +1,12 @@
|
||||
package com.rbkmoney.wb.list.manager.config;
|
||||
|
||||
import org.springframework.boot.test.context.TestConfiguration;
|
||||
import org.springframework.boot.test.mock.mockito.MockBean;
|
||||
|
||||
@TestConfiguration
|
||||
public class MockedStartupInitializers {
|
||||
|
||||
@MockBean
|
||||
private RiakInitializer riakInitializer;
|
||||
|
||||
}
|
@ -0,0 +1,72 @@
|
||||
package com.rbkmoney.wb.list.manager.extension;
|
||||
|
||||
import com.rbkmoney.testcontainers.annotations.util.GenericContainerUtil;
|
||||
import org.junit.jupiter.api.extension.AfterAllCallback;
|
||||
import org.junit.jupiter.api.extension.BeforeAllCallback;
|
||||
import org.junit.jupiter.api.extension.ExtensionContext;
|
||||
import org.springframework.boot.test.util.TestPropertyValues;
|
||||
import org.springframework.test.context.ContextConfigurationAttributes;
|
||||
import org.springframework.test.context.ContextCustomizer;
|
||||
import org.springframework.test.context.ContextCustomizerFactory;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
|
||||
import org.testcontainers.utility.DockerImageName;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
public class RiakTestcontainerExtension implements BeforeAllCallback, AfterAllCallback {
|
||||
|
||||
private static final ThreadLocal<GenericContainer<?>> THREAD_CONTAINER = new ThreadLocal<>();
|
||||
|
||||
@Override
|
||||
public void beforeAll(ExtensionContext extensionContext) throws Exception {
|
||||
GenericContainer<?> container = createRiakTestcontainer();
|
||||
GenericContainerUtil.startContainer(container);
|
||||
THREAD_CONTAINER.set(container);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterAll(ExtensionContext extensionContext) throws Exception {
|
||||
var container = THREAD_CONTAINER.get();
|
||||
if (container != null && container.isRunning()) {
|
||||
container.stop();
|
||||
}
|
||||
THREAD_CONTAINER.remove();
|
||||
}
|
||||
|
||||
private GenericContainer<?> createRiakTestcontainer() {
|
||||
try (GenericContainer<?> container = new GenericContainer<>(
|
||||
DockerImageName
|
||||
.parse("basho/riak-kv"))
|
||||
.withExposedPorts(8098, 8087)
|
||||
.withPrivilegedMode(true)
|
||||
.withNetworkAliases("riak-kv-" + UUID.randomUUID())
|
||||
.withEnv("CLUSTER_NAME", "riakts")
|
||||
.withEnv("WAIT_FOR_ERLANG", "1000")
|
||||
.withLabel("com.basho.riak.cluster.name", "riakts")
|
||||
.waitingFor(new WaitAllStrategy()
|
||||
.withStartupTimeout(Duration.ofMinutes(2)))) {
|
||||
return container;
|
||||
}
|
||||
}
|
||||
|
||||
public static class RiakTestcontainerContextCustomizerFactory implements ContextCustomizerFactory {
|
||||
|
||||
@Override
|
||||
public ContextCustomizer createContextCustomizer(
|
||||
Class<?> testClass,
|
||||
List<ContextConfigurationAttributes> configAttributes) {
|
||||
return (context, mergedConfig) -> {
|
||||
var container = THREAD_CONTAINER.get();
|
||||
if (container != null) {
|
||||
TestPropertyValues.of(
|
||||
"riak.address=" + container.getContainerIpAddress(),
|
||||
"riak.port=" + container.getMappedPort(8087))
|
||||
.applyTo(context);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
@ -1,129 +1,111 @@
|
||||
package com.rbkmoney.wb.list.manager.handler;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.rbkmoney.damsel.wb_list.IdInfo;
|
||||
import com.rbkmoney.damsel.wb_list.ListType;
|
||||
import com.rbkmoney.damsel.wb_list.PaymentId;
|
||||
import com.rbkmoney.damsel.wb_list.Result;
|
||||
import com.rbkmoney.wb.list.manager.TestObjectFactory;
|
||||
import com.rbkmoney.wb.list.manager.model.Row;
|
||||
import com.rbkmoney.wb.list.manager.repository.ListRepository;
|
||||
import org.apache.thrift.TException;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class WbListServiceHandlerTest {
|
||||
|
||||
public static final String VALUE = "value";
|
||||
public static final String PARTY_ID = "partyId";
|
||||
public static final String SHOP_ID = "shopId";
|
||||
public static final String LIST_NAME = "listName";
|
||||
WbListServiceHandler wbListServiceHandler;
|
||||
private WbListServiceHandler wbListServiceHandler;
|
||||
|
||||
@Mock
|
||||
ListRepository listRepository;
|
||||
private ListRepository listRepository;
|
||||
private com.rbkmoney.damsel.wb_list.Row row;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
wbListServiceHandler = new WbListServiceHandler(listRepository, new ObjectMapper());
|
||||
Row rowValue = new Row();
|
||||
rowValue.setValue(TestObjectFactory.randomString());
|
||||
row = TestObjectFactory.testRow();
|
||||
row.setValue(rowValue.getValue());
|
||||
Mockito.when(listRepository.get(anyString())).thenReturn(Optional.of(rowValue));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void isExist() throws TException {
|
||||
com.rbkmoney.damsel.wb_list.Row row = createRow();
|
||||
void isExist() throws TException {
|
||||
boolean exist = wbListServiceHandler.isExist(row);
|
||||
Assert.assertTrue(exist);
|
||||
assertTrue(exist);
|
||||
|
||||
Mockito.when(listRepository.get(anyString())).thenReturn(Optional.empty());
|
||||
exist = wbListServiceHandler.isExist(row);
|
||||
Assert.assertFalse(exist);
|
||||
assertFalse(exist);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getRowInfo() throws TException {
|
||||
com.rbkmoney.damsel.wb_list.Row row = createRow();
|
||||
void getRowInfo() {
|
||||
Result result = wbListServiceHandler.getRowInfo(row);
|
||||
Assert.assertFalse(result == null);
|
||||
assertNotNull(result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void isAllExist() throws TException {
|
||||
void isAllExist() throws TException {
|
||||
ArrayList<com.rbkmoney.damsel.wb_list.Row> list = new ArrayList<>();
|
||||
boolean exist = wbListServiceHandler.isAllExist(list);
|
||||
Assert.assertTrue(exist);
|
||||
assertTrue(exist);
|
||||
|
||||
list.add(createRow());
|
||||
list.add(createRow());
|
||||
list.add(row);
|
||||
list.add(row);
|
||||
Mockito.when(listRepository.get(anyString())).thenReturn(Optional.of(new Row()));
|
||||
Assert.assertTrue(wbListServiceHandler.isAllExist(list));
|
||||
assertTrue(wbListServiceHandler.isAllExist(list));
|
||||
|
||||
Mockito.when(listRepository.get(anyString()))
|
||||
.thenReturn(Optional.of(new Row()))
|
||||
.thenReturn(Optional.empty());
|
||||
Assert.assertFalse(wbListServiceHandler.isAllExist(list));
|
||||
assertFalse(wbListServiceHandler.isAllExist(list));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void isAnyExist() throws TException {
|
||||
void isAnyExist() throws TException {
|
||||
ArrayList<com.rbkmoney.damsel.wb_list.Row> list = new ArrayList<>();
|
||||
boolean exist = wbListServiceHandler.isAnyExist(list);
|
||||
Assert.assertFalse(exist);
|
||||
assertFalse(exist);
|
||||
|
||||
list.add(createRow());
|
||||
list.add(createRow());
|
||||
list.add(row);
|
||||
list.add(row);
|
||||
Mockito.when(listRepository.get(anyString()))
|
||||
.thenReturn(Optional.of(new Row()))
|
||||
.thenReturn(Optional.empty());
|
||||
Assert.assertTrue(wbListServiceHandler.isAnyExist(list));
|
||||
assertTrue(wbListServiceHandler.isAnyExist(list));
|
||||
|
||||
Mockito.when(listRepository.get(anyString()))
|
||||
.thenReturn(Optional.empty())
|
||||
.thenReturn(Optional.empty());
|
||||
Assert.assertFalse(wbListServiceHandler.isAnyExist(list));
|
||||
assertFalse(wbListServiceHandler.isAnyExist(list));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void isNoOneExist() throws TException {
|
||||
void isNoOneExist() throws TException {
|
||||
ArrayList<com.rbkmoney.damsel.wb_list.Row> list = new ArrayList<>();
|
||||
boolean exist = wbListServiceHandler.isNotOneExist(list);
|
||||
Assert.assertTrue(exist);
|
||||
assertTrue(exist);
|
||||
|
||||
list.add(createRow());
|
||||
list.add(createRow());
|
||||
list.add(row);
|
||||
list.add(row);
|
||||
Mockito.when(listRepository.get(anyString()))
|
||||
.thenReturn(Optional.empty())
|
||||
.thenReturn(Optional.empty());
|
||||
Assert.assertTrue(wbListServiceHandler.isNotOneExist(list));
|
||||
assertTrue(wbListServiceHandler.isNotOneExist(list));
|
||||
|
||||
Mockito.when(listRepository.get(anyString()))
|
||||
.thenReturn(Optional.empty())
|
||||
.thenReturn(Optional.of(new Row()));
|
||||
Assert.assertFalse(wbListServiceHandler.isNotOneExist(list));
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private com.rbkmoney.damsel.wb_list.Row createRow() {
|
||||
Row value = new Row();
|
||||
value.setValue(VALUE);
|
||||
Mockito.when(listRepository.get(anyString())).thenReturn(Optional.of(value));
|
||||
com.rbkmoney.damsel.wb_list.Row row = new com.rbkmoney.damsel.wb_list.Row();
|
||||
row.setId(IdInfo.payment_id(new PaymentId()
|
||||
.setShopId(SHOP_ID)
|
||||
.setPartyId(PARTY_ID)
|
||||
));
|
||||
row.setListType(ListType.black);
|
||||
row.setListName(LIST_NAME);
|
||||
row.setValue(VALUE);
|
||||
return row;
|
||||
assertFalse(wbListServiceHandler.isNotOneExist(list));
|
||||
}
|
||||
}
|
3
src/test/resources/META-INF/spring.factories
Normal file
3
src/test/resources/META-INF/spring.factories
Normal file
@ -0,0 +1,3 @@
|
||||
# Spring Test ContextCustomizerFactories
|
||||
org.springframework.test.context.ContextCustomizerFactory=\
|
||||
com.rbkmoney.wb.list.manager.extension.RiakTestcontainerExtension.RiakTestcontainerContextCustomizerFactory
|
@ -2,10 +2,19 @@
|
||||
<configuration>
|
||||
<include resource="org/springframework/boot/logging/logback/defaults.xml"/>
|
||||
<include resource="org/springframework/boot/logging/logback/console-appender.xml"/>
|
||||
|
||||
<root level="INFO">
|
||||
<root level="info">
|
||||
<appender-ref ref="CONSOLE"/>
|
||||
</root>
|
||||
<logger name="com.rbkmoney.woody" level="ALL"/>
|
||||
<logger name="com.rbkmoney.wb" level="ALL"/>
|
||||
<logger name="com.rbkmoney.wb.list.manager">
|
||||
<level value="info"/>
|
||||
</logger>
|
||||
<logger name="org.apache.kafka">
|
||||
<level value="warn"/>
|
||||
</logger>
|
||||
<logger name="org.springframework.boot.test.context">
|
||||
<level value="warn"/>
|
||||
</logger>
|
||||
<logger name="org.springframework.test.context">
|
||||
<level value="warn"/>
|
||||
</logger>
|
||||
</configuration>
|
||||
|
Loading…
Reference in New Issue
Block a user