Add tests for logic

This commit is contained in:
k.struzhkin 2019-04-18 17:24:52 +03:00
parent 02dd877943
commit 6269babe7c
10 changed files with 307 additions and 20 deletions

View File

@ -1,16 +1,19 @@
package com.rbkmoney.fraudbusters.management.config;
import com.rbkmoney.damsel.wb_list.ChangeCommand;
import com.rbkmoney.damsel.wb_list.Event;
import com.rbkmoney.fraudbusters.management.serializer.EventDeserializer;
import com.rbkmoney.fraudbusters.management.serializer.ThriftSerializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
@ -47,4 +50,18 @@ public class KafkaConfig {
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ProducerFactory<String, ChangeCommand> producerFactory() {
    // Producer settings: configured brokers, plain-string keys, values
    // thrift-encoded via ThriftSerializer.
    Map<String, Object> producerSettings = new HashMap<>();
    producerSettings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    producerSettings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerSettings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ThriftSerializer.class);
    return new DefaultKafkaProducerFactory<>(producerSettings);
}
@Bean
public KafkaTemplate<String, ChangeCommand> kafkaTemplate() {
    // Template bound to the ChangeCommand producer factory defined above.
    ProducerFactory<String, ChangeCommand> factory = producerFactory();
    return new KafkaTemplate<>(factory);
}
}

View File

@ -0,0 +1,21 @@
package com.rbkmoney.fraudbusters.management.converter;
import com.rbkmoney.damsel.wb_list.Row;
import com.rbkmoney.fraudbusters.management.domain.ListRecord;
import org.springframework.core.convert.converter.Converter;
import org.springframework.stereotype.Component;
@Component
public class ListRecordToRowConverter implements Converter<ListRecord, Row> {

    /**
     * Maps an API-level {@link ListRecord} onto a wb-list thrift {@link Row},
     * copying partyId, shopId, listName and value field-for-field.
     */
    @Override
    public Row convert(ListRecord source) {
        Row target = new Row();
        target.setValue(source.getValue());
        target.setListName(source.getListName());
        target.setShopId(source.getShopId());
        target.setPartyId(source.getPartyId());
        return target;
    }
}

View File

@ -0,0 +1,12 @@
package com.rbkmoney.fraudbusters.management.converter;
import com.rbkmoney.fraudbusters.management.domain.ListRecord;
import com.rbkmoney.fraudbusters.management.domain.tables.pojos.WbListRecords;
import org.mapstruct.Mapper;
@Mapper(componentModel = "spring")
public interface WbListRecordsToListRecordConverter {
    // MapStruct generates the implementation as a Spring bean: same-named fields
    // are copied from the database pojo WbListRecords into the API-level ListRecord.
    ListRecord destinationToSource(WbListRecords destination);
}

View File

@ -0,0 +1,13 @@
package com.rbkmoney.fraudbusters.management.domain;
import lombok.Data;
// Request/response payload describing a single white/black list entry.
// Lombok @Data generates getters, setters, equals/hashCode and toString.
@Data
public class ListRecord {
    private String partyId;
    private String shopId;
    private String listName;
    private String value;
}

View File

