Initial commit

This commit is contained in:
k.struzhkin 2018-12-14 23:10:02 +03:00
parent 20211f4f1d
commit d4859bddb7
31 changed files with 1199 additions and 0 deletions

166
pom.xml Normal file
View File

@ -0,0 +1,166 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.rbkmoney</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
</parent>
<groupId>com.rbkmoney</groupId>
<artifactId>fraudbusters</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.maintainer>Struzhkin Konstantin &lt;k.struzhkin@rbkmoney.com&gt;</project.maintainer>
<shared.resources.version>0.2.1</shared.resources.version>
<server.port>8022</server.port>
<dockerfile.base.service.tag>22c57470c4fc47161894f036b7cf9d70f42b75f5</dockerfile.base.service.tag>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<drools-version>7.0.0.Final</drools-version>
<geck.version>0.6.7</geck.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>4.6</version>
</dependency>
<dependency>
<groupId>com.rbkmoney.logback</groupId>
<artifactId>nop-rolling</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.4</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>fraudo</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.rbkmoney.geck</groupId>
<artifactId>common</artifactId>
<version>${geck.version}</version>
</dependency>
<dependency>
<groupId>com.rbkmoney.geck</groupId>
<artifactId>serializer</artifactId>
<version>${geck.version}</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.1.41</version>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP-java6</artifactId>
<version>2.3.8</version>
</dependency>
<!-- Test libs -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>clickhouse</artifactId>
<version>1.10.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>2.1.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>${project.build.directory}/maven-shared-archive-resources</directory>
<targetPath>${project.build.directory}</targetPath>
<includes>
<include>Dockerfile</include>
</includes>
<filtering>true</filtering>
</resource>
<resource>
<directory>${project.build.directory}/maven-shared-archive-resources</directory>
<filtering>true</filtering>
<excludes>
<exclude>Dockerfile</exclude>
</excludes>
</resource>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-remote-resources-plugin</artifactId>
<version>1.5</version>
<dependencies>
<dependency>
<groupId>org.apache.maven.shared</groupId>
<artifactId>maven-filtering</artifactId>
<version>1.3</version>
</dependency>
</dependencies>
<configuration>
<resourceBundles>
<resourceBundle>com.rbkmoney:shared-resources:${shared.resources.version}</resourceBundle>
</resourceBundles>
<attachToMain>false</attachToMain>
<attachToTest>false</attachToTest>
</configuration>
<executions>
<execution>
<goals>
<goal>process</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,13 @@
package com.rbkmoney.fraudbusters;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class FraudBustersApplication {
public static void main(String[] args) {
SpringApplication.run(FraudBustersApplication.class, args);
}
}

View File

@ -0,0 +1,33 @@
package com.rbkmoney.fraudbusters;
import com.rbkmoney.fraudbusters.template.TemplateBroker;
import com.rbkmoney.fraudbusters.template.TemplateListener;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Component
@RequiredArgsConstructor
public class ListenerStartup implements ApplicationListener<ApplicationReadyEvent> {
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;
@Value("${kafka.template.topic}")
private String templateTopic;
private final TemplateBroker templateBroker;
private ExecutorService executorService = Executors.newSingleThreadExecutor();
@Override
public void onApplicationEvent(final ApplicationReadyEvent event) {
executorService.submit(new TemplateListener(templateTopic, bootstrapServers, templateBroker));
return;
}
}

View File

