diff --git a/src/main/java/com/rbkmoney/fraudbusters/management/config/KafkaConfig.java b/src/main/java/com/rbkmoney/fraudbusters/management/config/KafkaConfig.java index 828ac93..cb1a62b 100644 --- a/src/main/java/com/rbkmoney/fraudbusters/management/config/KafkaConfig.java +++ b/src/main/java/com/rbkmoney/fraudbusters/management/config/KafkaConfig.java @@ -1,16 +1,19 @@ package com.rbkmoney.fraudbusters.management.config; +import com.rbkmoney.damsel.wb_list.ChangeCommand; import com.rbkmoney.damsel.wb_list.Event; import com.rbkmoney.fraudbusters.management.serializer.EventDeserializer; +import com.rbkmoney.fraudbusters.management.serializer.ThriftSerializer; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; -import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.*; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.HashMap; @@ -47,4 +50,18 @@ public class KafkaConfig { factory.setConsumerFactory(consumerFactory()); return factory; } + + @Bean + public ProducerFactory producerFactory() { + Map configProps = new HashMap<>(); + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ThriftSerializer.class); + return new DefaultKafkaProducerFactory<>(configProps); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } } diff --git a/src/main/java/com/rbkmoney/fraudbusters/management/converter/ListRecordToRowConverter.java b/src/main/java/com/rbkmoney/fraudbusters/management/converter/ListRecordToRowConverter.java new file mode 100644 index 0000000..9d145c5 --- /dev/null +++ b/src/main/java/com/rbkmoney/fraudbusters/management/converter/ListRecordToRowConverter.java @@ -0,0 +1,21 @@ +package com.rbkmoney.fraudbusters.management.converter; + +import com.rbkmoney.damsel.wb_list.Row; +import com.rbkmoney.fraudbusters.management.domain.ListRecord; +import org.springframework.core.convert.converter.Converter; +import org.springframework.stereotype.Component; + + +@Component +public class ListRecordToRowConverter implements Converter { + + @Override + public Row convert(ListRecord listRecord) { + Row row = new Row(); + row.setPartyId(listRecord.getPartyId()); + row.setShopId(listRecord.getShopId()); + row.setListName(listRecord.getListName()); + row.setValue(listRecord.getValue()); + return row; + } +} diff --git a/src/main/java/com/rbkmoney/fraudbusters/management/converter/WbListRecordsToListRecordConverter.java b/src/main/java/com/rbkmoney/fraudbusters/management/converter/WbListRecordsToListRecordConverter.java new file mode 100644 index 0000000..6725139 --- /dev/null +++ b/src/main/java/com/rbkmoney/fraudbusters/management/converter/WbListRecordsToListRecordConverter.java @@ -0,0 +1,12 @@ +package com.rbkmoney.fraudbusters.management.converter; + + +import com.rbkmoney.fraudbusters.management.domain.ListRecord; +import com.rbkmoney.fraudbusters.management.domain.tables.pojos.WbListRecords; +import org.mapstruct.Mapper; + +@Mapper(componentModel = "spring") +public interface WbListRecordsToListRecordConverter { + + ListRecord destinationToSource(WbListRecords destination); +} diff --git a/src/main/java/com/rbkmoney/fraudbusters/management/domain/ListRecord.java b/src/main/java/com/rbkmoney/fraudbusters/management/domain/ListRecord.java new file mode 100644 index 0000000..3ba92d2 --- /dev/null +++ b/src/main/java/com/rbkmoney/fraudbusters/management/domain/ListRecord.java @@ -0,0 +1,13 @@ +package com.rbkmoney.fraudbusters.management.domain; + +import lombok.Data; + +@Data +public class ListRecord { + + private String partyId; + private String shopId; + private String listName; + private String value; + +} diff --git a/src/main/java/com/rbkmoney/fraudbusters/management/resource/WbListResource.java b/src/main/java/com/rbkmoney/fraudbusters/management/resource/WbListResource.java index d724b8b..ecfa239 100644 --- a/src/main/java/com/rbkmoney/fraudbusters/management/resource/WbListResource.java +++ b/src/main/java/com/rbkmoney/fraudbusters/management/resource/WbListResource.java @@ -1,15 +1,117 @@ package com.rbkmoney.fraudbusters.management.resource; +import com.rbkmoney.damsel.wb_list.ChangeCommand; +import com.rbkmoney.damsel.wb_list.Command; +import com.rbkmoney.damsel.wb_list.ListType; +import com.rbkmoney.damsel.wb_list.Row; +import com.rbkmoney.fraudbusters.management.converter.ListRecordToRowConverter; +import com.rbkmoney.fraudbusters.management.converter.WbListRecordsToListRecordConverter; import com.rbkmoney.fraudbusters.management.dao.wblist.WbListDao; +import com.rbkmoney.fraudbusters.management.domain.ListRecord; +import com.rbkmoney.fraudbusters.management.domain.tables.pojos.WbListRecords; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.ResponseEntity; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.*; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; @Slf4j @Controller @RequiredArgsConstructor public class WbListResource { + private final ListRecordToRowConverter listRecordToRowConverter; + private final KafkaTemplate kafkaTemplate; private final WbListDao wbListDao; + private final WbListRecordsToListRecordConverter wbListRecordsToListRecordConverter; + + @Value("${kafka.wblist.topic.command}") + public String topicCommand; + + @PostMapping(value = "/whiteList") + public ResponseEntity insertRowToWhite(@RequestBody ListRecord record) { + Row row = listRecordToRowConverter.convert(record); + log.info("WbListResource whiteList add record {}", record); + return sendCommand(row, ListType.white, Command.CREATE); + } + + @DeleteMapping(value = "/whiteList") + public ResponseEntity removeRowFromWhiteList(@RequestBody ListRecord record) { + Row row = listRecordToRowConverter.convert(record); + log.info("WbListResource whiteList add record {}", record); + return sendCommand(row, ListType.white, Command.DELETE); + } + + @GetMapping(value = "/whiteList") + public ResponseEntity> getWhiteList(@RequestParam String partyId, + @RequestParam String shopId, + @RequestParam String listName) { + try { + List filteredListRecords = wbListDao.getFilteredListRecords(partyId, shopId, + com.rbkmoney.fraudbusters.management.domain.enums.ListType.white, listName); + List listRecords = filteredListRecords.stream() + .map(wbListRecordsToListRecordConverter::destinationToSource) + .collect(Collectors.toList()); + return ResponseEntity.ok().body(listRecords); + } catch (Exception e) { + log.error("Unexpected error when build payment request", e); + return ResponseEntity.badRequest().body(new ArrayList<>()); + } + } + + @PostMapping(value = "/blackList") + public ResponseEntity insertRowToBlack(@RequestBody ListRecord record) { + Row row = listRecordToRowConverter.convert(record); + log.info("WbListResource whiteList add record {}", record); + return sendCommand(row, ListType.black, Command.CREATE); + } + + @DeleteMapping(value = "/blackList") + public ResponseEntity removeRowFromBlackList(@RequestBody ListRecord record) { + Row row = listRecordToRowConverter.convert(record); + log.info("WbListResource whiteList add record {}", record); + return sendCommand(row, ListType.black, Command.DELETE); + } + + @GetMapping(value = "/blackList") + public ResponseEntity> getBlackList(@RequestParam String partyId, + @RequestParam String shopId, + @RequestParam String listName) { + try { + List filteredListRecords = wbListDao.getFilteredListRecords(partyId, shopId, + com.rbkmoney.fraudbusters.management.domain.enums.ListType.black, listName); + List listRecords = filteredListRecords.stream() + .map(wbListRecordsToListRecordConverter::destinationToSource) + .collect(Collectors.toList()); + return ResponseEntity.ok().body(listRecords); + } catch (Exception e) { + log.error("Unexpected error when build payment request", e); + return ResponseEntity.badRequest().body(new ArrayList<>()); + } + } + + private ResponseEntity sendCommand(Row row, ListType white, Command command) { + try { + row.setListType(white); + kafkaTemplate.send(topicCommand, createChangeCommand(row, command)); + return ResponseEntity.ok().body(""); + } catch (Exception e) { + log.error("Unexpected error when build payment request", e); + return ResponseEntity.badRequest().body(e.getMessage()); + } + } + + private ChangeCommand createChangeCommand(Row row, Command command) { + ChangeCommand changeCommand = new ChangeCommand(); + changeCommand.setRow(row); + changeCommand.setCommand(command); + return changeCommand; + } } diff --git a/src/main/java/com/rbkmoney/fraudbusters/management/serializer/CommandDeserializer.java b/src/main/java/com/rbkmoney/fraudbusters/management/serializer/CommandDeserializer.java new file mode 100644 index 0000000..a50987b --- /dev/null +++ b/src/main/java/com/rbkmoney/fraudbusters/management/serializer/CommandDeserializer.java @@ -0,0 +1,36 @@ +package com.rbkmoney.fraudbusters.management.serializer; + + +import com.rbkmoney.damsel.wb_list.ChangeCommand; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.thrift.TDeserializer; + +import java.util.Map; + +@Slf4j +public class CommandDeserializer implements Deserializer { + + private final ThreadLocal thriftDeserializer = ThreadLocal.withInitial(TDeserializer::new); + + @Override + public void configure(Map configs, boolean isKey) { + + } + + @Override + public ChangeCommand deserialize(String topic, byte[] data) { + ChangeCommand command = new ChangeCommand(); + try { + thriftDeserializer.get().deserialize(command, data); + } catch (Exception e) { + log.error("Error when deserialize command data: {} ", data, e); + } + return command; + } + + @Override + public void close() { + + } +} \ No newline at end of file diff --git a/src/main/java/com/rbkmoney/fraudbusters/management/serializer/ThriftSerializer.java b/src/main/java/com/rbkmoney/fraudbusters/management/serializer/ThriftSerializer.java new file mode 100644 index 0000000..843705a --- /dev/null +++ b/src/main/java/com/rbkmoney/fraudbusters/management/serializer/ThriftSerializer.java @@ -0,0 +1,35 @@ +package com.rbkmoney.fraudbusters.management.serializer; + + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.thrift.TBase; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; + +import java.util.Map; + +@Slf4j +public class ThriftSerializer implements Serializer { + + private final ThreadLocal thriftSerializer = ThreadLocal.withInitial(TSerializer::new); + + @Override + public void configure(Map configs, boolean isKey) { + + } + + @Override + public byte[] serialize(String s, T event) { + try { + return thriftSerializer.get().serialize(event); + } catch (TException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + + } +} \ No newline at end of file diff --git a/src/main/java/com/rbkmoney/fraudbusters/management/service/CommandService.java b/src/main/java/com/rbkmoney/fraudbusters/management/service/CommandService.java new file mode 100644 index 0000000..b56964c --- /dev/null +++ b/src/main/java/com/rbkmoney/fraudbusters/management/service/CommandService.java @@ -0,0 +1,7 @@ +package com.rbkmoney.fraudbusters.management.service; + +import org.springframework.stereotype.Component; + +@Component +public class CommandService { +} diff --git a/src/test/java/com/rbkmoney/fraudbusters/management/AbstractKafkaIntegrationTest.java b/src/test/java/com/rbkmoney/fraudbusters/management/AbstractKafkaIntegrationTest.java index e42db50..0a434fb 100644 --- a/src/test/java/com/rbkmoney/fraudbusters/management/AbstractKafkaIntegrationTest.java +++ b/src/test/java/com/rbkmoney/fraudbusters/management/AbstractKafkaIntegrationTest.java @@ -1,44 +1,89 @@ package com.rbkmoney.fraudbusters.management; -import org.flywaydb.core.Flyway; +import com.rbkmoney.fraudbusters.management.config.KafkaConfig; +import com.rbkmoney.fraudbusters.management.serializer.EventDeserializer; +import com.rbkmoney.fraudbusters.management.serializer.ThriftSerializer; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.jetbrains.annotations.NotNull; import org.junit.ClassRule; import org.junit.runner.RunWith; -import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; -import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; -import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.test.util.TestPropertyValues; import org.springframework.context.ApplicationContextInitializer; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; -import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.containers.KafkaContainer; import java.time.Duration; +import java.util.Collections; +import java.util.Properties; +@Slf4j @RunWith(SpringRunner.class) -@EnableConfigurationProperties({DataSourceProperties.class}) -@ContextConfiguration(classes = {DataSourceAutoConfiguration.class}, +@ContextConfiguration(classes = {KafkaConfig.class}, initializers = AbstractKafkaIntegrationTest.Initializer.class) @DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS) public abstract class AbstractKafkaIntegrationTest { + public static final String KAFKA_DOCKER_VERSION = "5.0.1"; + @ClassRule - public static PostgreSQLContainer postgres = (PostgreSQLContainer) new PostgreSQLContainer("postgres:9.6") - .withStartupTimeout(Duration.ofMinutes(5)); + public static KafkaContainer kafka = new KafkaContainer(KAFKA_DOCKER_VERSION).withEmbeddedZookeeper(); public static class Initializer implements ApplicationContextInitializer { + public static final String WB_LIST_EVENT_SINK = "wb-list-event-sink"; + public static final String WB_LIST_COMMAND = "wb-list-command"; + @Override public void initialize(ConfigurableApplicationContext configurableApplicationContext) { - TestPropertyValues.of( - "spring.datasource.url=" + postgres.getJdbcUrl(), - "spring.datasource.username=" + postgres.getUsername(), - "spring.datasource.password=" + postgres.getPassword(), - "flyway.url=" + postgres.getJdbcUrl(), - "flyway.user=" + postgres.getUsername(), - "flyway.password=" + postgres.getPassword() - ).applyTo(configurableApplicationContext); + TestPropertyValues + .of( + "kafka.bootstrap.servers=" + kafka.getBootstrapServers()) + .applyTo(configurableApplicationContext.getEnvironment()); + initTopic(WB_LIST_COMMAND); + initTopic(WB_LIST_EVENT_SINK); + } + + @NotNull + private Consumer initTopic(String topicName) { + Consumer consumer = createConsumer(EventDeserializer.class); + try { + consumer.subscribe(Collections.singletonList(topicName)); + consumer.poll(Duration.ofMillis(100L)); + } catch (Exception e) { + log.error("KafkaAbstractTest initialize e: ", e); + } + consumer.close(); + return consumer; } } + public static Consumer createConsumer(Class clazz) { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, clazz); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return new KafkaConsumer<>(props); + } + + public static Producer createProducer() { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + props.put(ProducerConfig.CLIENT_ID_CONFIG, "CLIENT"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ThriftSerializer.class); + return new KafkaProducer<>(props); + } + } \ No newline at end of file diff --git a/src/test/java/com/rbkmoney/fraudbusters/management/dao/wblist/WbListDaoImplTest.java b/src/test/java/com/rbkmoney/fraudbusters/management/dao/wblist/WbListDaoImplTest.java index 477b699..ce5a1c6 100644 --- a/src/test/java/com/rbkmoney/fraudbusters/management/dao/wblist/WbListDaoImplTest.java +++ b/src/test/java/com/rbkmoney/fraudbusters/management/dao/wblist/WbListDaoImplTest.java @@ -14,7 +14,6 @@ import java.time.LocalDateTime; import java.util.List; @ContextConfiguration(classes = {WbListDaoImpl.class}) -@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS) public class WbListDaoImplTest extends AbstractPostgresIntegrationTest { public static final String PARTY = "party";