@ -1,15 +1,117 @@
package com.rbkmoney.fraudbusters.management.resource;
import com.rbkmoney.damsel.wb_list.ChangeCommand;
import com.rbkmoney.damsel.wb_list.Command;
import com.rbkmoney.damsel.wb_list.ListType;
import com.rbkmoney.damsel.wb_list.Row;
import com.rbkmoney.fraudbusters.management.converter.ListRecordToRowConverter;
import com.rbkmoney.fraudbusters.management.converter.WbListRecordsToListRecordConverter;
import com.rbkmoney.fraudbusters.management.dao.wblist.WbListDao;
import com.rbkmoney.fraudbusters.management.domain.ListRecord;
import com.rbkmoney.fraudbusters.management.domain.tables.pojos.WbListRecords;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@Slf4j
@Controller
@RequiredArgsConstructor
public class WbListResource {
private final ListRecordToRowConverter listRecordToRowConverter;
private final KafkaTemplate kafkaTemplate;
private final WbListDao wbListDao;
private final WbListRecordsToListRecordConverter wbListRecordsToListRecordConverter;
@Value("${kafka.wblist.topic.command}")
public String topicCommand;
@PostMapping(value = "/whiteList")
public ResponseEntity<String> insertRowToWhite(@RequestBody ListRecord record) {
Row row = listRecordToRowConverter.convert(record);
log.info("WbListResource whiteList add record {}", record);
return sendCommand(row, ListType.white, Command.CREATE);
}
@DeleteMapping(value = "/whiteList")
public ResponseEntity<String> removeRowFromWhiteList(@RequestBody ListRecord record) {
Row row = listRecordToRowConverter.convert(record);
log.info("WbListResource whiteList add record {}", record);
return sendCommand(row, ListType.white, Command.DELETE);
}
@GetMapping(value = "/whiteList")
public ResponseEntity<List<ListRecord>> getWhiteList(@RequestParam String partyId,
@RequestParam String shopId,
@RequestParam String listName) {
try {
List<WbListRecords> filteredListRecords = wbListDao.getFilteredListRecords(partyId, shopId,
com.rbkmoney.fraudbusters.management.domain.enums.ListType.white, listName);
List<ListRecord> listRecords = filteredListRecords.stream()
.map(wbListRecordsToListRecordConverter::destinationToSource)
.collect(Collectors.toList());
return ResponseEntity.ok().body(listRecords);
} catch (Exception e) {
log.error("Unexpected error when build payment request", e);
return ResponseEntity.badRequest().body(new ArrayList<>());
}
}
@PostMapping(value = "/blackList")
public ResponseEntity<String> insertRowToBlack(@RequestBody ListRecord record) {
Row row = listRecordToRowConverter.convert(record);
log.info("WbListResource whiteList add record {}", record);
return sendCommand(row, ListType.black, Command.CREATE);
}
@DeleteMapping(value = "/blackList")
public ResponseEntity<String> removeRowFromBlackList(@RequestBody ListRecord record) {
Row row = listRecordToRowConverter.convert(record);
log.info("WbListResource whiteList add record {}", record);
return sendCommand(row, ListType.black, Command.DELETE);
}
@GetMapping(value = "/blackList")
public ResponseEntity<List<ListRecord>> getBlackList(@RequestParam String partyId,
@RequestParam String shopId,
@RequestParam String listName) {
try {
List<WbListRecords> filteredListRecords = wbListDao.getFilteredListRecords(partyId, shopId,
com.rbkmoney.fraudbusters.management.domain.enums.ListType.black, listName);
List<ListRecord> listRecords = filteredListRecords.stream()
.map(wbListRecordsToListRecordConverter::destinationToSource)
.collect(Collectors.toList());
return ResponseEntity.ok().body(listRecords);
} catch (Exception e) {
log.error("Unexpected error when build payment request", e);
return ResponseEntity.badRequest().body(new ArrayList<>());
}
}
private ResponseEntity<String> sendCommand(Row row, ListType white, Command command) {
try {
row.setListType(white);
kafkaTemplate.send(topicCommand, createChangeCommand(row, command));
return ResponseEntity.ok().body("");
} catch (Exception e) {
log.error("Unexpected error when build payment request", e);
return ResponseEntity.badRequest().body(e.getMessage());
}
}
private ChangeCommand createChangeCommand(Row row, Command command) {
ChangeCommand changeCommand = new ChangeCommand();
changeCommand.setRow(row);
changeCommand.setCommand(command);
return changeCommand;
}
}

View File

@ -0,0 +1,36 @@
package com.rbkmoney.fraudbusters.management.serializer;
import com.rbkmoney.damsel.wb_list.ChangeCommand;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.thrift.TDeserializer;
import java.util.Map;
@Slf4j
public class CommandDeserializer implements Deserializer<ChangeCommand> {
private final ThreadLocal<TDeserializer> thriftDeserializer = ThreadLocal.withInitial(TDeserializer::new);
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public ChangeCommand deserialize(String topic, byte[] data) {
ChangeCommand command = new ChangeCommand();
try {
thriftDeserializer.get().deserialize(command, data);
} catch (Exception e) {
log.error("Error when deserialize command data: {} ", data, e);
}
return command;
}
@Override
public void close() {
}
}

View File

@ -0,0 +1,35 @@
package com.rbkmoney.fraudbusters.management.serializer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import java.util.Map;
@Slf4j
public class ThriftSerializer<T extends TBase> implements Serializer<T> {

    // TSerializer is not thread-safe; keep one instance per producer thread.
    private final ThreadLocal<TSerializer> thriftSerializer = ThreadLocal.withInitial(TSerializer::new);

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration required.
    }

    /**
     * Thrift-encodes {@code record} for Kafka.
     *
     * @return the serialized bytes, or {@code null} when the record is
     *         {@code null} (Kafka uses null values as tombstones; previously
     *         this would have thrown from inside the thrift serializer)
     * @throws RuntimeException wrapping the {@link TException} on encode failure
     */
    @Override
    public byte[] serialize(String topic, T record) {
        if (record == null) {
            return null;
        }
        try {
            return thriftSerializer.get().serialize(record);
        } catch (TException e) {
            // Fixed: include the topic for context and keep the cause chained.
            throw new RuntimeException("Error when serialize record, topic: " + topic, e);
        }
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}