@ -0,0 +1,57 @@
package com.rbkmoney.fraudbusters.config;
import com.rbkmoney.fraudbusters.fraud.aggragator.CountAggregatorImpl;
import com.rbkmoney.fraudbusters.fraud.aggragator.SumAggregatorImpl;
import com.rbkmoney.fraudbusters.fraud.aggragator.UniqueValueAggregatorImpl;
import com.rbkmoney.fraudbusters.fraud.finder.BlackListFinder;
import com.rbkmoney.fraudbusters.fraud.finder.WightListFinder;
import com.rbkmoney.fraudbusters.fraud.resolver.CountryResolverImpl;
import com.rbkmoney.fraudo.aggregator.CountAggregator;
import com.rbkmoney.fraudo.aggregator.SumAggregator;
import com.rbkmoney.fraudo.aggregator.UniqueValueAggregator;
import com.rbkmoney.fraudo.factory.FastFraudVisitorFactory;
import com.rbkmoney.fraudo.factory.FraudVisitorFactory;
import com.rbkmoney.fraudo.finder.InListFinder;
import com.rbkmoney.fraudo.resolver.CountryResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FraudoConfig {
@Bean
public FraudVisitorFactory fraudVisitorFactory() {
return new FastFraudVisitorFactory();
}
@Bean
public CountAggregator countAggregator() {
return new CountAggregatorImpl();
}
@Bean
public SumAggregator sumAggregator() {
return new SumAggregatorImpl();
}
@Bean
public UniqueValueAggregator uniqueValueAggregator() {
return new UniqueValueAggregatorImpl();
}
@Bean
public CountryResolver countryResolver() {
return new CountryResolverImpl();
}
@Bean
public InListFinder blackListFinder() {
return new BlackListFinder();
}
@Bean
public InListFinder whiteListFinder() {
return new WightListFinder();
}
}

View File

@ -0,0 +1,31 @@
package com.rbkmoney.fraudbusters.config;
import com.rbkmoney.fraudbusters.serde.FraudoModelSerde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Properties;
@Configuration
public class KafkaConfig {
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;
@Bean
public Properties fraudStreamProperties() {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "fraud-busters");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "fraud-busters-client");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, FraudoModelSerde.class);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
return streamsConfiguration;
}
}

View File

@ -0,0 +1,7 @@
package com.rbkmoney.fraudbusters.constant;
public enum Level {
GLOBAL, CONCRETE
}

View File

@ -0,0 +1,17 @@
package com.rbkmoney.fraudbusters.domain;
import com.rbkmoney.fraudo.constant.ResultStatus;
import com.rbkmoney.fraudo.model.FraudModel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Data
@AllArgsConstructor
public class FraudResult {
private FraudModel fraudModel;
private ResultStatus resultStatus;
}

View File

@ -0,0 +1,14 @@
package com.rbkmoney.fraudbusters.domain;
import com.rbkmoney.fraudbusters.constant.Level;
import lombok.Data;
@Data
public class RuleTemplate {
private String globalId;
private String localId;
private Level lvl;
private String template;
}

View File

@ -0,0 +1,57 @@
package com.rbkmoney.fraudbusters.factory.stream;
import com.rbkmoney.fraudbusters.domain.FraudResult;
import com.rbkmoney.fraudbusters.serde.FraudoModelSerde;
import com.rbkmoney.fraudo.FraudoParser;
import com.rbkmoney.fraudo.aggregator.CountAggregator;
import com.rbkmoney.fraudo.aggregator.SumAggregator;
import com.rbkmoney.fraudo.aggregator.UniqueValueAggregator;
import com.rbkmoney.fraudo.constant.ResultStatus;
import com.rbkmoney.fraudo.factory.FastFraudVisitorFactory;
import com.rbkmoney.fraudo.factory.FraudVisitorFactory;
import com.rbkmoney.fraudo.finder.InListFinder;
import com.rbkmoney.fraudo.model.FraudModel;
import com.rbkmoney.fraudo.resolver.CountryResolver;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Properties;
@Component
@RequiredArgsConstructor
public class ConcreteStreamFactory implements TemplateStreamFactory {
@Value("${kafka.concrete.stream.topic}")
private String readTopic;
@Value("${kafka.result.stream.topic}")
private String resultTopic;
private final FraudVisitorFactory fraudVisitorFactory = new FastFraudVisitorFactory();
private final FraudoModelSerde fraudoModelSerde = new FraudoModelSerde();
private final CountAggregator countAggregator;
private final SumAggregator sumAggregator;
private final UniqueValueAggregator uniqueValueAggregator;
private final CountryResolver countryResolver;
private final InListFinder blackListFinder;
private final InListFinder whiteListFinder;
@Override
public KafkaStreams create(Properties streamsConfiguration, FraudoParser.ParseContext parseContext) {
StreamsBuilder builder = new StreamsBuilder();
builder.stream(readTopic, Consumed.with(Serdes.String(), fraudoModelSerde))
.mapValues(fraudModel -> new FraudResult(fraudModel, applyRules(parseContext, fraudModel)))
.to(resultTopic);
return new KafkaStreams(builder.build(), streamsConfiguration);
}
private ResultStatus applyRules(FraudoParser.ParseContext parseContext, FraudModel fraudModel) {
return (ResultStatus) fraudVisitorFactory.createVisitor(fraudModel, countAggregator, sumAggregator,
uniqueValueAggregator, countryResolver, blackListFinder, whiteListFinder).visit(parseContext);
}
}

