Refactor (#23)

* Add identifier for rule

* Fix after review

* Fix current amount and sum

* Add logic for new proto schema

* Fix for new wb-list interface

* Add retry template

* Refactor to common lib
Kostya 2019-07-03 15:08:33 +03:00 committed by GitHub
parent f4ad7d47a0
commit 73e58504f8
20 changed files with 362 additions and 57 deletions

pom.xml

@ -30,6 +30,7 @@
<damsel-utils.version>2.1.3</damsel-utils.version>
<wb.list.proto.version>1.20-2c2fb6a</wb.list.proto.version>
<sonar.jacoco.reportPath>${project.basedir}/../target/jacoco.exec</sonar.jacoco.reportPath>
<kafka.common.lib.version>0.0.9</kafka.common.lib.version>
</properties>
<dependencies>
<dependency>
@ -57,11 +58,6 @@
<artifactId>logstash-logback-encoder</artifactId>
<version>5.0</version>
</dependency>
<dependency>
<groupId>com.rbkmoney.logback</groupId>
<artifactId>nop-rolling</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
@ -74,11 +70,6 @@
<version>2.6.1</version>
</dependency>
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>damsel-utils</artifactId>
<version>${damsel-utils.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
@ -105,6 +96,17 @@
</exclusions>
</dependency>
<!--rbk libs-->
<dependency>
<groupId>com.rbkmoney.logback</groupId>
<artifactId>nop-rolling</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>damsel-utils</artifactId>
<version>${damsel-utils.version}</version>
</dependency>
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>fraudo</artifactId>
@ -130,6 +132,16 @@
<artifactId>woody-thrift</artifactId>
<version>${woody.thrift.version}</version>
</dependency>
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>kafka-common-lib</artifactId>
<version>${kafka.common.lib.version}</version>
</dependency>
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>fraudbusters-proto</artifactId>
<version>1.10-ebbe9be</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
@ -141,16 +153,6 @@
<artifactId>clickhouse-jdbc</artifactId>
<version>0.1.48</version>
</dependency>
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>fraudbusters-proto</artifactId>
<version>1.10-ebbe9be</version>
</dependency>
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>kafka-common-lib</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!-- Test libs -->
<dependency>

ClickhouseConfig.java

@ -6,8 +6,10 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseQueryParam;
import javax.sql.DataSource;
import java.util.Properties;
@Configuration
public class ClickhouseConfig {
@ -15,9 +17,18 @@ public class ClickhouseConfig {
@Value("${clickhouse.db.url}")
private String dbUrl;
@Value("${clickhouse.db.user}")
private String user;
@Value("${clickhouse.db.password}")
private String password;
@Bean
public ClickHouseDataSource clickHouseDataSource() {
return new ClickHouseDataSource(dbUrl);
Properties info = new Properties();
info.setProperty(ClickHouseQueryParam.USER.getKey(), user);
info.setProperty(ClickHouseQueryParam.PASSWORD.getKey(), password);
return new ClickHouseDataSource(dbUrl, info);
}
@Bean
@ -25,4 +36,5 @@ public class ClickhouseConfig {
public JdbcTemplate jdbcTemplate(DataSource clickHouseDataSource) {
return new JdbcTemplate(clickHouseDataSource);
}
}
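For context, a minimal usage sketch of the beans configured above, assuming the standard clickhouse-jdbc and Spring JDBC APIs; the URL and credentials below are placeholders, not values from this repository.

import org.springframework.jdbc.core.JdbcTemplate;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseQueryParam;

import java.util.Properties;

public class ClickhouseSmokeCheck {

    public static void main(String[] args) {
        // Same construction as clickHouseDataSource() above, with placeholder credentials.
        Properties info = new Properties();
        info.setProperty(ClickHouseQueryParam.USER.getKey(), "user");
        info.setProperty(ClickHouseQueryParam.PASSWORD.getKey(), "password");
        ClickHouseDataSource dataSource =
                new ClickHouseDataSource("jdbc:clickhouse://localhost:8123/default", info);

        // The jdbcTemplate bean wraps the same DataSource; a trivial query checks connectivity.
        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
        Integer one = jdbcTemplate.queryForObject("SELECT 1", Integer.class);
        System.out.println("ClickHouse reachable, SELECT 1 -> " + one);
    }
}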

ExternalServiceConfig.java

@ -36,4 +36,5 @@ public class ExternalServiceConfig {
throw new RuntimeException(e);
}
}
}

KafkaConfig.java

@ -6,7 +6,9 @@ import com.rbkmoney.fraudbusters.serde.CommandDeserializer;
import com.rbkmoney.fraudbusters.serde.FraudRequestSerde;
import com.rbkmoney.fraudbusters.serde.FraudoResultDeserializer;
import com.rbkmoney.fraudbusters.util.KeyGenerator;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.StreamsConfig;
@ -17,7 +19,14 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.LoggingErrorHandler;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@ -29,21 +38,52 @@ public class KafkaConfig {
private static final String REFERENCE_GROUP_ID = "ReferenceListener-";
private static final String EARLIEST = "earliest";
private static final String RESULT_AGGREGATOR = "ResultAggregator";
private static final String MAX_POLL_RECORDS_CONFIG = "20";
private static final String PKCS_12 = "PKCS12";
private static final String SSL = "SSL";
private static final String FRAUD_BUSTERS = "fraud-busters";
private static final String FRAUD_BUSTERS_CLIENT = "fraud-busters-client";
@Value("${kafka.max.poll.records}")
private String maxPollRecords;
@Value("${kafka.max.retry.attempts}")
private int maxRetryAttempts;
@Value("${kafka.backoff.interval}")
private int backoffInterval;
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;
@Value("${kafka.ssl.server-password}")
private String serverStorePassword;
@Value("${kafka.ssl.server-keystore-location}")
private String serverStoreCertPath;
@Value("${kafka.ssl.keystore-password}")
private String keyStorePassword;
@Value("${kafka.ssl.key-password}")
private String keyPassword;
@Value("${kafka.ssl.keystore-location}")
private String clientStoreCertPath;
@Value("${kafka.ssl.enable}")
private boolean kafkaSslEnable;
@Bean
public Properties fraudStreamProperties() {
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fraud-busters");
props.put(StreamsConfig.CLIENT_ID_CONFIG, "fraud-busters-client");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, FRAUD_BUSTERS);
props.put(StreamsConfig.CLIENT_ID_CONFIG, FRAUD_BUSTERS_CLIENT);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, FraudRequestSerde.class);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.putAll(sslConfigure());
return props;
}
@ -68,6 +108,7 @@ public class KafkaConfig {
props.put(ConsumerConfig.GROUP_ID_CONFIG, value);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, EARLIEST);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.putAll(sslConfigure());
return props;
}
@ -75,6 +116,9 @@ public class KafkaConfig {
public ConcurrentKafkaListenerContainerFactory<String, Command> templateListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Command> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(templateListenerFactory());
factory.setConcurrency(1);
factory.setRetryTemplate(retryTemplate());
factory.setErrorHandler(new LoggingErrorHandler());
return factory;
}
@ -82,17 +126,57 @@ public class KafkaConfig {
public ConcurrentKafkaListenerContainerFactory<String, Command> referenceListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Command> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(referenceListenerFactory());
factory.setConcurrency(1);
factory.setRetryTemplate(retryTemplate());
factory.setErrorHandler(new LoggingErrorHandler());
return factory;
}
/*
* Retry template.
*/
private RetryPolicy retryPolicy() {
SimpleRetryPolicy policy = new SimpleRetryPolicy();
policy.setMaxAttempts(maxRetryAttempts);
return policy;
}
private BackOffPolicy backOffPolicy() {
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(backoffInterval);
return policy;
}
private RetryTemplate retryTemplate() {
RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(retryPolicy());
template.setBackOffPolicy(backOffPolicy());
return template;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, FraudResult> resultListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, FraudResult> factory = new ConcurrentKafkaListenerContainerFactory<>();
final Map<String, Object> props = createDefaultProperties(RESULT_AGGREGATOR);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS_CONFIG);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
DefaultKafkaConsumerFactory<String, FraudResult> consumerFactory = new DefaultKafkaConsumerFactory<>(props,
new StringDeserializer(), new FraudoResultDeserializer());
factory.setConsumerFactory(consumerFactory);
return factory;
}
private Map<String, Object> sslConfigure() {
Map<String, Object> configProps = new HashMap<>();
if (kafkaSslEnable) {
configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SSL);
configProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, new File(serverStoreCertPath).getAbsolutePath());
configProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, serverStorePassword);
configProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, PKCS_12);
configProps.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, PKCS_12);
configProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, new File(clientStoreCertPath).getAbsolutePath());
configProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStorePassword);
configProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword);
}
return configProps;
}
}
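A minimal sketch of the retry behaviour wired into the listener container factories above: a SimpleRetryPolicy capped by kafka.max.retry.attempts combined with an ExponentialBackOffPolicy seeded with kafka.backoff.interval. The hard-coded values and the failing callback are illustrative stand-ins for the injected properties and a real listener invocation.

import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

public class RetryTemplateSketch {

    public static void main(String[] args) {
        // Mirrors retryPolicy()/backOffPolicy()/retryTemplate() in KafkaConfig.
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);               // stand-in for kafka.max.retry.attempts

        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000L);     // stand-in for kafka.backoff.interval, ms

        RetryTemplate template = new RetryTemplate();
        template.setRetryPolicy(retryPolicy);
        template.setBackOffPolicy(backOffPolicy);

        // Each failed invocation is retried with a growing delay until the attempt limit
        // is reached, after which the container's LoggingErrorHandler sees the error.
        template.execute(context -> {
            System.out.println("attempt " + (context.getRetryCount() + 1));
            if (context.getRetryCount() < 2) {
                throw new IllegalStateException("transient failure");
            }
            return null;
        });
    }
}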

ReplyTemplateConfig.java

@ -30,7 +30,7 @@ public class ReplyTemplateConfig {
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;
@Value("${kafka.result.stream.topic}")
@Value("${kafka.topic.result}")
private String replyTopic;
@Bean

ResourceConfig.java

@ -15,10 +15,10 @@ import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
@Configuration
public class ResourceConfig {
@Value("${kafka.global.stream.topic}")
@Value("${kafka.topic.global}")
private String requestTopic;
@Value("${kafka.result.stream.topic}")
@Value("${kafka.topic.result}")
private String requestReplyTopic;
@Bean

StreamInitializationException.java

@ -0,0 +1,23 @@
package com.rbkmoney.fraudbusters.exception;
public class StreamInitializationException extends RuntimeException {
public StreamInitializationException() {
}
public StreamInitializationException(String message) {
super(message);
}
public StreamInitializationException(String message, Throwable cause) {
super(message, cause);
}
public StreamInitializationException(Throwable cause) {
super(cause);
}
public StreamInitializationException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

ResultAggregatorListener.java

@ -18,7 +18,7 @@ public class ResultAggregatorListener {
private final EventRepository eventRepository;
private final FraudResultToEventConverter fraudResultToEventConverter;
@KafkaListener(topics = "${kafka.result.stream.topic}", containerFactory = "resultListenerContainerFactory")
@KafkaListener(topics = "${kafka.topic.result}", containerFactory = "resultListenerContainerFactory")
public void listen(List<FraudResult> batch) {
log.info("ResultAggregatorListener listen result: {}", batch);
eventRepository.insertBatch(fraudResultToEventConverter.convertBatch(batch));

StartupListener.java

@ -1,11 +1,17 @@
package com.rbkmoney.fraudbusters.listener;
import com.rbkmoney.damsel.fraudbusters.Command;
import com.rbkmoney.fraudbusters.stream.TemplateStreamFactoryImpl;
import com.rbkmoney.kafka.common.loader.PreloadListener;
import com.rbkmoney.kafka.common.loader.PreloadListenerImpl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.streams.KafkaStreams;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Component;
import java.time.Duration;
@ -16,19 +22,50 @@ import java.util.Properties;
@RequiredArgsConstructor
public class StartupListener implements ApplicationListener<ContextRefreshedEvent> {
public static final long PRELOAD_TIMEOUT = 30000L;
private final TemplateStreamFactoryImpl templateStreamFactoryImpl;
private final Properties fraudStreamProperties;
private final ConsumerFactory<String, Command> templateListenerFactory;
private final ConsumerFactory<String, Command> referenceListenerFactory;
private final TemplateListener templateListener;
private final TemplateReferenceListener templateReferenceListener;
private KafkaStreams kafkaStreams;
private PreloadListener<String, Command> preloadListener = new PreloadListenerImpl<>();
@Value("${kafka.topic.template}")
private String topic;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
kafkaStreams = templateStreamFactoryImpl.create(fraudStreamProperties);
log.info("StartupListener start stream kafkaStreams: ", kafkaStreams.allMetadata());
try {
Thread threadTemplate = new Thread(this::waitForLoadTemplates);
threadTemplate.start();
Thread threadReference = new Thread(this::waitForLoadReferences);
threadReference.start();
threadTemplate.join(PRELOAD_TIMEOUT);
threadReference.join(PRELOAD_TIMEOUT);
kafkaStreams = templateStreamFactoryImpl.create(fraudStreamProperties);
log.info("StartupListener start stream kafkaStreams: ", kafkaStreams.allMetadata());
} catch (InterruptedException e) {
log.error("StartupListener onApplicationEvent e: ", e);
Thread.currentThread().interrupt();
}
}
public void stop() {
kafkaStreams.close(Duration.ofSeconds(10L));
}
private void waitForLoadTemplates() {
Consumer<String, Command> consumer = templateListenerFactory.createConsumer();
preloadListener.preloadToLastOffsetInPartition(consumer, topic, 0, templateListener::listen);
consumer.close();
}
private void waitForLoadReferences() {
Consumer<String, Command> consumer = referenceListenerFactory.createConsumer();
preloadListener.preloadToLastOffsetInPartition(consumer, topic, 0, templateReferenceListener::listen);
consumer.close();
}
}
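The preload step above delegates to PreloadListenerImpl from kafka-common-lib. A rough sketch of what that call presumably amounts to, assuming it reads partition 0 of the topic from the beginning up to the end offset observed at start-up and feeds each record to the listener; the real implementation lives in the library, and the method and handler signature here are illustrative.

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;

public class PreloadSketch {

    // Reads the partition up to the end offset captured at start-up and feeds each value
    // to the handler, e.g. a listener method that accepts a Command.
    static <K, V> void preloadToEnd(Consumer<K, V> consumer, String topic, int partition,
                                    java.util.function.Consumer<V> handler) {
        TopicPartition tp = new TopicPartition(topic, partition);
        consumer.assign(Collections.singletonList(tp));
        long endOffset = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
        consumer.seekToBeginning(Collections.singletonList(tp));

        // Poll until the consumer position reaches the end offset seen at start-up.
        while (consumer.position(tp) < endOffset) {
            for (ConsumerRecord<K, V> record : consumer.poll(Duration.ofMillis(500))) {
                handler.accept(record.value());
            }
        }
    }
}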

FraudInspectorServlet.java

@ -8,7 +8,7 @@ import javax.servlet.*;
import javax.servlet.annotation.WebServlet;
import java.io.IOException;
@WebServlet("/v1/fraud_inspector")
@WebServlet("/fraud_inspector/v1")
@RequiredArgsConstructor
public class FraudInspectorServlet extends GenericServlet {

CommandDeserializer.java

@ -2,14 +2,15 @@ package com.rbkmoney.fraudbusters.serde;
import com.rbkmoney.damsel.fraudbusters.Command;
import com.rbkmoney.deserializer.AbstractDeserializerAdapter;
import com.rbkmoney.kafka.common.serialization.AbstractThriftDeserializer;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class CommandDeserializer extends AbstractDeserializerAdapter<Command> {
public class CommandDeserializer extends AbstractThriftDeserializer<Command> {
@Override
public Command deserialize(String topic, byte[] data) {
return deserialize(data, new Command());
}
}

TemplateStreamFactoryImpl.java

@ -1,6 +1,7 @@
package com.rbkmoney.fraudbusters.stream;
import com.rbkmoney.fraudbusters.domain.FraudResult;
import com.rbkmoney.fraudbusters.exception.StreamInitializationException;
import com.rbkmoney.fraudbusters.serde.FraudRequestSerde;
import com.rbkmoney.fraudbusters.serde.FraudoResultSerde;
import lombok.RequiredArgsConstructor;
@ -20,13 +21,15 @@ import java.util.Properties;
@RequiredArgsConstructor
public class TemplateStreamFactoryImpl implements TemplateStreamFactory {
@Value("${kafka.global.stream.topic}")
@Value("${kafka.topic.global}")
private String readTopic;
@Value("${kafka.result.stream.topic}")
@Value("${kafka.topic.result}")
private String resultTopic;
private final FraudRequestSerde fraudRequestSerde = new FraudRequestSerde();
private final TemplateVisitorImpl templateVisitor;
@Override
public KafkaStreams create(final Properties streamsConfiguration) {
try {
@ -41,7 +44,7 @@ public class TemplateStreamFactoryImpl implements TemplateStreamFactory {
return kafkaStreams;
} catch (Exception e) {
log.error("Error when GlobalStreamFactory insert e: ", e);
throw new RuntimeException(e);
throw new StreamInitializationException(e);
}
}

TemplateVisitorImpl.java

@ -56,7 +56,6 @@ public class TemplateVisitorImpl implements TemplateVisitor {
return checkedResultModel;
}
private Optional<CheckedResultModel> apply(FraudModel fraudModel, String templateKey) {
FraudoParser.ParseContext parseContext = templatePool.get(templateKey);
if (parseContext != null) {

ReferenceKeyGenerator.java

@ -7,7 +7,7 @@ import org.testcontainers.shaded.io.netty.util.internal.StringUtil;
public class ReferenceKeyGenerator {
public static final String SEPARATOR = "_";
private static final String SEPARATOR = "_";
public static String generateTemplateKey(TemplateReference reference) {
if (reference.is_global) {
@ -17,11 +17,9 @@ public class ReferenceKeyGenerator {
}
public static String generateTemplateKey(String partyId, String shopId) {
if (StringUtil.isNullOrEmpty(shopId)
&& !StringUtil.isNullOrEmpty(partyId)) {
if (StringUtil.isNullOrEmpty(shopId) && !StringUtil.isNullOrEmpty(partyId)) {
return partyId;
} else if (!StringUtil.isNullOrEmpty(shopId)
&& !StringUtil.isNullOrEmpty(partyId)) {
} else if (!StringUtil.isNullOrEmpty(shopId) && !StringUtil.isNullOrEmpty(partyId)) {
return partyId + SEPARATOR + shopId;
}
throw new UnknownReferenceException();
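Illustrative key shapes produced by the generator above (examples, not taken from the tests): a party-level reference maps to the bare party id, a party+shop reference joins the two with the underscore separator, and an empty reference is rejected.

ReferenceKeyGenerator.generateTemplateKey("party-1", null);      // -> "party-1"
ReferenceKeyGenerator.generateTemplateKey("party-1", "shop-1");  // -> "party-1_shop-1"
ReferenceKeyGenerator.generateTemplateKey(null, null);           // throws UnknownReferenceException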

application.yml

@ -1,6 +1,18 @@
server.port: @server.port@
management:
security:
flag: false
metrics:
export:
statsd:
flavor: etsy
enabled: false
spring:
application:
name: @project.name@
output:
ansi:
enabled: always
cache:
cache-names: resolveCountry
caffeine:
@ -8,13 +20,27 @@ spring:
kafka:
bootstrap.servers: "localhost:29092"
global.stream.topic: global_topic
concrete.stream.topic: concrete
result.stream.topic: result
topic.template: template
topic.reference: template_reference
backoff.interval: 1000
max:
retry.attempts: 3
poll.records: 100
topic:
global: global_topic
result: result
template: template
reference: template_reference
ssl:
enable: false
keystore-location: src/main/resources/cert/kenny-k.struzhkin.p12
keystore-password: kenny
key-password: kenny
server-password: kenny12
server-keystore-location: src/main/resources/cert/truststore.p12
clickhouse.db.url: "jdbc:clickhouse://localhost:8123/default"
clickhouse.db:
url: "jdbc:clickhouse://localhost:8123/default"
user: "user"
password: "password"
geo.ip.service.url: "localhost:29092/v1/columbus"
wb.list.service.url: "localhost:29092/v1/wb_list"

ApiInspectorTest.java

@ -43,7 +43,7 @@ public class ApiInspectorTest extends KafkaAbstractTest {
@LocalServerPort
int serverPort;
private static String SERVICE_URL = "http://localhost:%s/v1/fraud_inspector";
private static String SERVICE_URL = "http://localhost:%s/fraud_inspector/v1";
@Before
public void init() throws ExecutionException, InterruptedException {

EndToEndIntegrationTest.java

@ -63,10 +63,10 @@ public class EndToEndIntegrationTest extends KafkaAbstractTest {
@LocalServerPort
int serverPort;
@Value("${kafka.global.stream.topic}")
@Value("${kafka.topic.global}")
public String GLOBAL_TOPIC;
private static String SERVICE_URL = "http://localhost:%s/v1/fraud_inspector";
private static String SERVICE_URL = "http://localhost:%s/fraud_inspector/v1";
@ClassRule
public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer();
@ -75,8 +75,9 @@ public class EndToEndIntegrationTest extends KafkaAbstractTest {
@Override
public void initialize(ConfigurableApplicationContext configurableApplicationContext) {
log.info("clickhouse.db.url={}", clickHouseContainer.getJdbcUrl());
TestPropertyValues
.of("clickhouse.db.url=" + clickHouseContainer.getJdbcUrl())
TestPropertyValues.of("clickhouse.db.url=" + clickHouseContainer.getJdbcUrl(),
"clickhouse.db.user=" + clickHouseContainer.getUsername(),
"clickhouse.db.password=" + clickHouseContainer.getPassword())
.applyTo(configurableApplicationContext.getEnvironment());
LocationInfo info = new LocationInfo();
info.setCountryGeoId(COUNTRY_GEO_ID);

KafkaAbstractTest.java

@ -7,7 +7,7 @@ import com.rbkmoney.fraudbusters.domain.FraudRequest;
import com.rbkmoney.fraudbusters.serde.CommandDeserializer;
import com.rbkmoney.fraudbusters.serde.FraudRequestSerializer;
import com.rbkmoney.fraudbusters.util.KeyGenerator;
import com.rbkmoney.serializer.ThriftSerializer;
import com.rbkmoney.kafka.common.serialization.ThriftSerializer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

PreLoadTest.java

@ -0,0 +1,116 @@
package com.rbkmoney.fraudbusters;
import com.rbkmoney.damsel.domain.RiskScore;
import com.rbkmoney.damsel.fraudbusters.Command;
import com.rbkmoney.damsel.fraudbusters.CommandBody;
import com.rbkmoney.damsel.fraudbusters.Template;
import com.rbkmoney.damsel.fraudbusters.TemplateReference;
import com.rbkmoney.damsel.proxy_inspector.Context;
import com.rbkmoney.damsel.proxy_inspector.InspectorProxySrv;
import com.rbkmoney.fraudbusters.constant.TemplateLevel;
import com.rbkmoney.fraudbusters.util.BeanUtil;
import com.rbkmoney.woody.thrift.impl.http.THClientBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.thrift.TException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.boot.test.util.TestPropertyValues;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.test.context.ContextConfiguration;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
@Slf4j
@ContextConfiguration(initializers = PreLoadTest.Initializer.class)
public class PreLoadTest extends KafkaAbstractTest {
private static final String TEMPLATE = "rule: 12 >= 1\n" +
" -> accept;";
private static final String TEST = "test";
private InspectorProxySrv.Iface client;
@LocalServerPort
int serverPort;
private static String SERVICE_URL = "http://localhost:%s/fraud_inspector/v1";
public static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
@Override
public void initialize(ConfigurableApplicationContext configurableApplicationContext) {
TestPropertyValues
.of("kafka.bootstrap.servers=" + kafka.getBootstrapServers())
.applyTo(configurableApplicationContext.getEnvironment());
try {
createTemplate();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
private static String createTemplate() throws InterruptedException, ExecutionException {
Producer<String, Command> producer = createProducer();
Command command = new Command();
Template template = new Template();
String id = TEST;
template.setId(id);
template.setTemplate(TEMPLATE.getBytes());
command.setCommandBody(CommandBody.template(template));
command.setCommandType(com.rbkmoney.damsel.fraudbusters.CommandType.CREATE);
ProducerRecord<String, Command> producerRecord = new ProducerRecord<>("template",
id, command);
producer.send(producerRecord).get();
producer.close();
return id;
}
}
@Before
public void init() throws ExecutionException, InterruptedException {
createGlobalReferenceToTemplate(TEST);
}
private void createGlobalReferenceToTemplate(String id) throws InterruptedException, ExecutionException {
Producer<String, Command> producer;
Command command;
ProducerRecord<String, Command> producerRecord;
producer = createProducer();
command = new Command();
TemplateReference value = new TemplateReference();
value.setIsGlobal(true);
value.setTemplateId(id);
command.setCommandBody(CommandBody.reference(value));
command.setCommandType(com.rbkmoney.damsel.fraudbusters.CommandType.CREATE);
producerRecord = new ProducerRecord<>(referenceTopic,
TemplateLevel.GLOBAL.name(), command);
producer.send(producerRecord).get();
producer.close();
}
@Test
public void inspectPaymentTest() throws URISyntaxException, TException, ExecutionException, InterruptedException {
THClientBuilder clientBuilder = new THClientBuilder()
.withAddress(new URI(String.format(SERVICE_URL, serverPort)))
.withNetworkTimeout(300000);
client = clientBuilder.build(InspectorProxySrv.Iface.class);
createGlobalReferenceToTemplate(TEST);
Context context = BeanUtil.createContext();
RiskScore riskScore = client.inspectPayment(context);
Assert.assertEquals(riskScore, RiskScore.low);
}
}

EventRepositoryTest.java

@ -64,7 +64,9 @@ public class EventRepositoryTest {
public void initialize(ConfigurableApplicationContext configurableApplicationContext) {
log.info("clickhouse.db.url={}", clickHouseContainer.getJdbcUrl());
TestPropertyValues
.of("clickhouse.db.url=" + clickHouseContainer.getJdbcUrl())
.of("clickhouse.db.url=" + clickHouseContainer.getJdbcUrl(),
"clickhouse.db.user=" + clickHouseContainer.getUsername(),
"clickhouse.db.password=" + clickHouseContainer.getPassword())
.applyTo(configurableApplicationContext.getEnvironment());
}
}