View File

@ -0,0 +1,7 @@
package com.rbkmoney.fraudbusters.management.service;
import org.springframework.stereotype.Component;
@Component
public class CommandService {
    // Empty placeholder bean — no behavior implemented yet in this commit.
}

View File

@ -1,44 +1,89 @@
package com.rbkmoney.fraudbusters.management;
import org.flywaydb.core.Flyway;
import com.rbkmoney.fraudbusters.management.config.KafkaConfig;
import com.rbkmoney.fraudbusters.management.serializer.EventDeserializer;
import com.rbkmoney.fraudbusters.management.serializer.ThriftSerializer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jetbrains.annotations.NotNull;
import org.junit.ClassRule;
import org.junit.runner.RunWith;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.test.util.TestPropertyValues;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.KafkaContainer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
@Slf4j
@RunWith(SpringRunner.class)
@EnableConfigurationProperties({DataSourceProperties.class})
// NOTE(review): two @ContextConfiguration lines are present below (looks like an
// unresolved diff: old DataSourceAutoConfiguration vs new KafkaConfig). Java does
// not allow repeating this annotation on one element — confirm which is intended.
@ContextConfiguration(classes = {DataSourceAutoConfiguration.class},
@ContextConfiguration(classes = {KafkaConfig.class},
initializers = AbstractKafkaIntegrationTest.Initializer.class)
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
public abstract class AbstractKafkaIntegrationTest {
// Image tag passed to the Kafka testcontainer.
public static final String KAFKA_DOCKER_VERSION = "5.0.1";
// Postgres container shared across the test class (JUnit4 @ClassRule lifecycle).
@ClassRule
public static PostgreSQLContainer postgres = (PostgreSQLContainer) new PostgreSQLContainer("postgres:9.6")
.withStartupTimeout(Duration.ofMinutes(5));
// NOTE(review): unlike postgres, this container has no @ClassRule, so JUnit does
// not manage its lifecycle here — confirm it is started elsewhere.
public static KafkaContainer kafka = new KafkaContainer(KAFKA_DOCKER_VERSION).withEmbeddedZookeeper();
// Injects container-provided datasource/flyway/kafka properties into the Spring
// context before it starts, then touches both wb-list topics so they exist.
public static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
public static final String WB_LIST_EVENT_SINK = "wb-list-event-sink";
public static final String WB_LIST_COMMAND = "wb-list-command";
@Override
public void initialize(ConfigurableApplicationContext configurableApplicationContext) {
TestPropertyValues.of(
"spring.datasource.url=" + postgres.getJdbcUrl(),
"spring.datasource.username=" + postgres.getUsername(),
"spring.datasource.password=" + postgres.getPassword(),
"flyway.url=" + postgres.getJdbcUrl(),
"flyway.user=" + postgres.getUsername(),
"flyway.password=" + postgres.getPassword()
).applyTo(configurableApplicationContext);
TestPropertyValues
.of(
"kafka.bootstrap.servers=" + kafka.getBootstrapServers())
.applyTo(configurableApplicationContext.getEnvironment());
initTopic(WB_LIST_COMMAND);
initTopic(WB_LIST_EVENT_SINK);
}
// Subscribes and polls once so the topic is auto-created; errors are logged
// and ignored (best effort).
// NOTE(review): the consumer is closed and then returned, so the return value
// is an already-closed consumer; callers above ignore it — consider void.
@NotNull
private <T> Consumer<String, T> initTopic(String topicName) {
Consumer<String, T> consumer = createConsumer(EventDeserializer.class);
try {
consumer.subscribe(Collections.singletonList(topicName));
consumer.poll(Duration.ofMillis(100L));
} catch (Exception e) {
log.error("KafkaAbstractTest initialize e: ", e);
}
consumer.close();
return consumer;
}
}
// Builds a string-keyed consumer against the test broker; the value
// deserializer class is supplied by the caller.
public static <T> Consumer<String, T> createConsumer(Class clazz) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, clazz);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new KafkaConsumer<>(props);
}
// Builds a string-keyed producer with thrift-serialized values against the
// test broker.
public static <T> Producer<String, T> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ProducerConfig.CLIENT_ID_CONFIG, "CLIENT");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ThriftSerializer.class);
return new KafkaProducer<>(props);
}
}

View File

@ -14,7 +14,6 @@ import java.time.LocalDateTime;
import java.util.List;
@ContextConfiguration(classes = {WbListDaoImpl.class})
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
public class WbListDaoImplTest extends AbstractPostgresIntegrationTest {
public static final String PARTY = "party";