View File

@ -0,0 +1,69 @@
package com.rbkmoney.fraudbusters.factory.stream;
import com.rbkmoney.fraudbusters.domain.FraudResult;
import com.rbkmoney.fraudbusters.serde.FraudoModelSerde;
import com.rbkmoney.fraudo.FraudoParser;
import com.rbkmoney.fraudo.aggregator.CountAggregator;
import com.rbkmoney.fraudo.aggregator.SumAggregator;
import com.rbkmoney.fraudo.aggregator.UniqueValueAggregator;
import com.rbkmoney.fraudo.constant.ResultStatus;
import com.rbkmoney.fraudo.factory.FastFraudVisitorFactory;
import com.rbkmoney.fraudo.factory.FraudVisitorFactory;
import com.rbkmoney.fraudo.finder.InListFinder;
import com.rbkmoney.fraudo.model.FraudModel;
import com.rbkmoney.fraudo.resolver.CountryResolver;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Properties;
@Component
@RequiredArgsConstructor
public class GlobalStreamFactory implements TemplateStreamFactory {
@Value("${kafka.global.stream.topic}")
private String readTopic;
@Value("${kafka.result.stream.topic}")
private String resultTopic;
@Value("${kafka.concrete.stream.topic}")
private String concreteTopic;
private final FraudVisitorFactory fraudVisitorFactory = new FastFraudVisitorFactory();
private final FraudoModelSerde fraudoModelSerde = new FraudoModelSerde();
private final CountAggregator countAggregator;
private final SumAggregator sumAggregator;
private final UniqueValueAggregator uniqueValueAggregator;
private final CountryResolver countryResolver;
private final InListFinder blackListFinder;
private final InListFinder whiteListFinder;
@Override
public KafkaStreams create(Properties streamsConfiguration, FraudoParser.ParseContext parseContext) {
StreamsBuilder builder = new StreamsBuilder();
KStream<ResultStatus, FraudResult>[] branch = builder
.stream(readTopic, Consumed.with(Serdes.String(), fraudoModelSerde))
.mapValues(fraudModel -> new FraudResult(fraudModel, applyRules(parseContext, fraudModel)))
.selectKey((k, v) -> v.getResultStatus())
.branch((k, v) -> ResultStatus.ACCEPT.equals(v.getResultStatus()),
(k, v) -> !ResultStatus.ACCEPT.equals(v.getResultStatus()));
branch[0].selectKey((resultStatus, fraudResult) -> "1")
.mapValues(FraudResult::getFraudModel)
.to(resultTopic);
branch[1].selectKey((resultStatus, fraudResult) -> "2")
.mapValues(FraudResult::getFraudModel)
.to(concreteTopic);
return new KafkaStreams(builder.build(), streamsConfiguration);
}
private ResultStatus applyRules(FraudoParser.ParseContext parseContext, FraudModel fraudModel) {
return (ResultStatus) fraudVisitorFactory.createVisitor(fraudModel, countAggregator, sumAggregator,
uniqueValueAggregator, countryResolver, blackListFinder, whiteListFinder).visit(parseContext);
}
}

View File

@ -0,0 +1,12 @@
package com.rbkmoney.fraudbusters.factory.stream;
import com.rbkmoney.fraudo.FraudoParser;
import org.apache.kafka.streams.KafkaStreams;
import java.util.Properties;
public interface TemplateStreamFactory {
KafkaStreams create(final Properties streamsConfiguration, FraudoParser.ParseContext parseContext);
}

