diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..bc13f00
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,166 @@
+
+
+ 4.0.0
+
+
+ com.rbkmoney
+ spring-boot-starter-parent
+ 2.0.1.RELEASE
+
+
+ com.rbkmoney
+ fraudbusters
+ 1.0-SNAPSHOT
+
+
+ Struzhkin Konstantin <k.struzhkin@rbkmoney.com>
+ 0.2.1
+ 8022
+ 22c57470c4fc47161894f036b7cf9d70f42b75f5
+ UTF-8
+ 7.0.0.Final
+ 0.6.7
+
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+ org.hibernate
+ hibernate-validator
+
+
+
+
+ net.logstash.logback
+ logstash-logback-encoder
+ 4.6
+
+
+ com.rbkmoney.logback
+ nop-rolling
+ 1.0.1
+
+
+ org.projectlombok
+ lombok
+ 1.18.4
+ provided
+
+
+ org.apache.kafka
+ kafka-streams
+ 2.1.0
+
+
+ com.rbkmoney
+ fraudo
+ 1.0-SNAPSHOT
+
+
+ com.rbkmoney.geck
+ common
+ ${geck.version}
+
+
+ com.rbkmoney.geck
+ serializer
+ ${geck.version}
+
+
+ ru.yandex.clickhouse
+ clickhouse-jdbc
+ 0.1.41
+
+
+
+ com.zaxxer
+ HikariCP-java6
+ 2.3.8
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ org.testcontainers
+ clickhouse
+ 1.10.2
+ test
+
+
+ org.apache.kafka
+ kafka-streams-test-utils
+ 2.1.0
+ test
+
+
+
+
+
+
+ ${project.build.directory}/maven-shared-archive-resources
+ ${project.build.directory}
+
+ Dockerfile
+
+ true
+
+
+ ${project.build.directory}/maven-shared-archive-resources
+ true
+
+ Dockerfile
+
+
+
+ src/main/resources
+ true
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+ org.apache.maven.plugins
+ maven-remote-resources-plugin
+ 1.5
+
+
+ org.apache.maven.shared
+ maven-filtering
+ 1.3
+
+
+
+
+ com.rbkmoney:shared-resources:${shared.resources.version}
+
+ false
+ false
+
+
+
+
+ process
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/java/com/rbkmoney/fraudbusters/FraudBustersApplication.java b/src/main/java/com/rbkmoney/fraudbusters/FraudBustersApplication.java
new file mode 100644
index 0000000..d02e89a
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/FraudBustersApplication.java
@@ -0,0 +1,13 @@
+package com.rbkmoney.fraudbusters;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class FraudBustersApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(FraudBustersApplication.class, args);
+ }
+
+}
diff --git a/src/main/java/com/rbkmoney/fraudbusters/ListenerStartup.java b/src/main/java/com/rbkmoney/fraudbusters/ListenerStartup.java
new file mode 100644
index 0000000..c2b7269
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/ListenerStartup.java
@@ -0,0 +1,33 @@
+package com.rbkmoney.fraudbusters;
+
+import com.rbkmoney.fraudbusters.template.TemplateBroker;
+import com.rbkmoney.fraudbusters.template.TemplateListener;
+import lombok.RequiredArgsConstructor;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.ApplicationListener;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+@Component
+@RequiredArgsConstructor
+public class ListenerStartup implements ApplicationListener {
+
+ @Value("${kafka.bootstrap.servers}")
+ private String bootstrapServers;
+
+ @Value("${kafka.template.topic}")
+ private String templateTopic;
+
+ private final TemplateBroker templateBroker;
+
+ private ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+ @Override
+ public void onApplicationEvent(final ApplicationReadyEvent event) {
+ executorService.submit(new TemplateListener(templateTopic, bootstrapServers, templateBroker));
+ return;
+ }
+}
diff --git a/src/main/java/com/rbkmoney/fraudbusters/config/FraudoConfig.java b/src/main/java/com/rbkmoney/fraudbusters/config/FraudoConfig.java
new file mode 100644
index 0000000..1a9420a
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/config/FraudoConfig.java
@@ -0,0 +1,57 @@
+package com.rbkmoney.fraudbusters.config;
+
+import com.rbkmoney.fraudbusters.fraud.aggragator.CountAggregatorImpl;
+import com.rbkmoney.fraudbusters.fraud.aggragator.SumAggregatorImpl;
+import com.rbkmoney.fraudbusters.fraud.aggragator.UniqueValueAggregatorImpl;
+import com.rbkmoney.fraudbusters.fraud.finder.BlackListFinder;
+import com.rbkmoney.fraudbusters.fraud.finder.WightListFinder;
+import com.rbkmoney.fraudbusters.fraud.resolver.CountryResolverImpl;
+import com.rbkmoney.fraudo.aggregator.CountAggregator;
+import com.rbkmoney.fraudo.aggregator.SumAggregator;
+import com.rbkmoney.fraudo.aggregator.UniqueValueAggregator;
+import com.rbkmoney.fraudo.factory.FastFraudVisitorFactory;
+import com.rbkmoney.fraudo.factory.FraudVisitorFactory;
+import com.rbkmoney.fraudo.finder.InListFinder;
+import com.rbkmoney.fraudo.resolver.CountryResolver;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class FraudoConfig {
+
+ @Bean
+ public FraudVisitorFactory fraudVisitorFactory() {
+ return new FastFraudVisitorFactory();
+ }
+
+ @Bean
+ public CountAggregator countAggregator() {
+ return new CountAggregatorImpl();
+ }
+
+ @Bean
+ public SumAggregator sumAggregator() {
+ return new SumAggregatorImpl();
+ }
+
+ @Bean
+ public UniqueValueAggregator uniqueValueAggregator() {
+ return new UniqueValueAggregatorImpl();
+ }
+
+ @Bean
+ public CountryResolver countryResolver() {
+ return new CountryResolverImpl();
+ }
+
+ @Bean
+ public InListFinder blackListFinder() {
+ return new BlackListFinder();
+ }
+
+ @Bean
+ public InListFinder whiteListFinder() {
+ return new WightListFinder();
+ }
+
+}
diff --git a/src/main/java/com/rbkmoney/fraudbusters/config/KafkaConfig.java b/src/main/java/com/rbkmoney/fraudbusters/config/KafkaConfig.java
new file mode 100644
index 0000000..7a85838
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/config/KafkaConfig.java
@@ -0,0 +1,31 @@
+package com.rbkmoney.fraudbusters.config;
+
+import com.rbkmoney.fraudbusters.serde.FraudoModelSerde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.Properties;
+
+@Configuration
+public class KafkaConfig {
+
+ @Value("${kafka.bootstrap.servers}")
+ private String bootstrapServers;
+
+ @Bean
+ public Properties fraudStreamProperties() {
+ final Properties streamsConfiguration = new Properties();
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "fraud-busters");
+ streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "fraud-busters-client");
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, FraudoModelSerde.class);
+ streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
+ streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ return streamsConfiguration;
+ }
+
+}
diff --git a/src/main/java/com/rbkmoney/fraudbusters/constant/Level.java b/src/main/java/com/rbkmoney/fraudbusters/constant/Level.java
new file mode 100644
index 0000000..3c380f8
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/constant/Level.java
@@ -0,0 +1,7 @@
+package com.rbkmoney.fraudbusters.constant;
+
+public enum Level {
+
+ GLOBAL, CONCRETE
+
+}
diff --git a/src/main/java/com/rbkmoney/fraudbusters/domain/FraudResult.java b/src/main/java/com/rbkmoney/fraudbusters/domain/FraudResult.java
new file mode 100644
index 0000000..b5a118d
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/domain/FraudResult.java
@@ -0,0 +1,17 @@
+package com.rbkmoney.fraudbusters.domain;
+
+import com.rbkmoney.fraudo.constant.ResultStatus;
+import com.rbkmoney.fraudo.model.FraudModel;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Data
+@AllArgsConstructor
+public class FraudResult {
+
+ private FraudModel fraudModel;
+ private ResultStatus resultStatus;
+
+}
diff --git a/src/main/java/com/rbkmoney/fraudbusters/domain/RuleTemplate.java b/src/main/java/com/rbkmoney/fraudbusters/domain/RuleTemplate.java
new file mode 100644
index 0000000..6b1751a
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/domain/RuleTemplate.java
@@ -0,0 +1,14 @@
+package com.rbkmoney.fraudbusters.domain;
+
+import com.rbkmoney.fraudbusters.constant.Level;
+import lombok.Data;
+
+@Data
+public class RuleTemplate {
+
+ private String globalId;
+ private String localId;
+ private Level lvl;
+ private String template;
+
+}
diff --git a/src/main/java/com/rbkmoney/fraudbusters/factory/stream/ConcreteStreamFactory.java b/src/main/java/com/rbkmoney/fraudbusters/factory/stream/ConcreteStreamFactory.java
new file mode 100644
index 0000000..442f221
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/factory/stream/ConcreteStreamFactory.java
@@ -0,0 +1,57 @@
+package com.rbkmoney.fraudbusters.factory.stream;
+
+import com.rbkmoney.fraudbusters.domain.FraudResult;
+import com.rbkmoney.fraudbusters.serde.FraudoModelSerde;
+import com.rbkmoney.fraudo.FraudoParser;
+import com.rbkmoney.fraudo.aggregator.CountAggregator;
+import com.rbkmoney.fraudo.aggregator.SumAggregator;
+import com.rbkmoney.fraudo.aggregator.UniqueValueAggregator;
+import com.rbkmoney.fraudo.constant.ResultStatus;
+import com.rbkmoney.fraudo.factory.FastFraudVisitorFactory;
+import com.rbkmoney.fraudo.factory.FraudVisitorFactory;
+import com.rbkmoney.fraudo.finder.InListFinder;
+import com.rbkmoney.fraudo.model.FraudModel;
+import com.rbkmoney.fraudo.resolver.CountryResolver;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.util.Properties;
+
+@Component
+@RequiredArgsConstructor
+public class ConcreteStreamFactory implements TemplateStreamFactory {
+
+ @Value("${kafka.concrete.stream.topic}")
+ private String readTopic;
+ @Value("${kafka.result.stream.topic}")
+ private String resultTopic;
+
+ private final FraudVisitorFactory fraudVisitorFactory = new FastFraudVisitorFactory();
+ private final FraudoModelSerde fraudoModelSerde = new FraudoModelSerde();
+
+ private final CountAggregator countAggregator;
+ private final SumAggregator sumAggregator;
+ private final UniqueValueAggregator uniqueValueAggregator;
+ private final CountryResolver countryResolver;
+ private final InListFinder blackListFinder;
+ private final InListFinder whiteListFinder;
+
+ @Override
+ public KafkaStreams create(Properties streamsConfiguration, FraudoParser.ParseContext parseContext) {
+ StreamsBuilder builder = new StreamsBuilder();
+ builder.stream(readTopic, Consumed.with(Serdes.String(), fraudoModelSerde))
+ .mapValues(fraudModel -> new FraudResult(fraudModel, applyRules(parseContext, fraudModel)))
+ .to(resultTopic);
+ return new KafkaStreams(builder.build(), streamsConfiguration);
+ }
+
+ private ResultStatus applyRules(FraudoParser.ParseContext parseContext, FraudModel fraudModel) {
+ return (ResultStatus) fraudVisitorFactory.createVisitor(fraudModel, countAggregator, sumAggregator,
+ uniqueValueAggregator, countryResolver, blackListFinder, whiteListFinder).visit(parseContext);
+ }
+}
diff --git a/src/main/java/com/rbkmoney/fraudbusters/factory/stream/GlobalStreamFactory.java b/src/main/java/com/rbkmoney/fraudbusters/factory/stream/GlobalStreamFactory.java
new file mode 100644
index 0000000..13b8a76
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/factory/stream/GlobalStreamFactory.java
@@ -0,0 +1,69 @@
+package com.rbkmoney.fraudbusters.factory.stream;
+
+import com.rbkmoney.fraudbusters.domain.FraudResult;
+import com.rbkmoney.fraudbusters.serde.FraudoModelSerde;
+import com.rbkmoney.fraudo.FraudoParser;
+import com.rbkmoney.fraudo.aggregator.CountAggregator;
+import com.rbkmoney.fraudo.aggregator.SumAggregator;
+import com.rbkmoney.fraudo.aggregator.UniqueValueAggregator;
+import com.rbkmoney.fraudo.constant.ResultStatus;
+import com.rbkmoney.fraudo.factory.FastFraudVisitorFactory;
+import com.rbkmoney.fraudo.factory.FraudVisitorFactory;
+import com.rbkmoney.fraudo.finder.InListFinder;
+import com.rbkmoney.fraudo.model.FraudModel;
+import com.rbkmoney.fraudo.resolver.CountryResolver;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KStream;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.util.Properties;
+
+@Component
+@RequiredArgsConstructor
+public class GlobalStreamFactory implements TemplateStreamFactory {
+
+ @Value("${kafka.global.stream.topic}")
+ private String readTopic;
+ @Value("${kafka.result.stream.topic}")
+ private String resultTopic;
+ @Value("${kafka.concrete.stream.topic}")
+ private String concreteTopic;
+
+ private final FraudVisitorFactory fraudVisitorFactory = new FastFraudVisitorFactory();
+ private final FraudoModelSerde fraudoModelSerde = new FraudoModelSerde();
+
+ private final CountAggregator countAggregator;
+ private final SumAggregator sumAggregator;
+ private final UniqueValueAggregator uniqueValueAggregator;
+ private final CountryResolver countryResolver;
+ private final InListFinder blackListFinder;
+ private final InListFinder whiteListFinder;
+
+ @Override
+ public KafkaStreams create(Properties streamsConfiguration, FraudoParser.ParseContext parseContext) {
+ StreamsBuilder builder = new StreamsBuilder();
+ KStream[] branch = builder
+ .stream(readTopic, Consumed.with(Serdes.String(), fraudoModelSerde))
+ .mapValues(fraudModel -> new FraudResult(fraudModel, applyRules(parseContext, fraudModel)))
+ .selectKey((k, v) -> v.getResultStatus())
+ .branch((k, v) -> ResultStatus.ACCEPT.equals(v.getResultStatus()),
+ (k, v) -> !ResultStatus.ACCEPT.equals(v.getResultStatus()));
+ branch[0].selectKey((resultStatus, fraudResult) -> "1")
+ .mapValues(FraudResult::getFraudModel)
+ .to(resultTopic);
+ branch[1].selectKey((resultStatus, fraudResult) -> "2")
+ .mapValues(FraudResult::getFraudModel)
+ .to(concreteTopic);
+ return new KafkaStreams(builder.build(), streamsConfiguration);
+ }
+
+ private ResultStatus applyRules(FraudoParser.ParseContext parseContext, FraudModel fraudModel) {
+ return (ResultStatus) fraudVisitorFactory.createVisitor(fraudModel, countAggregator, sumAggregator,
+ uniqueValueAggregator, countryResolver, blackListFinder, whiteListFinder).visit(parseContext);
+ }
+}
diff --git a/src/main/java/com/rbkmoney/fraudbusters/factory/stream/TemplateStreamFactory.java b/src/main/java/com/rbkmoney/fraudbusters/factory/stream/TemplateStreamFactory.java
new file mode 100644
index 0000000..675181d
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/factory/stream/TemplateStreamFactory.java
@@ -0,0 +1,12 @@
+package com.rbkmoney.fraudbusters.factory.stream;
+
+import com.rbkmoney.fraudo.FraudoParser;
+import org.apache.kafka.streams.KafkaStreams;
+
+import java.util.Properties;
+
+public interface TemplateStreamFactory {
+
+ KafkaStreams create(final Properties streamsConfiguration, FraudoParser.ParseContext parseContext);
+
+}
diff --git a/src/main/java/com/rbkmoney/fraudbusters/fraud/aggragator/CountAggregatorImpl.java b/src/main/java/com/rbkmoney/fraudbusters/fraud/aggragator/CountAggregatorImpl.java
new file mode 100644
index 0000000..64ee138
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/fraud/aggragator/CountAggregatorImpl.java
@@ -0,0 +1,21 @@
+package com.rbkmoney.fraudbusters.fraud.aggragator;
+
+import com.rbkmoney.fraudo.aggregator.CountAggregator;
+import com.rbkmoney.fraudo.constant.CheckedField;
+
+public class CountAggregatorImpl implements CountAggregator {
+ @Override
+ public Integer count(CheckedField checkedField, String s, Long aLong) {
+ return null;
+ }
+
+ @Override
+ public Integer countSuccess(CheckedField checkedField, String s, Long aLong) {
+ return null;
+ }
+
+ @Override
+ public Integer countError(CheckedField checkedField, String s, Long aLong, String s1) {
+ return null;
+ }
+}
diff --git a/src/main/java/com/rbkmoney/fraudbusters/fraud/aggragator/SumAggregatorImpl.java b/src/main/java/com/rbkmoney/fraudbusters/fraud/aggragator/SumAggregatorImpl.java
new file mode 100644
index 0000000..66cee04
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/fraud/aggragator/SumAggregatorImpl.java
@@ -0,0 +1,21 @@
+package com.rbkmoney.fraudbusters.fraud.aggragator;
+
+import com.rbkmoney.fraudo.aggregator.SumAggregator;
+import com.rbkmoney.fraudo.constant.CheckedField;
+
+public class SumAggregatorImpl implements SumAggregator {
+ @Override
+ public Double sum(CheckedField checkedField, String s, Long aLong) {
+ return null;
+ }
+
+ @Override
+ public Double sumSuccess(CheckedField checkedField, String s, Long aLong) {
+ return null;
+ }
+
+ @Override
+ public Double sumError(CheckedField checkedField, String s, Long aLong, String s1) {
+ return null;
+ }
+}
diff --git a/src/main/java/com/rbkmoney/fraudbusters/fraud/aggragator/UniqueValueAggregatorImpl.java b/src/main/java/com/rbkmoney/fraudbusters/fraud/aggragator/UniqueValueAggregatorImpl.java
new file mode 100644
index 0000000..3621234
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/fraud/aggragator/UniqueValueAggregatorImpl.java
@@ -0,0 +1,11 @@
+package com.rbkmoney.fraudbusters.fraud.aggragator;
+
+import com.rbkmoney.fraudo.aggregator.UniqueValueAggregator;
+import com.rbkmoney.fraudo.constant.CheckedField;
+
+public class UniqueValueAggregatorImpl implements UniqueValueAggregator {
+ @Override
+ public Integer countUniqueValue(CheckedField checkedField, CheckedField checkedField1) {
+ return null;
+ }
+}
diff --git a/src/main/java/com/rbkmoney/fraudbusters/fraud/finder/BlackListFinder.java b/src/main/java/com/rbkmoney/fraudbusters/fraud/finder/BlackListFinder.java
new file mode 100644
index 0000000..32a7707
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/fraud/finder/BlackListFinder.java
@@ -0,0 +1,11 @@
+package com.rbkmoney.fraudbusters.fraud.finder;
+
+import com.rbkmoney.fraudo.constant.CheckedField;
+import com.rbkmoney.fraudo.finder.InListFinder;
+
+public class BlackListFinder implements InListFinder {
+ @Override
+ public Boolean findInList(CheckedField checkedField, String s) {
+ return null;
+ }
+}
diff --git a/src/main/java/com/rbkmoney/fraudbusters/fraud/finder/WightListFinder.java b/src/main/java/com/rbkmoney/fraudbusters/fraud/finder/WightListFinder.java
new file mode 100644
index 0000000..577bd6f
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/fraud/finder/WightListFinder.java
@@ -0,0 +1,11 @@
+package com.rbkmoney.fraudbusters.fraud.finder;
+
+import com.rbkmoney.fraudo.constant.CheckedField;
+import com.rbkmoney.fraudo.finder.InListFinder;
+
+public class WightListFinder implements InListFinder {
+ @Override
+ public Boolean findInList(CheckedField checkedField, String s) {
+ return null;
+ }
+}
diff --git a/src/main/java/com/rbkmoney/fraudbusters/fraud/resolver/CountryResolverImpl.java b/src/main/java/com/rbkmoney/fraudbusters/fraud/resolver/CountryResolverImpl.java
new file mode 100644
index 0000000..4247b7e
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/fraud/resolver/CountryResolverImpl.java
@@ -0,0 +1,15 @@
+package com.rbkmoney.fraudbusters.fraud.resolver;
+
+import com.rbkmoney.fraudo.resolver.CountryResolver;
+
+public class CountryResolverImpl implements CountryResolver {
+ @Override
+ public String resolveCountryBank(String s) {
+ return null;
+ }
+
+ @Override
+ public String resolveCountryIp(String s) {
+ return null;
+ }
+}
diff --git a/src/main/java/com/rbkmoney/fraudbusters/resource/Count.java b/src/main/java/com/rbkmoney/fraudbusters/resource/Count.java
new file mode 100644
index 0000000..ce385a9
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/resource/Count.java
@@ -0,0 +1,9 @@
+package com.rbkmoney.fraudbusters.resource;
+
+import lombok.Data;
+
+@Data
+public class Count {
+ private String id;
+ private Long count;
+}
diff --git a/src/main/java/com/rbkmoney/fraudbusters/resource/CountController.java b/src/main/java/com/rbkmoney/fraudbusters/resource/CountController.java
new file mode 100644
index 0000000..a758292
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/resource/CountController.java
@@ -0,0 +1,29 @@
+package com.rbkmoney.fraudbusters.resource;
+
+import com.rbkmoney.fraudbusters.template.TemplateBroker;
+import com.rbkmoney.fraudbusters.constant.Level;
+import com.rbkmoney.fraudbusters.domain.RuleTemplate;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+@Slf4j
+@RestController
+public class CountController {
+
+ @Autowired
+ private TemplateBroker templateBroker;
+
+ @GetMapping("/get")
+ public Response get() {
+ RuleTemplate ruleTemplate = new RuleTemplate();
+ ruleTemplate.setLvl(Level.GLOBAL);
+ ruleTemplate.setGlobalId("test");
+ ruleTemplate.setTemplate("rule: 3 > 2 AND 1 = 1\n" +
+ "-> accept;");
+// templateBroker.doDispatch(ruleTemplate);
+ return null;
+ }
+
+}
diff --git a/src/main/java/com/rbkmoney/fraudbusters/resource/Response.java b/src/main/java/com/rbkmoney/fraudbusters/resource/Response.java
new file mode 100644
index 0000000..c863794
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/resource/Response.java
@@ -0,0 +1,10 @@
+package com.rbkmoney.fraudbusters.resource;
+
+import lombok.Data;
+
+import java.util.List;
+
+@Data
+public class Response {
+ List list;
+}
\ No newline at end of file
diff --git a/src/main/java/com/rbkmoney/fraudbusters/serde/FraudoModelDeserializer.java b/src/main/java/com/rbkmoney/fraudbusters/serde/FraudoModelDeserializer.java
new file mode 100644
index 0000000..0a0dd66
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/serde/FraudoModelDeserializer.java
@@ -0,0 +1,37 @@
+package com.rbkmoney.fraudbusters.serde;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.rbkmoney.fraudo.model.FraudModel;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.util.Map;
+
+@Slf4j
+public class FraudoModelDeserializer implements Deserializer {
+
+ private final ObjectMapper om = new ObjectMapper();
+
+ @Override
+ public void configure(Map configs, boolean isKey) {
+
+ }
+
+ @Override
+ public FraudModel deserialize(String topic, byte[] data) {
+ FraudModel user = null;
+ try {
+ user = om.readValue(data, FraudModel.class);
+ } catch (Exception e) {
+ log.error("Error when deserialize fraudModel data: {} e: ", data, e);
+ }
+ return user;
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+}
diff --git a/src/main/java/com/rbkmoney/fraudbusters/serde/FraudoModelSerde.java b/src/main/java/com/rbkmoney/fraudbusters/serde/FraudoModelSerde.java
new file mode 100644
index 0000000..ea83040
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/serde/FraudoModelSerde.java
@@ -0,0 +1,34 @@
+package com.rbkmoney.fraudbusters.serde;
+
+import com.rbkmoney.fraudo.model.FraudModel;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.util.Map;
+
+@Slf4j
+public class FraudoModelSerde implements Serde {
+
+
+ @Override
+ public void configure(Map map, boolean b) {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public Serializer serializer() {
+ return new FraudoModelSerializer();
+ }
+
+ @Override
+ public Deserializer deserializer() {
+ return new FraudoModelDeserializer();
+ }
+}
diff --git a/src/main/java/com/rbkmoney/fraudbusters/serde/FraudoModelSerializer.java b/src/main/java/com/rbkmoney/fraudbusters/serde/FraudoModelSerializer.java
new file mode 100644
index 0000000..bd27da9
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/serde/FraudoModelSerializer.java
@@ -0,0 +1,36 @@
+package com.rbkmoney.fraudbusters.serde;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.rbkmoney.fraudo.model.FraudModel;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.util.Map;
+
+@Slf4j
+public class FraudoModelSerializer implements Serializer {
+
+ private final ObjectMapper om = new ObjectMapper();
+
+ @Override
+ public void configure(Map configs, boolean isKey) {
+
+ }
+
+ @Override
+ public byte[] serialize(String topic, FraudModel data) {
+ byte[] retVal = null;
+ try {
+ retVal = om.writeValueAsString(data).getBytes();
+ } catch (Exception e) {
+ log.error("Error when serialize fraudModel data: {} e: ", data, e);
+ }
+ return retVal;
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+}
diff --git a/src/main/java/com/rbkmoney/fraudbusters/serde/RuleTemplateDeserializer.java b/src/main/java/com/rbkmoney/fraudbusters/serde/RuleTemplateDeserializer.java
new file mode 100644
index 0000000..a0f6376
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/serde/RuleTemplateDeserializer.java
@@ -0,0 +1,36 @@
+package com.rbkmoney.fraudbusters.serde;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.rbkmoney.fraudbusters.domain.RuleTemplate;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+
+@Slf4j
+public class RuleTemplateDeserializer implements Deserializer {
+
+ private final ObjectMapper om = new ObjectMapper();
+
+ @Override
+ public void configure(Map configs, boolean isKey) {
+
+ }
+
+ @Override
+ public RuleTemplate deserialize(String topic, byte[] data) {
+ RuleTemplate ruleTemplate = null;
+ try {
+ ruleTemplate = om.readValue(data, RuleTemplate.class);
+ } catch (Exception e) {
+ log.error("Error when deserialize ruleTemplate data: {} e: ", data, e);
+ }
+ return ruleTemplate;
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+}
diff --git a/src/main/java/com/rbkmoney/fraudbusters/template/TemplateBroker.java b/src/main/java/com/rbkmoney/fraudbusters/template/TemplateBroker.java
new file mode 100644
index 0000000..c32bcb6
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/template/TemplateBroker.java
@@ -0,0 +1,73 @@
+package com.rbkmoney.fraudbusters.template;
+
+import com.rbkmoney.fraudbusters.domain.RuleTemplate;
+import com.rbkmoney.fraudbusters.factory.stream.ConcreteStreamFactory;
+import com.rbkmoney.fraudbusters.factory.stream.GlobalStreamFactory;
+import com.rbkmoney.fraudo.FraudoLexer;
+import com.rbkmoney.fraudo.FraudoParser;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.antlr.v4.runtime.CharStreams;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.apache.kafka.streams.KafkaStreams;
+import org.springframework.stereotype.Service;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class TemplateBroker {
+
+ private final Properties fraudStreamProperties;
+ private final GlobalStreamFactory globalStreamFactory;
+ private final ConcreteStreamFactory concreteStreamFactory;
+ private KafkaStreams globalStreams;
+
+ private Map concreteStreams = new HashMap<>();
+
+ public void doDispatch(RuleTemplate ruleTemplate) {
+ switch (ruleTemplate.getLvl()) {
+ case GLOBAL: {
+ FraudoParser.ParseContext parseContext = getParseContext(ruleTemplate.getTemplate());
+ KafkaStreams newStream = globalStreamFactory.create(fraudStreamProperties, parseContext);
+ restartGlobalStream(globalStreams, newStream);
+ globalStreams = newStream;
+ return;
+ }
+ case CONCRETE: {
+ String localId = ruleTemplate.getLocalId();
+ KafkaStreams kafkaStreams = concreteStreams.get(localId);
+ FraudoParser.ParseContext parseContext = getParseContext(ruleTemplate.getTemplate());
+ KafkaStreams streams = concreteStreamFactory.create(fraudStreamProperties, parseContext);
+ restartGlobalStream(kafkaStreams, streams);
+ concreteStreams.put(localId, streams);
+ return;
+ }
+ default: {
+ log.warn("This template lvl={} is not supported!", ruleTemplate.getLvl());
+ }
+ }
+ }
+
+ private void restartGlobalStream(KafkaStreams kafkaStreamsOld, KafkaStreams newStream) {
+ if (kafkaStreamsOld != null && kafkaStreamsOld.state().isRunning()) {
+ kafkaStreamsOld.close();
+ }
+ while (true) {
+ if (kafkaStreamsOld == null || !kafkaStreamsOld.state().isRunning()) {
+ newStream.start();
+ return;
+ }
+ }
+ }
+
+ private FraudoParser.ParseContext getParseContext(String template) {
+ FraudoLexer lexer = new FraudoLexer(CharStreams.fromString(template));
+ FraudoParser parser = new FraudoParser(new CommonTokenStream(lexer));
+ return parser.parse();
+ }
+
+}
diff --git a/src/main/java/com/rbkmoney/fraudbusters/template/TemplateListener.java b/src/main/java/com/rbkmoney/fraudbusters/template/TemplateListener.java
new file mode 100644
index 0000000..63aed8d
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/template/TemplateListener.java
@@ -0,0 +1,60 @@
+package com.rbkmoney.fraudbusters.template;
+
+import com.rbkmoney.fraudbusters.domain.RuleTemplate;
+import com.rbkmoney.fraudbusters.serde.RuleTemplateDeserializer;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Slf4j
+@RequiredArgsConstructor
+public class TemplateListener implements Runnable {
+
+ private static final String GROUP_ID = "TemplateListener-";
+ private AtomicBoolean stopped = new AtomicBoolean(false);
+
+ private final String listenTopic;
+ private final String bootstrapServers;
+ private final TemplateBroker templateBroker;
+
+ @Override
+ public void run() {
+ final Consumer consumer = create(listenTopic);
+ while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+ final ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(100));
+ consumerRecords.forEach(record -> {
+ templateBroker.doDispatch(record.value());
+ log.info("apply template: {}", record);
+ });
+ }
+ consumer.close();
+ }
+
+
+ public void stop() {
+ stopped.set(true);
+ }
+
+ private Consumer create(String topic) {
+ final Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID + UUID.randomUUID().toString());
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, RuleTemplateDeserializer.class.getName());
+ final Consumer consumer = new KafkaConsumer<>(props);
+ consumer.subscribe(Collections.singletonList(topic));
+ return consumer;
+ }
+}
diff --git a/src/main/java/com/rbkmoney/fraudbusters/visitor/StreamAggregatorVisitor.java b/src/main/java/com/rbkmoney/fraudbusters/visitor/StreamAggregatorVisitor.java
new file mode 100644
index 0000000..8aa7ac1
--- /dev/null
+++ b/src/main/java/com/rbkmoney/fraudbusters/visitor/StreamAggregatorVisitor.java
@@ -0,0 +1,76 @@
+package com.rbkmoney.fraudbusters.visitor;
+
+import com.rbkmoney.fraudo.FraudoBaseVisitor;
+import com.rbkmoney.fraudo.FraudoParser;
+import com.rbkmoney.fraudo.constant.CheckedField;
+import com.rbkmoney.fraudo.model.FraudModel;
+import com.rbkmoney.fraudo.utils.TextUtil;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.*;
+
+import java.util.concurrent.TimeUnit;
+
+@RequiredArgsConstructor
+public class StreamAggregatorVisitor extends FraudoBaseVisitor {
+
+ private KafkaStreams streams = null;
+ private final StreamsBuilder builder = new StreamsBuilder();
+ private final String partyId;
+
+ @Override
+ public StreamsBuilder visitCount(FraudoParser.CountContext ctx) {
+ String countTarget = TextUtil.safeGetText(ctx.STRING());
+ String time = TextUtil.safeGetText(ctx.DECIMAL());
+
+ final KStream views = builder.stream("test");
+
+ final KTable, Long> anomalousUsers = views
+ .filter((ignoredKey, model) -> partyId.equals(model.getPartyId()))
+ .map((ignoredKey, username) -> new KeyValue<>(username.getIp(), username.getIp()))
+ .groupByKey()
+ .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(Long.getLong(time))))
+ .count();
+
+ final KStream anomalousUsersForConsole = anomalousUsers
+ .toStream()
+ .filter((windowedUserId, count) -> count != null)
+ .map((windowedUserId, count) -> new KeyValue<>(windowedUserId.toString(), count));
+
+ final Serde stringSerde = Serdes.String();
+ final Serde longSerde = Serdes.Long();
+
+ anomalousUsersForConsole.to("AnomalousUsers", Produced.with(stringSerde, longSerde));
+
+ return builder;
+ }
+
+ @Override
+ public StreamsBuilder visitCount_success(FraudoParser.Count_successContext ctx) {
+ return super.visitCount_success(ctx);
+ }
+
+ @Override
+ public StreamsBuilder visitCount_error(FraudoParser.Count_errorContext ctx) {
+ return super.visitCount_error(ctx);
+ }
+
+ @Override
+ public StreamsBuilder visitSum(FraudoParser.SumContext ctx) {
+ return super.visitSum(ctx);
+ }
+
+ @Override
+ public StreamsBuilder visitSum_success(FraudoParser.Sum_successContext ctx) {
+ return super.visitSum_success(ctx);
+ }
+
+ @Override
+ public StreamsBuilder visitSum_error(FraudoParser.Sum_errorContext ctx) {
+ return super.visitSum_error(ctx);
+ }
+}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
new file mode 100644
index 0000000..2249135
--- /dev/null
+++ b/src/main/resources/application.yml
@@ -0,0 +1,8 @@
+server.port: @server.port@
+
+kafka:
+ bootstrap.servers: "localhost:29092"
+ global.stream.topic: global_topic
+ concrete.stream.topic: concrete
+ result.stream.topic: result
+ template.topic: template_new_3
diff --git a/src/test/java/com/rbkmoney/fraudbusters/FraudBustersApplicationTest.java b/src/test/java/com/rbkmoney/fraudbusters/FraudBustersApplicationTest.java
new file mode 100644
index 0000000..adb70f5
--- /dev/null
+++ b/src/test/java/com/rbkmoney/fraudbusters/FraudBustersApplicationTest.java
@@ -0,0 +1,181 @@
+package com.rbkmoney.fraudbusters;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.connect.json.JsonDeserializer;
+import org.apache.kafka.connect.json.JsonSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.junit.*;
+import org.testcontainers.containers.ClickHouseContainer;
+import ru.yandex.clickhouse.ClickHouseConnection;
+import ru.yandex.clickhouse.ClickHouseDataSource;
+import ru.yandex.clickhouse.settings.ClickHouseProperties;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Date;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+@Slf4j
+public class FraudBustersApplicationTest {
+
+ public static final String DEFAULT = "default";
+ private ClickHouseDataSource dataSource;
+ private ClickHouseConnection connection;
+
+ @Rule
+ public ClickHouseContainer clickhouse = new ClickHouseContainer();
+
+ @Test
+ public void testUpdateCountForSelect() throws Exception {
+ HikariConfig hikariConfig = new HikariConfig();
+ hikariConfig.setJdbcUrl(clickhouse.getJdbcUrl());
+ hikariConfig.setUsername(clickhouse.getUsername());
+ hikariConfig.setPassword(clickhouse.getPassword());
+ HikariDataSource ds = new HikariDataSource(hikariConfig);
+ Statement statement = ds.getConnection().createStatement();
+ statement.executeUpdate("CREATE TABLE `ontime` (\n" +
+ " `Year` UInt16,\n" +
+ " `Quarter` UInt8,\n" +
+ " `Month` UInt8,\n" +
+ " `DayofMonth` UInt8,\n" +
+ " `DayOfWeek` UInt8,\n" +
+ " `FlightDate` Date,\n" +
+ " `UniqueCarrier` FixedString(7),\n" +
+ " `AirlineID` Int32,\n" +
+ " `Carrier` FixedString(2),\n" +
+ " `TailNum` String,\n" +
+ " `FlightNum` String,\n" +
+ " `OriginAirportID` Int32,\n" +
+ " `OriginAirportSeqID` Int32,\n" +
+ " `OriginCityMarketID` Int32,\n" +
+ " `Origin` FixedString(5),\n" +
+ " `OriginCityName` String,\n" +
+ " `OriginState` FixedString(2),\n" +
+ " `OriginStateFips` String,\n" +
+ " `OriginStateName` String,\n" +
+ " `OriginWac` Int32,\n" +
+ " `DestAirportID` Int32,\n" +
+ " `DestAirportSeqID` Int32,\n" +
+ " `DestCityMarketID` Int32,\n" +
+ " `Dest` FixedString(5),\n" +
+ " `DestCityName` String,\n" +
+ " `DestState` FixedString(2),\n" +
+ " `DestStateFips` String,\n" +
+ " `DestStateName` String,\n" +
+ " `DestWac` Int32,\n" +
+ " `CRSDepTime` Int32,\n" +
+ " `DepTime` Int32,\n" +
+ " `DepDelay` Int32,\n" +
+ " `DepDelayMinutes` Int32,\n" +
+ " `DepDel15` Int32,\n" +
+ " `DepartureDelayGroups` String,\n" +
+ " `DepTimeBlk` String,\n" +
+ " `TaxiOut` Int32,\n" +
+ " `WheelsOff` Int32,\n" +
+ " `WheelsOn` Int32,\n" +
+ " `TaxiIn` Int32,\n" +
+ " `CRSArrTime` Int32,\n" +
+ " `ArrTime` Int32,\n" +
+ " `ArrDelay` Int32,\n" +
+ " `ArrDelayMinutes` Int32,\n" +
+ " `ArrDel15` Int32,\n" +
+ " `ArrivalDelayGroups` Int32,\n" +
+ " `ArrTimeBlk` String,\n" +
+ " `Cancelled` UInt8,\n" +
+ " `CancellationCode` FixedString(1),\n" +
+ " `Diverted` UInt8,\n" +
+ " `CRSElapsedTime` Int32,\n" +
+ " `ActualElapsedTime` Int32,\n" +
+ " `AirTime` Int32,\n" +
+ " `Flights` Int32,\n" +
+ " `Distance` Int32,\n" +
+ " `DistanceGroup` UInt8,\n" +
+ " `CarrierDelay` Int32,\n" +
+ " `WeatherDelay` Int32,\n" +
+ " `NASDelay` Int32,\n" +
+ " `SecurityDelay` Int32,\n" +
+ " `LateAircraftDelay` Int32,\n" +
+ " `FirstDepTime` String,\n" +
+ " `TotalAddGTime` String,\n" +
+ " `LongestAddGTime` String,\n" +
+ " `DivAirportLandings` String,\n" +
+ " `DivReachedDest` String,\n" +
+ " `DivActualElapsedTime` String,\n" +
+ " `DivArrDelay` String,\n" +
+ " `DivDistance` String,\n" +
+ " `Div1Airport` String,\n" +
+ " `Div1AirportID` Int32,\n" +
+ " `Div1AirportSeqID` Int32,\n" +
+ " `Div1WheelsOn` String,\n" +
+ " `Div1TotalGTime` String,\n" +
+ " `Div1LongestGTime` String,\n" +
+ " `Div1WheelsOff` String,\n" +
+ " `Div1TailNum` String,\n" +
+ " `Div2Airport` String,\n" +
+ " `Div2AirportID` Int32,\n" +
+ " `Div2AirportSeqID` Int32,\n" +
+ " `Div2WheelsOn` String,\n" +
+ " `Div2TotalGTime` String,\n" +
+ " `Div2LongestGTime` String,\n" +
+ " `Div2WheelsOff` String,\n" +
+ " `Div2TailNum` String,\n" +
+ " `Div3Airport` String,\n" +
+ " `Div3AirportID` Int32,\n" +
+ " `Div3AirportSeqID` Int32,\n" +
+ " `Div3WheelsOn` String,\n" +
+ " `Div3TotalGTime` String,\n" +
+ " `Div3LongestGTime` String,\n" +
+ " `Div3WheelsOff` String,\n" +
+ " `Div3TailNum` String,\n" +
+ " `Div4Airport` String,\n" +
+ " `Div4AirportID` Int32,\n" +
+ " `Div4AirportSeqID` Int32,\n" +
+ " `Div4WheelsOn` String,\n" +
+ " `Div4TotalGTime` String,\n" +
+ " `Div4LongestGTime` String,\n" +
+ " `Div4WheelsOff` String,\n" +
+ " `Div4TailNum` String,\n" +
+ " `Div5Airport` String,\n" +
+ " `Div5AirportID` Int32,\n" +
+ " `Div5AirportSeqID` Int32,\n" +
+ " `Div5WheelsOn` String,\n" +
+ " `Div5TotalGTime` String,\n" +
+ " `Div5LongestGTime` String,\n" +
+ " `Div5WheelsOff` String,\n" +
+ " `Div5TailNum` String\n" +
+ ") ENGINE = MergeTree(FlightDate, (Year, FlightDate), 8192)");
+
+ PreparedStatement prepareStatement = ds.getConnection().prepareStatement("INSERT INTO ontime [(Year, Month , FlightDate)] VALUES (?, ?, ?);");
+ prepareStatement.setInt(1, 1);
+ prepareStatement.setInt(2, 1);
+ prepareStatement.setDate(3, new java.sql.Date(1));
+ boolean execute = prepareStatement.execute();
+
+ statement = ds.getConnection().createStatement();
+ statement.execute("select avg(c1) from (select Year, Month, count(*) as c1 from ontime group by Year, Month);");
+ ResultSet resultSet = statement.getResultSet();
+ resultSet.next();
+ Date resultSetInt = resultSet.getDate(1);
+ assertEquals("A basic SELECT query succeeds", 1, resultSetInt);
+ }
+
+
+}
\ No newline at end of file
diff --git a/src/test/java/com/rbkmoney/fraudbusters/KafkaTest.java b/src/test/java/com/rbkmoney/fraudbusters/KafkaTest.java
new file mode 100644
index 0000000..aaaf785
--- /dev/null
+++ b/src/test/java/com/rbkmoney/fraudbusters/KafkaTest.java
@@ -0,0 +1,34 @@
+package com.rbkmoney.fraudbusters;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@Slf4j
+@SpringBootTest
+@RunWith(SpringRunner.class)
+public class KafkaTest {
+
+
+ @Test
+ public void testUpdateCountForSelect() throws Exception {
+
+
+
+
+// ReadOnlyWindowStore windowStore =
+// streams.store("CountsWindowStore", QueryableStoreTypes.windowStore());
+//
+// WindowStoreIterator iterator = windowStore.fetch("world", Instant.ofEpochMilli(0), Instant.now());
+// while (iterator.hasNext()) {
+// KeyValue next = iterator.next();
+// long windowTimestamp = next.key;
+// System.out.println("Count of 'world' @ time " + windowTimestamp + " is " + next.value);
+// }
+// iterator.close();
+ }
+
+
+}
\ No newline at end of file
diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml
new file mode 100755
index 0000000..046227e
--- /dev/null
+++ b/src/test/resources/logback-test.xml
@@ -0,0 +1,10 @@
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file