View File

@ -0,0 +1,21 @@
package com.rbkmoney.fraudbusters.fraud.aggragator;
import com.rbkmoney.fraudo.aggregator.CountAggregator;
import com.rbkmoney.fraudo.constant.CheckedField;
public class CountAggregatorImpl implements CountAggregator {
@Override
public Integer count(CheckedField checkedField, String s, Long aLong) {
return null;
}
@Override
public Integer countSuccess(CheckedField checkedField, String s, Long aLong) {
return null;
}
@Override
public Integer countError(CheckedField checkedField, String s, Long aLong, String s1) {
return null;
}
}

View File

@ -0,0 +1,21 @@
package com.rbkmoney.fraudbusters.fraud.aggragator;
import com.rbkmoney.fraudo.aggregator.SumAggregator;
import com.rbkmoney.fraudo.constant.CheckedField;
public class SumAggregatorImpl implements SumAggregator {
@Override
public Double sum(CheckedField checkedField, String s, Long aLong) {
return null;
}
@Override
public Double sumSuccess(CheckedField checkedField, String s, Long aLong) {
return null;
}
@Override
public Double sumError(CheckedField checkedField, String s, Long aLong, String s1) {
return null;
}
}

View File

@ -0,0 +1,11 @@
package com.rbkmoney.fraudbusters.fraud.aggragator;
import com.rbkmoney.fraudo.aggregator.UniqueValueAggregator;
import com.rbkmoney.fraudo.constant.CheckedField;
public class UniqueValueAggregatorImpl implements UniqueValueAggregator {
@Override
public Integer countUniqueValue(CheckedField checkedField, CheckedField checkedField1) {
return null;
}
}

View File

@ -0,0 +1,11 @@
package com.rbkmoney.fraudbusters.fraud.finder;
import com.rbkmoney.fraudo.constant.CheckedField;
import com.rbkmoney.fraudo.finder.InListFinder;
public class BlackListFinder implements InListFinder {
@Override
public Boolean findInList(CheckedField checkedField, String s) {
return null;
}
}

View File

@ -0,0 +1,11 @@
package com.rbkmoney.fraudbusters.fraud.finder;
import com.rbkmoney.fraudo.constant.CheckedField;
import com.rbkmoney.fraudo.finder.InListFinder;
public class WightListFinder implements InListFinder {
@Override
public Boolean findInList(CheckedField checkedField, String s) {
return null;
}
}

View File

@ -0,0 +1,15 @@
package com.rbkmoney.fraudbusters.fraud.resolver;
import com.rbkmoney.fraudo.resolver.CountryResolver;
public class CountryResolverImpl implements CountryResolver {
@Override
public String resolveCountryBank(String s) {
return null;
}
@Override
public String resolveCountryIp(String s) {
return null;
}
}

View File

@ -0,0 +1,9 @@
package com.rbkmoney.fraudbusters.resource;
import lombok.Data;
@Data
public class Count {
private String id;
private Long count;
}

View File

@ -0,0 +1,29 @@
package com.rbkmoney.fraudbusters.resource;
import com.rbkmoney.fraudbusters.template.TemplateBroker;
import com.rbkmoney.fraudbusters.constant.Level;
import com.rbkmoney.fraudbusters.domain.RuleTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
public class CountController {
@Autowired
private TemplateBroker templateBroker;
@GetMapping("/get")
public Response get() {
RuleTemplate ruleTemplate = new RuleTemplate();
ruleTemplate.setLvl(Level.GLOBAL);
ruleTemplate.setGlobalId("test");
ruleTemplate.setTemplate("rule: 3 > 2 AND 1 = 1\n" +
"-> accept;");
// templateBroker.doDispatch(ruleTemplate);
return null;
}
}

View File

@ -0,0 +1,10 @@
package com.rbkmoney.fraudbusters.resource;
import lombok.Data;
import java.util.List;
@Data
public class Response {
List<Count> list;
}

View File

@ -0,0 +1,37 @@
package com.rbkmoney.fraudbusters.serde;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rbkmoney.fraudo.model.FraudModel;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
@Slf4j
public class FraudoModelDeserializer implements Deserializer<FraudModel> {
private final ObjectMapper om = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public FraudModel deserialize(String topic, byte[] data) {
FraudModel user = null;
try {
user = om.readValue(data, FraudModel.class);
} catch (Exception e) {
log.error("Error when deserialize fraudModel data: {} e: ", data, e);
}
return user;
}
@Override
public void close() {
}
}

View File

@ -0,0 +1,34 @@
package com.rbkmoney.fraudbusters.serde;
import com.rbkmoney.fraudo.model.FraudModel;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
@Slf4j
public class FraudoModelSerde implements Serde<FraudModel> {
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public void close() {
}
@Override
public Serializer<FraudModel> serializer() {
return new FraudoModelSerializer();
}
@Override
public Deserializer<FraudModel> deserializer() {
return new FraudoModelDeserializer();
}
}

View File

@ -0,0 +1,36 @@
package com.rbkmoney.fraudbusters.serde;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rbkmoney.fraudo.model.FraudModel;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
@Slf4j
public class FraudoModelSerializer implements Serializer<FraudModel> {
private final ObjectMapper om = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, FraudModel data) {
byte[] retVal = null;
try {
retVal = om.writeValueAsString(data).getBytes();
} catch (Exception e) {
log.error("Error when serialize fraudModel data: {} e: ", data, e);
}
return retVal;
}
@Override
public void close() {
}
}

View File

@ -0,0 +1,36 @@
package com.rbkmoney.fraudbusters.serde;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rbkmoney.fraudbusters.domain.RuleTemplate;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
@Slf4j
public class RuleTemplateDeserializer implements Deserializer<RuleTemplate> {
private final ObjectMapper om = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public RuleTemplate deserialize(String topic, byte[] data) {
RuleTemplate ruleTemplate = null;
try {
ruleTemplate = om.readValue(data, RuleTemplate.class);
} catch (Exception e) {
log.error("Error when deserialize ruleTemplate data: {} e: ", data, e);
}
return ruleTemplate;
}
@Override
public void close() {
}
}

View File

@ -0,0 +1,73 @@
package com.rbkmoney.fraudbusters.template;
import com.rbkmoney.fraudbusters.domain.RuleTemplate;
import com.rbkmoney.fraudbusters.factory.stream.ConcreteStreamFactory;
import com.rbkmoney.fraudbusters.factory.stream.GlobalStreamFactory;
import com.rbkmoney.fraudo.FraudoLexer;
import com.rbkmoney.fraudo.FraudoParser;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.antlr.v4.runtime.CharStreams;
import org.antlr.v4.runtime.CommonTokenStream;
import org.apache.kafka.streams.KafkaStreams;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@Slf4j
@Service
@RequiredArgsConstructor
public class TemplateBroker {
private final Properties fraudStreamProperties;
private final GlobalStreamFactory globalStreamFactory;
private final ConcreteStreamFactory concreteStreamFactory;
private KafkaStreams globalStreams;
private Map<String, KafkaStreams> concreteStreams = new HashMap<>();
public void doDispatch(RuleTemplate ruleTemplate) {
switch (ruleTemplate.getLvl()) {
case GLOBAL: {
FraudoParser.ParseContext parseContext = getParseContext(ruleTemplate.getTemplate());
KafkaStreams newStream = globalStreamFactory.create(fraudStreamProperties, parseContext);
restartGlobalStream(globalStreams, newStream);
globalStreams = newStream;
return;
}
case CONCRETE: {
String localId = ruleTemplate.getLocalId();
KafkaStreams kafkaStreams = concreteStreams.get(localId);
FraudoParser.ParseContext parseContext = getParseContext(ruleTemplate.getTemplate());
KafkaStreams streams = concreteStreamFactory.create(fraudStreamProperties, parseContext);
restartGlobalStream(kafkaStreams, streams);
concreteStreams.put(localId, streams);
return;
}
default: {
log.warn("This template lvl={} is not supported!", ruleTemplate.getLvl());
}
}
}
private void restartGlobalStream(KafkaStreams kafkaStreamsOld, KafkaStreams newStream) {
if (kafkaStreamsOld != null && kafkaStreamsOld.state().isRunning()) {
kafkaStreamsOld.close();
}
while (true) {
if (kafkaStreamsOld == null || !kafkaStreamsOld.state().isRunning()) {
newStream.start();
return;
}
}
}
private FraudoParser.ParseContext getParseContext(String template) {
FraudoLexer lexer = new FraudoLexer(CharStreams.fromString(template));
FraudoParser parser = new FraudoParser(new CommonTokenStream(lexer));
return parser.parse();
}
}

View File

@ -0,0 +1,60 @@
package com.rbkmoney.fraudbusters.template;
import com.rbkmoney.fraudbusters.domain.RuleTemplate;
import com.rbkmoney.fraudbusters.serde.RuleTemplateDeserializer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@RequiredArgsConstructor
public class TemplateListener implements Runnable {
private static final String GROUP_ID = "TemplateListener-";
private AtomicBoolean stopped = new AtomicBoolean(false);
private final String listenTopic;
private final String bootstrapServers;
private final TemplateBroker templateBroker;
@Override
public void run() {
final Consumer<String, RuleTemplate> consumer = create(listenTopic);
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
final ConsumerRecords<String, RuleTemplate> consumerRecords = consumer.poll(Duration.ofMillis(100));
consumerRecords.forEach(record -> {
templateBroker.doDispatch(record.value());
log.info("apply template: {}", record);
});
}
consumer.close();
}
public void stop() {
stopped.set(true);
}
private Consumer<String, RuleTemplate> create(String topic) {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID + UUID.randomUUID().toString());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, RuleTemplateDeserializer.class.getName());
final Consumer<String, RuleTemplate> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
return consumer;
}
}

View File

@ -0,0 +1,76 @@
package com.rbkmoney.fraudbusters.visitor;
import com.rbkmoney.fraudo.FraudoBaseVisitor;
import com.rbkmoney.fraudo.FraudoParser;
import com.rbkmoney.fraudo.constant.CheckedField;
import com.rbkmoney.fraudo.model.FraudModel;
import com.rbkmoney.fraudo.utils.TextUtil;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import java.util.concurrent.TimeUnit;
@RequiredArgsConstructor
public class StreamAggregatorVisitor extends FraudoBaseVisitor<StreamsBuilder> {
private KafkaStreams streams = null;
private final StreamsBuilder builder = new StreamsBuilder();
private final String partyId;
@Override
public StreamsBuilder visitCount(FraudoParser.CountContext ctx) {
String countTarget = TextUtil.safeGetText(ctx.STRING());
String time = TextUtil.safeGetText(ctx.DECIMAL());
final KStream<String, FraudModel> views = builder.stream("test");
final KTable<Windowed<String>, Long> anomalousUsers = views
.filter((ignoredKey, model) -> partyId.equals(model.getPartyId()))
.map((ignoredKey, username) -> new KeyValue<>(username.getIp(), username.getIp()))
.groupByKey()
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(Long.getLong(time))))
.count();
final KStream<String, Long> anomalousUsersForConsole = anomalousUsers
.toStream()
.filter((windowedUserId, count) -> count != null)
.map((windowedUserId, count) -> new KeyValue<>(windowedUserId.toString(), count));
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
anomalousUsersForConsole.to("AnomalousUsers", Produced.with(stringSerde, longSerde));
return builder;
}
@Override
public StreamsBuilder visitCount_success(FraudoParser.Count_successContext ctx) {
return super.visitCount_success(ctx);
}
@Override
public StreamsBuilder visitCount_error(FraudoParser.Count_errorContext ctx) {
return super.visitCount_error(ctx);
}
@Override
public StreamsBuilder visitSum(FraudoParser.SumContext ctx) {
return super.visitSum(ctx);
}
@Override
public StreamsBuilder visitSum_success(FraudoParser.Sum_successContext ctx) {
return super.visitSum_success(ctx);
}
@Override
public StreamsBuilder visitSum_error(FraudoParser.Sum_errorContext ctx) {
return super.visitSum_error(ctx);
}
}

View File

@ -0,0 +1,8 @@
server.port: @server.port@
kafka:
bootstrap.servers: "localhost:29092"
global.stream.topic: global_topic
concrete.stream.topic: concrete
result.stream.topic: result
template.topic: template_new_3

View File

@ -0,0 +1,181 @@
package com.rbkmoney.fraudbusters;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.junit.*;
import org.testcontainers.containers.ClickHouseContainer;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Date;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
@Slf4j
public class FraudBustersApplicationTest {
public static final String DEFAULT = "default";
private ClickHouseDataSource dataSource;
private ClickHouseConnection connection;
@Rule
public ClickHouseContainer clickhouse = new ClickHouseContainer();
@Test
public void testUpdateCountForSelect() throws Exception {
HikariConfig hikariConfig = new HikariConfig();
hikariConfig.setJdbcUrl(clickhouse.getJdbcUrl());
hikariConfig.setUsername(clickhouse.getUsername());
hikariConfig.setPassword(clickhouse.getPassword());
HikariDataSource ds = new HikariDataSource(hikariConfig);
Statement statement = ds.getConnection().createStatement();
statement.executeUpdate("CREATE TABLE `ontime` (\n" +
" `Year` UInt16,\n" +
" `Quarter` UInt8,\n" +
" `Month` UInt8,\n" +
" `DayofMonth` UInt8,\n" +
" `DayOfWeek` UInt8,\n" +
" `FlightDate` Date,\n" +
" `UniqueCarrier` FixedString(7),\n" +
" `AirlineID` Int32,\n" +
" `Carrier` FixedString(2),\n" +
" `TailNum` String,\n" +
" `FlightNum` String,\n" +
" `OriginAirportID` Int32,\n" +
" `OriginAirportSeqID` Int32,\n" +
" `OriginCityMarketID` Int32,\n" +
" `Origin` FixedString(5),\n" +
" `OriginCityName` String,\n" +
" `OriginState` FixedString(2),\n" +
" `OriginStateFips` String,\n" +
" `OriginStateName` String,\n" +
" `OriginWac` Int32,\n" +
" `DestAirportID` Int32,\n" +
" `DestAirportSeqID` Int32,\n" +
" `DestCityMarketID` Int32,\n" +
" `Dest` FixedString(5),\n" +
" `DestCityName` String,\n" +
" `DestState` FixedString(2),\n" +
" `DestStateFips` String,\n" +
" `DestStateName` String,\n" +
" `DestWac` Int32,\n" +
" `CRSDepTime` Int32,\n" +
" `DepTime` Int32,\n" +
" `DepDelay` Int32,\n" +
" `DepDelayMinutes` Int32,\n" +
" `DepDel15` Int32,\n" +
" `DepartureDelayGroups` String,\n" +
" `DepTimeBlk` String,\n" +
" `TaxiOut` Int32,\n" +
" `WheelsOff` Int32,\n" +
" `WheelsOn` Int32,\n" +
" `TaxiIn` Int32,\n" +
" `CRSArrTime` Int32,\n" +
" `ArrTime` Int32,\n" +
" `ArrDelay` Int32,\n" +
" `ArrDelayMinutes` Int32,\n" +
" `ArrDel15` Int32,\n" +
" `ArrivalDelayGroups` Int32,\n" +
" `ArrTimeBlk` String,\n" +
" `Cancelled` UInt8,\n" +
" `CancellationCode` FixedString(1),\n" +
" `Diverted` UInt8,\n" +
" `CRSElapsedTime` Int32,\n" +
" `ActualElapsedTime` Int32,\n" +
" `AirTime` Int32,\n" +
" `Flights` Int32,\n" +
" `Distance` Int32,\n" +
" `DistanceGroup` UInt8,\n" +
" `CarrierDelay` Int32,\n" +
" `WeatherDelay` Int32,\n" +
" `NASDelay` Int32,\n" +
" `SecurityDelay` Int32,\n" +
" `LateAircraftDelay` Int32,\n" +
" `FirstDepTime` String,\n" +
" `TotalAddGTime` String,\n" +
" `LongestAddGTime` String,\n" +
" `DivAirportLandings` String,\n" +
" `DivReachedDest` String,\n" +
" `DivActualElapsedTime` String,\n" +
" `DivArrDelay` String,\n" +
" `DivDistance` String,\n" +
" `Div1Airport` String,\n" +
" `Div1AirportID` Int32,\n" +
" `Div1AirportSeqID` Int32,\n" +
" `Div1WheelsOn` String,\n" +
" `Div1TotalGTime` String,\n" +
" `Div1LongestGTime` String,\n" +
" `Div1WheelsOff` String,\n" +
" `Div1TailNum` String,\n" +
" `Div2Airport` String,\n" +
" `Div2AirportID` Int32,\n" +
" `Div2AirportSeqID` Int32,\n" +
" `Div2WheelsOn` String,\n" +
" `Div2TotalGTime` String,\n" +
" `Div2LongestGTime` String,\n" +
" `Div2WheelsOff` String,\n" +
" `Div2TailNum` String,\n" +
" `Div3Airport` String,\n" +
" `Div3AirportID` Int32,\n" +
" `Div3AirportSeqID` Int32,\n" +
" `Div3WheelsOn` String,\n" +
" `Div3TotalGTime` String,\n" +
" `Div3LongestGTime` String,\n" +
" `Div3WheelsOff` String,\n" +
" `Div3TailNum` String,\n" +
" `Div4Airport` String,\n" +
" `Div4AirportID` Int32,\n" +
" `Div4AirportSeqID` Int32,\n" +
" `Div4WheelsOn` String,\n" +
" `Div4TotalGTime` String,\n" +
" `Div4LongestGTime` String,\n" +
" `Div4WheelsOff` String,\n" +
" `Div4TailNum` String,\n" +
" `Div5Airport` String,\n" +
" `Div5AirportID` Int32,\n" +
" `Div5AirportSeqID` Int32,\n" +
" `Div5WheelsOn` String,\n" +
" `Div5TotalGTime` String,\n" +
" `Div5LongestGTime` String,\n" +
" `Div5WheelsOff` String,\n" +
" `Div5TailNum` String\n" +
") ENGINE = MergeTree(FlightDate, (Year, FlightDate), 8192)");
PreparedStatement prepareStatement = ds.getConnection().prepareStatement("INSERT INTO ontime [(Year, Month , FlightDate)] VALUES (?, ?, ?);");
prepareStatement.setInt(1, 1);
prepareStatement.setInt(2, 1);
prepareStatement.setDate(3, new java.sql.Date(1));
boolean execute = prepareStatement.execute();
statement = ds.getConnection().createStatement();
statement.execute("select avg(c1) from (select Year, Month, count(*) as c1 from ontime group by Year, Month);");
ResultSet resultSet = statement.getResultSet();
resultSet.next();
Date resultSetInt = resultSet.getDate(1);
assertEquals("A basic SELECT query succeeds", 1, resultSetInt);
}
}

View File

@ -0,0 +1,34 @@
package com.rbkmoney.fraudbusters;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class KafkaTest {
@Test
public void testUpdateCountForSelect() throws Exception {
// ReadOnlyWindowStore<String, Long> windowStore =
// streams.store("CountsWindowStore", QueryableStoreTypes.windowStore());
//
// WindowStoreIterator<Long> iterator = windowStore.fetch("world", Instant.ofEpochMilli(0), Instant.now());
// while (iterator.hasNext()) {
// KeyValue<Long, Long> next = iterator.next();
// long windowTimestamp = next.key;
// System.out.println("Count of 'world' @ time " + windowTimestamp + " is " + next.value);
// }
// iterator.close();
}
}

View File

@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include resource="org/springframework/boot/logging/logback/defaults.xml"/>
<include resource="org/springframework/boot/logging/logback/console-appender.xml"/>
<root level="warn">
<appender-ref ref="CONSOLE"/>
</root>
<logger name="com.rbkmoney.woody" level="ALL"/>
</configuration>