BJ-804: add service implementation (#1)

This commit is contained in:
Anatolii Karlov 2020-03-04 12:43:54 +03:00 committed by GitHub
parent 824ad9cb19
commit ea5db24ce0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
43 changed files with 1982 additions and 2 deletions

57
.gitignore vendored Normal file
View File

@ -0,0 +1,57 @@
# Created by .ignore support plugin (hsz.mobi)
.eunit
deps
*.o
*.beam
*.plt
erl_crash.dump
ebin/*.beam
rel/example_project
.concrete/DEV_MODE
.rebar
target/
pom.xml.tag
pom.xml.releaseBackup
pom.xml.versionsBackup
pom.xml.next
release.properties
dependency-reduced-pom.xml
buildNumber.properties
.mvn/timing.properties
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
.idea/
*.iws
*.ipr
*.iml
# IntelliJ
/out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
*.class
# Mobile Tools for Java (J2ME)
.mtj.tmp/
# Package Files #
*.jar
*.war
*.ear
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
env.list

4
.gitmodules vendored Normal file
View File

@ -0,0 +1,4 @@
[submodule "build_utils"]
path = build_utils
url = git@github.com:rbkmoney/build_utils.git
branch = master

18
Jenkinsfile vendored Normal file
View File

@ -0,0 +1,18 @@
#!groovy
build('cm-dudoser', 'java-maven') {
checkoutRepo()
loadBuildUtils()
def javaServicePipeline
runStage('load JavaService pipeline') {
javaServicePipeline = load("build_utils/jenkins_lib/pipeJavaService.groovy")
}
def serviceName = env.REPO_NAME
def mvnArgs = '-DjvmArgs="-Xmx256m"'
def useJava11 = true
def registry = 'dr2.rbkmoney.com'
def registryCredsId = 'jenkins_harbor'
javaServicePipeline(serviceName, useJava11, mvnArgs, registry, registryCredsId)
}

View File

@ -1,2 +1,2 @@
# cm-hooker
сервис отправки email-сообщений мерчантам при изменении статуса claim
# cm-dudoser
Сервис по отправки уведомлений на почту мерчанта по получаемым событиям в клейме

1
build_utils Submodule

@ -0,0 +1 @@
Subproject commit 8ad24ac7dc831a280cf8622086d73867d0a04b93

254
pom.xml Normal file
View File

@ -0,0 +1,254 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.rbkmoney</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.1.RELEASE</version>
</parent>
<artifactId>cm-dudoser</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>cm-dudoser</name>
<description>claim-management-dudoser</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>11</java.version>
<server.port>8022</server.port>
<exposed.ports>${server.port}</exposed.ports>
<dockerfile.base.service.tag>bc95d0d6dc13c693acd2b274531a7d604b877bf3</dockerfile.base.service.tag>
<dockerfile.registry>${env.REGISTRY}</dockerfile.registry>
<shared.resources.version>0.3.6</shared.resources.version>
<sonar.jacoco.reportPaths>${project.basedir}/target/jacoco.exec</sonar.jacoco.reportPaths>
<kafka.clients.version>2.1.0</kafka.clients.version>
<kafka.common.lib.version>0.1.3</kafka.common.lib.version>
<geck.version>0.6.8</geck.version>
</properties>
<dependencies>
<!--rbkmoney-->
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>spring-boot-starter-metrics-statsd</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.rbkmoney.woody</groupId>
<artifactId>woody-thrift</artifactId>
<version>1.1.15</version>
</dependency>
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>damsel</artifactId>
<version>1.387-6a4cdd6</version>
</dependency>
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>shared-resources</artifactId>
<version>${shared.resources.version}</version>
</dependency>
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>custom-actuator-endpoints</artifactId>
<version>0.0.1</version>
</dependency>
<dependency>
<groupId>com.rbkmoney.geck</groupId>
<artifactId>common</artifactId>
<version>${geck.version}</version>
</dependency>
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>messages-proto</artifactId>
<version>1.10-391d225</version>
</dependency>
<!--spring-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aspects</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<!--third party-->
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity</artifactId>
<version>1.7</version>
</dependency>
<dependency>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity-tools</artifactId>
<version>2.0</version>
</dependency>
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>kafka-common-lib</artifactId>
<version>${kafka.common.lib.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.clients.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.8.RELEASE</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.1.8.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<version>1.12.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.10.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.icegreen</groupId>
<artifactId>greenmail</artifactId>
<version>1.5.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>easyway</artifactId>
<version>0.1.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>${project.build.directory}/maven-shared-archive-resources</directory>
<targetPath>${project.build.directory}</targetPath>
<includes>
<include>Dockerfile</include>
</includes>
<filtering>true</filtering>
</resource>
<resource>
<directory>${project.build.directory}/maven-shared-archive-resources</directory>
<filtering>true</filtering>
<excludes>
<exclude>Dockerfile</exclude>
</excludes>
</resource>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.1.1.RELEASE</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-remote-resources-plugin</artifactId>
<version>1.6.0</version>
<dependencies>
<dependency>
<groupId>org.apache.maven.shared</groupId>
<artifactId>maven-filtering</artifactId>
<version>1.3</version>
</dependency>
</dependencies>
<configuration>
<resourceBundles>
<resourceBundle>com.rbkmoney:shared-resources:${shared.resources.version}</resourceBundle>
</resourceBundles>
<attachToMain>false</attachToMain>
<attachToTest>false</attachToTest>
</configuration>
<executions>
<execution>
<goals>
<goal>process</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.8.5</version>
<configuration>
<destFile>${sonar.jacoco.reportPaths}</destFile>
<append>true</append>
</configuration>
<executions>
<execution>
<id>agent</id>
<goals>
<goal>prepare-agent</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,14 @@
package com.rbkmoney.cm.dudoser;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
@ServletComponentScan
@SpringBootApplication
public class CMDudoserApplication extends SpringApplication {
public static void main(String[] args) {
SpringApplication.run(CMDudoserApplication.class, args);
}
}

View File

@ -0,0 +1,35 @@
package com.rbkmoney.cm.dudoser.config;
import com.rbkmoney.damsel.claim_management.ClaimManagementSrv;
import com.rbkmoney.damsel.messages.MessageServiceSrv;
import com.rbkmoney.woody.thrift.impl.http.THSpawnClientBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import java.io.IOException;
@Slf4j
@Configuration
public class ClientConfig {
@Bean
public ClaimManagementSrv.Iface claimManagementClient(@Value("${claimmanagement.client.adapter.url}") Resource resource,
@Value("${claimmanagement.client.adapter.networkTimeout}") int timeout) throws IOException {
return new THSpawnClientBuilder()
.withAddress(resource.getURI())
.withNetworkTimeout(timeout)
.build(ClaimManagementSrv.Iface.class);
}
@Bean
public MessageServiceSrv.Iface messageServiceClient(@Value("${conversations.client.adapter.url}") Resource resource,
@Value("${conversations.client.adapter.networkTimeout}") int timeout) throws IOException {
return new THSpawnClientBuilder()
.withAddress(resource.getURI())
.withNetworkTimeout(timeout)
.build(MessageServiceSrv.Iface.class);
}
}

View File

@ -0,0 +1,99 @@
package com.rbkmoney.cm.dudoser.config;
import com.rbkmoney.cm.dudoser.config.properties.KafkaConsumerProperties;
import com.rbkmoney.cm.dudoser.config.properties.KafkaSslProperties;
import com.rbkmoney.cm.dudoser.deserializer.ClaimEventSinkDeserializer;
import com.rbkmoney.damsel.claim_management.Event;
import com.rbkmoney.kafka.common.exception.handler.SeekToCurrentWithSleepErrorHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ErrorHandler;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Configuration
@EnableConfigurationProperties({KafkaSslProperties.class, KafkaConsumerProperties.class})
@RequiredArgsConstructor
public class KafkaConfig {
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;
@Value("${kafka.error-handler.sleep-time-seconds}")
private int errorHandlerSleepTimeSeconds;
@Value("${kafka.error-handler.maxAttempts}")
private int errorHandlerMaxAttempts;
@Bean
public Map<String, Object> consumerConfigs(KafkaSslProperties kafkaSslProperties,
KafkaConsumerProperties kafkaConsumerProperties) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ClaimEventSinkDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProperties.getGroupId());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, kafkaConsumerProperties.getClientId());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConsumerProperties.isEnableAutoCommit());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConsumerProperties.getAutoOffsetReset());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerProperties.getMaxPollRecords());
props.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, kafkaConsumerProperties.getConnectionsMaxIdleMs());
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConsumerProperties.getSessionTimeoutMs());
configureSsl(props, kafkaSslProperties);
return props;
}
@Bean
public ConsumerFactory<String, Event> consumerFactory(KafkaSslProperties kafkaSslProperties,
KafkaConsumerProperties kafkaConsumerProperties) {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(kafkaSslProperties, kafkaConsumerProperties));
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Event>> kafkaListenerContainerFactory(ConsumerFactory<String, Event> consumerFactory,
KafkaConsumerProperties kafkaConsumerProperties) {
ConcurrentKafkaListenerContainerFactory<String, Event> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.setErrorHandler(kafkaErrorHandler());
factory.setConcurrency(kafkaConsumerProperties.getConcurrency());
return factory;
}
private void configureSsl(Map<String, Object> props, KafkaSslProperties kafkaSslProperties) {
if (kafkaSslProperties.isEnabled()) {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name());
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, new File(kafkaSslProperties.getTrustStoreLocation()).getAbsolutePath());
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaSslProperties.getTrustStorePassword());
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaSslProperties.getKeyStoreType());
props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaSslProperties.getTrustStoreType());
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, new File(kafkaSslProperties.getKeyStoreLocation()).getAbsolutePath());
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, kafkaSslProperties.getKeyStorePassword());
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaSslProperties.getKeyPassword());
}
}
private ErrorHandler kafkaErrorHandler() {
return new SeekToCurrentWithSleepErrorHandler(errorHandlerSleepTimeSeconds, errorHandlerMaxAttempts);
}
}

View File

@ -0,0 +1,19 @@
package com.rbkmoney.cm.dudoser.config;
import com.rbkmoney.cm.dudoser.handler.ClaimHandler;
import com.rbkmoney.cm.dudoser.listener.ClaimEventSinkListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
@EnableKafka
@Configuration
public class KafkaConsumerBeanEnableConfig {
@Bean
@ConditionalOnProperty(value = "kafka.topics.claim-event-sink.enabled", havingValue = "true")
public ClaimEventSinkListener paymentEventsKafkaListener(ClaimHandler claimHandler) {
return new ClaimEventSinkListener(claimHandler);
}
}

View File

@ -0,0 +1,48 @@
package com.rbkmoney.cm.dudoser.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.JavaMailSenderImpl;
import java.util.Properties;
@Configuration
public class MailConfig {
@Bean
public Properties mailProperties(@Value("${mail.protocol}") String protocol,
@Value("${mail.smtp.auth}") boolean smtpsAuth,
@Value("${mail.smtp.starttls.enable}") boolean starttls,
@Value("${mail.smtp.timeout}") int timeout,
@Value("${mail.host}") String host,
@Value("${mail.port}") int port,
@Value("${mail.username}") String username) {
Properties properties = new Properties();
properties.put("mail.smtp.auth", smtpsAuth);
properties.put("mail.smtp.starttls.enable", starttls);
properties.put("mail.smtp.connectiontimeout", timeout);
properties.put("mail.smtp.timeout", timeout);
properties.put("mail.transport.protocol", protocol);
properties.put("mail.smtp.host", host);
properties.put("mail.smtp.port", port);
properties.put("mail.username", username);
return properties;
}
@Bean
public JavaMailSender javaMailSender(Properties mailProperties,
@Value("${mail.host}") String host,
@Value("${mail.port}") int port,
@Value("${mail.username}") String username,
@Value("${mail.password}") String password) {
JavaMailSenderImpl sender = new JavaMailSenderImpl();
sender.setHost(host);
sender.setPort(port);
sender.setUsername(username);
sender.setPassword(password);
sender.setJavaMailProperties(mailProperties);
return sender;
}
}

View File

@ -0,0 +1,36 @@
package com.rbkmoney.cm.dudoser.config;
import com.rbkmoney.cm.dudoser.exception.MailSendException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.annotation.EnableRetry;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import java.util.Collections;
@EnableRetry
@Configuration
public class RetryConfig {
@Value("${mail.retry.backoff.period:1000}")
private long backOffPeriod;
@Value("${mail.retry.max.attempts:3}")
private int maxAttempts;
@Bean
public RetryTemplate mailRetryTemplate() {
FixedBackOffPolicy policy = new FixedBackOffPolicy();
policy.setBackOffPeriod(backOffPeriod);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(
new SimpleRetryPolicy(maxAttempts, Collections.singletonMap(MailSendException.class, true))
);
retryTemplate.setBackOffPolicy(policy);
return retryTemplate;
}
}

View File

@ -0,0 +1,26 @@
package com.rbkmoney.cm.dudoser.config;
import org.apache.velocity.app.VelocityEngine;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TemplateConfig {
@Bean
public VelocityEngine claimManagementTemplateEngine() {
String resourceLoader = "org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader";
VelocityEngine engine = getVelocityEngine(resourceLoader);
engine.init();
return engine;
}
private VelocityEngine getVelocityEngine(String resourceLoader) {
VelocityEngine engine = new VelocityEngine();
engine.setProperty("resource.loader", "class");
engine.setProperty("class.resource.loader.class", resourceLoader);
return engine;
}
}

View File

@ -0,0 +1,25 @@
package com.rbkmoney.cm.dudoser.config.properties;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Getter
@Setter
@ToString
@Component
@ConfigurationProperties(prefix = "kafka.consumer")
public class KafkaConsumerProperties {
private String autoOffsetReset;
private boolean enableAutoCommit;
private String clientId;
private String groupId;
private Integer maxPollRecords;
private Integer connectionsMaxIdleMs;
private Integer sessionTimeoutMs;
private Integer concurrency;
}

View File

@ -0,0 +1,23 @@
package com.rbkmoney.cm.dudoser.config.properties;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Getter
@Setter
@Component
@ConfigurationProperties(prefix = "kafka.ssl")
public class KafkaSslProperties {
private String trustStorePassword;
private String trustStoreLocation;
private String keyStorePassword;
private String keyPassword;
private String keyStoreLocation;
private boolean enabled;
private String keyStoreType;
private String trustStoreType;
}

View File

@ -0,0 +1,13 @@
package com.rbkmoney.cm.dudoser.deserializer;
import com.rbkmoney.damsel.claim_management.Event;
import com.rbkmoney.kafka.common.serialization.AbstractThriftDeserializer;
public class ClaimEventSinkDeserializer extends AbstractThriftDeserializer<Event> {
@Override
public Event deserialize(String s, byte[] bytes) {
return super.deserialize(bytes, new Event());
}
}

View File

@ -0,0 +1,15 @@
package com.rbkmoney.cm.dudoser.domain;
import lombok.Builder;
import lombok.Data;
@Data
@Builder
public class ClaimData {
private TemplateType templateType;
private String id;
private String status;
private String comment;
}

View File

@ -0,0 +1,19 @@
package com.rbkmoney.cm.dudoser.domain;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
@Getter
public enum ClaimStatus {
PENDING("В ожидании"),
REVIEW("На рассмотрении"),
PENDING_ACCEPTANCE("В ожидании подтверждения"),
ACCEPTED("Подтверждена"),
DENIED("Отклонена"),
REVOKED("Отозвана");
private final String cyrillicValue;
}

View File

@ -0,0 +1,18 @@
package com.rbkmoney.cm.dudoser.domain;
import lombok.Builder;
import lombok.Data;
@Data
@Builder
public class Message {
private String from;
private String to;
private String subject;
private String content;
private String partyId;
private long claimId;
}

View File

@ -0,0 +1,8 @@
package com.rbkmoney.cm.dudoser.domain;
public enum TemplateType {
STATUSCHANGE,
COMMENT
}

View File

@ -0,0 +1,23 @@
package com.rbkmoney.cm.dudoser.exception;
public class MailSendException extends RuntimeException {
public MailSendException() {
}
public MailSendException(String message) {
super(message);
}
public MailSendException(String message, Throwable cause) {
super(message, cause);
}
public MailSendException(Throwable cause) {
super(cause);
}
public MailSendException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -0,0 +1,23 @@
package com.rbkmoney.cm.dudoser.exception;
public class NotFoundException extends RuntimeException {
public NotFoundException() {
}
public NotFoundException(String message) {
super(message);
}
public NotFoundException(String message, Throwable cause) {
super(message, cause);
}
public NotFoundException(Throwable cause) {
super(cause);
}
public NotFoundException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -0,0 +1,23 @@
package com.rbkmoney.cm.dudoser.exception;
public class ThriftClientException extends RuntimeException {
public ThriftClientException() {
}
public ThriftClientException(String message) {
super(message);
}
public ThriftClientException(String message, Throwable cause) {
super(message, cause);
}
public ThriftClientException(Throwable cause) {
super(cause);
}
public ThriftClientException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -0,0 +1,83 @@
package com.rbkmoney.cm.dudoser.handler;
import com.rbkmoney.cm.dudoser.domain.Message;
import com.rbkmoney.cm.dudoser.service.MessageBuilderService;
import com.rbkmoney.cm.dudoser.service.RetryableSenderService;
import com.rbkmoney.damsel.claim_management.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.stream.Collectors;
@Component
@RequiredArgsConstructor
@Slf4j
public class ClaimHandler {
private final MessageBuilderService<ClaimStatusChanged> statusMessageBuilder;
private final MessageBuilderService<CommentModificationUnit> commentChangeMessageBuilder;
private final RetryableSenderService retryableSenderService;
public void handle(Event event) {
try {
handleEvent(event);
} catch (Exception ex) {
log.error("Some problem when handling", ex);
throw ex;
}
}
private void handleEvent(Event event) {
Change change = event.getChange();
if (change.isSetStatusChanged()) {
ClaimStatusChanged claimStatusChanged = change.getStatusChanged();
String partyId = claimStatusChanged.getPartyId();
long claimId = claimStatusChanged.getId();
log.info("Handle status change event with id {} for party {} get started", claimId, partyId);
Message message = statusMessageBuilder.build(claimStatusChanged, partyId, claimId);
retryableSenderService.sendToMail(message);
log.info("Handle status change event with party id '{}' and claim id '{}' finished", partyId, claimId);
} else if (containsCommentModifications(change)) {
ClaimUpdated claimUpdated = change.getUpdated();
String partyId = claimUpdated.getPartyId();
long claimId = claimUpdated.getId();
List<CommentModificationUnit> commentModifications = change.getUpdated().getChangeset().stream()
.filter(Modification::isSetClaimModification)
.map(Modification::getClaimModification)
.filter(ClaimModification::isSetCommentModification)
.map(ClaimModification::getCommentModification)
.collect(Collectors.toList());
for (CommentModificationUnit commentModification : commentModifications) {
log.info("Handle comment modification update change event get started, claimId={}, partyId={}, commentId={}", claimId, partyId, commentModification.getId());
Message message = commentChangeMessageBuilder.build(commentModification, partyId, claimId);
retryableSenderService.sendToMail(message);
log.info("Handle comment modification update change event finished, claimId={}, partyId={}, commentId={}", claimId, partyId, commentModification.getId());
}
}
}
private boolean containsCommentModifications(Change change) {
if (change.isSetUpdated()) {
return change.getUpdated().getChangeset().stream()
.filter(Modification::isSetClaimModification)
.map(Modification::getClaimModification)
.anyMatch(ClaimModification::isSetCommentModification);
}
return false;
}
}

View File

@ -0,0 +1,38 @@
package com.rbkmoney.cm.dudoser.listener;
import com.rbkmoney.cm.dudoser.handler.ClaimHandler;
import com.rbkmoney.damsel.claim_management.Event;
import com.rbkmoney.damsel.claim_management.InternalUser;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.thrift.TException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
@Slf4j
@RequiredArgsConstructor
public class ClaimEventSinkListener {
private final ClaimHandler claimHandler;
@KafkaListener(topics = "${kafka.topics.claim-event-sink.id}", containerFactory = "kafkaListenerContainerFactory")
public void handle(Event event, Acknowledgment ack) throws TException {
log.info("Handle claim management Event get started, occuredAt={}", event.getOccuredAt());
if (event.getUserInfo() != null && isInternalUser(event)) {
claimHandler.handle(event);
}
log.info("Handle claim management Event finished, occuredAt={}", event.getOccuredAt());
ack.acknowledge();
}
private boolean isInternalUser(Event event) {
return event.getUserInfo().getType().equals(internalUser());
}
private com.rbkmoney.damsel.claim_management.UserType internalUser() {
return com.rbkmoney.damsel.claim_management.UserType.internal_user(new InternalUser());
}
}

View File

@ -0,0 +1,64 @@
package com.rbkmoney.cm.dudoser.service;
import com.rbkmoney.cm.dudoser.exception.NotFoundException;
import com.rbkmoney.cm.dudoser.exception.ThriftClientException;
import com.rbkmoney.damsel.claim_management.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.thrift.TException;
import org.springframework.stereotype.Service;
import java.util.Optional;
@Service
@RequiredArgsConstructor
@Slf4j
public class ClaimService {
private final ClaimManagementSrv.Iface claimManagementClient;
public String getEmailByClaim(String partyId, long claimId) {
Claim claim = getClaim(partyId, claimId);
String emailTo = claim.getChangeset().stream()
.filter(this::isExternalUser)
.findFirst()
.map(ModificationUnit::getUserInfo)
.map(UserInfo::getEmail)
.orElse(null);
if (emailTo == null) {
throw new NotFoundException(String.format("ExternalUser info from Claim can not be null, partyId=%s, claimId=%s", partyId, claimId));
}
return emailTo;
}
private Claim getClaim(String partyId, long claimId) {
try {
log.info("Trying to get Claim from thrift client, partyId={}, claimId={}", partyId, claimId);
Claim claim = claimManagementClient.getClaim(partyId, claimId);
if (claim == null || claim.getChangeset() == null || claim.getChangeset().isEmpty()) {
throw new NotFoundException(String.format("Changeset from Claim can not be null, partyId=%s, claimId=%s", partyId, claimId));
}
return claim;
} catch (TException ex) {
throw new ThriftClientException(String.format("Failed to get Claim from thrift client, partyId=%s, claimId=%s", partyId, claimId), ex);
}
}
private boolean isExternalUser(ModificationUnit modificationUnit) {
return Optional.ofNullable(modificationUnit)
.map(ModificationUnit::getUserInfo)
.map(UserInfo::getType)
.map(userType -> userType.equals(externalUser()))
.orElse(false);
}
private com.rbkmoney.damsel.claim_management.UserType externalUser() {
return com.rbkmoney.damsel.claim_management.UserType.external_user(new ExternalUser());
}
}

View File

@ -0,0 +1,38 @@
package com.rbkmoney.cm.dudoser.service;
import com.rbkmoney.cm.dudoser.exception.NotFoundException;
import com.rbkmoney.cm.dudoser.exception.ThriftClientException;
import com.rbkmoney.damsel.messages.Conversation;
import com.rbkmoney.damsel.messages.ConversationFilter;
import com.rbkmoney.damsel.messages.GetConversationResponse;
import com.rbkmoney.damsel.messages.MessageServiceSrv;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.thrift.TException;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
@RequiredArgsConstructor
@Slf4j
public class ConversationService {
private final MessageServiceSrv.Iface messageServiceClient;
public Conversation getConversation(String conversationId) {
try {
log.info("Trying to get Conversation from thrift client, conversationId={}", conversationId);
GetConversationResponse response = messageServiceClient.getConversations(List.of(conversationId), new ConversationFilter());
if (response == null || response.getConversations() == null || response.getConversations().size() != 1) {
throw new NotFoundException(String.format("Conversation's size must be = 1, conversationId=%s", conversationId));
}
return response.getConversations().get(0);
} catch (TException ex) {
throw new ThriftClientException(String.format("Failed to get Conversation from thrift client, conversationId=%s", conversationId), ex);
}
}
}

View File

@ -0,0 +1,47 @@
package com.rbkmoney.cm.dudoser.service;
import com.rbkmoney.cm.dudoser.domain.Message;
import com.rbkmoney.cm.dudoser.exception.MailSendException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.mail.MailException;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.stereotype.Service;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import java.nio.charset.StandardCharsets;
@Slf4j
@Service
@RequiredArgsConstructor
public class MailSenderService {
private final JavaMailSender mailSender;
public boolean send(Message message) {
try {
log.info("Trying to send message to mail, partyId={}, claimId={}, email={}", message.getPartyId(), message.getClaimId(), message.getTo());
MimeMessage mimeMessage = getMimeMessage(message);
mailSender.send(mimeMessage);
return true;
} catch (MessagingException | MailException ex) {
throw new MailSendException(String.format("Received exception while sending message to mail, partyId=%s, claimId=%s, email=%s", message.getPartyId(), message.getClaimId(), message.getTo()), ex);
}
}
private MimeMessage getMimeMessage(Message message) throws MessagingException {
MimeMessage mimeMessage = mailSender.createMimeMessage();
MimeMessageHelper helper = new MimeMessageHelper(mimeMessage, true, StandardCharsets.UTF_8.name());
helper.setFrom(message.getFrom());
helper.setTo(message.getTo());
helper.setSubject(message.getSubject());
helper.setText(message.getContent(), false);
return mimeMessage;
}
}

View File

@ -0,0 +1,9 @@
package com.rbkmoney.cm.dudoser.service;
import com.rbkmoney.cm.dudoser.domain.Message;
public interface MessageBuilderService<T> {
Message build(T change, String partyId, long claimId);
}

View File

@ -0,0 +1,20 @@
package com.rbkmoney.cm.dudoser.service;
import com.rbkmoney.cm.dudoser.domain.Message;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
@Slf4j
public class RetryableSenderService {
private final MailSenderService mailSenderService;
private final RetryTemplate mailRetryTemplate;
public void sendToMail(Message message) {
mailRetryTemplate.execute(context -> mailSenderService.send(message));
}
}

View File

@ -0,0 +1,48 @@
package com.rbkmoney.cm.dudoser.service;
import com.rbkmoney.cm.dudoser.domain.ClaimData;
import com.rbkmoney.cm.dudoser.exception.NotFoundException;
import lombok.RequiredArgsConstructor;
import org.apache.velocity.Template;
import org.apache.velocity.VelocityContext;
import org.apache.velocity.app.VelocityEngine;
import org.springframework.stereotype.Component;
import java.io.StringWriter;
@Component
@RequiredArgsConstructor
public class TemplateService {
private static final String TITLE = "claimManagementData";
private static final String STATUS_CHANGED_TEMPLATE = "vm/StatusChangedEntity.vm";
private static final String COMMENT_CHANGED_TEMPLATE = "vm/CommentChangedEntity.vm";
private final VelocityEngine claimManagementTemplateEngine;
public String process(ClaimData data) {
VelocityContext headerContext = new VelocityContext();
headerContext.put(TITLE, data);
Template template = buildTemplate(data);
return build(template, headerContext);
}
private Template buildTemplate(ClaimData data) {
switch (data.getTemplateType()) {
case STATUSCHANGE:
return claimManagementTemplateEngine.getTemplate(STATUS_CHANGED_TEMPLATE);
case COMMENT:
return claimManagementTemplateEngine.getTemplate(COMMENT_CHANGED_TEMPLATE);
default:
throw new NotFoundException("templateType not found");
}
}
private String build(Template template, VelocityContext context) {
StringWriter writer = new StringWriter();
template.merge(context, writer);
return writer.toString();
}
}

View File

@ -0,0 +1,34 @@
package com.rbkmoney.cm.dudoser.service.impl;
import com.rbkmoney.cm.dudoser.domain.Message;
import com.rbkmoney.cm.dudoser.service.ClaimService;
import com.rbkmoney.cm.dudoser.service.MessageBuilderService;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public abstract class AbstractMessageBuilderService<T> implements MessageBuilderService<T> {
private final ClaimService claimService;
private final String emailFrom;
private final String subject;
@Override
public Message build(T change, String partyId, long claimId) {
String emailTo = claimService.getEmailByClaim(partyId, claimId);
return build(emailFrom, emailTo, getContent(change, claimId), subject, partyId, claimId);
}
protected abstract String getContent(T change, long claimId);
private Message build(String emailFrom, String emailTo, String content, String subject, String partyId, long claimId) {
return Message.builder()
.from(emailFrom)
.to(emailTo)
.subject(subject)
.content(content)
.partyId(partyId)
.claimId(claimId)
.build();
}
}

View File

@ -0,0 +1,44 @@
package com.rbkmoney.cm.dudoser.service.impl;
import com.rbkmoney.cm.dudoser.domain.ClaimData;
import com.rbkmoney.cm.dudoser.domain.TemplateType;
import com.rbkmoney.cm.dudoser.service.ClaimService;
import com.rbkmoney.cm.dudoser.service.ConversationService;
import com.rbkmoney.cm.dudoser.service.TemplateService;
import com.rbkmoney.damsel.claim_management.CommentModificationUnit;
import com.rbkmoney.damsel.messages.Conversation;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
@Qualifier("commentChangeMessageBuilder")
public class CommentChangeMessageBuilderServiceImpl extends AbstractMessageBuilderService<CommentModificationUnit> {
private final ConversationService conversationService;
private final TemplateService templateService;
public CommentChangeMessageBuilderServiceImpl(ClaimService claimService,
@Value("${mail.from}") String emailFrom,
@Value("${mail.subject.comment}") String subject,
ConversationService conversationService,
TemplateService templateService) {
super(claimService, emailFrom, subject);
this.conversationService = conversationService;
this.templateService = templateService;
}
protected String getContent(CommentModificationUnit commentModification, long claimId) {
Conversation conversation = conversationService.getConversation(commentModification.getId());
com.rbkmoney.damsel.messages.Message message = conversation.getMessages().get(conversation.getMessages().size() - 1);
ClaimData claimData = ClaimData.builder()
.templateType(TemplateType.COMMENT)
.id(String.valueOf(claimId))
.comment(message.getText())
.build();
return templateService.process(claimData);
}
}

View File

@ -0,0 +1,42 @@
package com.rbkmoney.cm.dudoser.service.impl;
import com.rbkmoney.cm.dudoser.domain.ClaimData;
import com.rbkmoney.cm.dudoser.domain.TemplateType;
import com.rbkmoney.cm.dudoser.service.ClaimService;
import com.rbkmoney.cm.dudoser.service.TemplateService;
import com.rbkmoney.damsel.claim_management.ClaimStatus;
import com.rbkmoney.damsel.claim_management.ClaimStatusChanged;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import static com.rbkmoney.cm.dudoser.domain.ClaimStatus.valueOf;
@Service
@Qualifier("statusMessageBuilder")
public class StatusChangeMessageBuilderServiceImpl extends AbstractMessageBuilderService<ClaimStatusChanged> {
private final TemplateService templateService;
public StatusChangeMessageBuilderServiceImpl(ClaimService claimService,
@Value("${mail.from}") String emailFrom,
@Value("${mail.subject.comment}") String subject,
TemplateService templateService) {
super(claimService, emailFrom, subject);
this.templateService = templateService;
}
protected String getContent(ClaimStatusChanged claimStatusChanged, long claimId) {
ClaimData claimData = ClaimData.builder()
.templateType(TemplateType.STATUSCHANGE)
.id(String.valueOf(claimId))
.status(convertStatus(claimStatusChanged.getStatus()))
.build();
return templateService.process(claimData);
}
private String convertStatus(ClaimStatus tClaimStatus) {
return valueOf(tClaimStatus.getSetField().getFieldName().toUpperCase()).getCyrillicValue();
}
}

View File

@ -0,0 +1,80 @@
server:
port: @server.port@
management:
security:
flag: false
metrics:
export:
statsd:
flavor: ETSY
enabled: false
endpoint:
health:
show-details: ALWAYS
spring:
application:
name: @project.name@
output:
ansi:
enabled: ALWAYS
info:
version: @project.version@
stage: dev
kafka:
bootstrap:
servers: "localhost:9092"
topics:
claim-event-sink:
id: "claim-event-sink"
enabled: false
ssl:
enabled: false
trust-store-location: "test"
trust-store-password: "test"
key-store-location: "test"
key-store-password: "test"
key-password: "test"
key-store-type: PKCS12
trust-store-type: PKCS12
consumer:
concurrency: 5
client-id: claim-management
group-id: claim-management-group-1
enable-auto-commit: false
auto-offset-reset: latest
max-poll-records: 20
connections-max-idle-ms: 300000
session-timeout-ms: 300000
error-handler:
sleep-time-seconds: 5
maxAttempts: -1
mail:
host: mr1.linode.rbkmoney.net
port: 25
username: ""
password: ""
from: no-reply@rbkmoney.com
protocol: smtp
retry:
max:
attempts: 3
backoff:
period: 1000
subject:
status: "Изменение статуса вашей заявки на подключение к RBK.money"
comment: "Добавлен новый комментарий по вашей заявки на подключение к RBK.money"
smtp:
auth: false
timeout: 30000
starttls:
enable: true
claimmanagement:
client:
adapter:
url: http://localhost:8022/v1/claimmanagement
networkTimeout: 30000
conversations:
client:
adapter:
url: http://localhost:8022/v1/conversations
networkTimeout: 30000

View File

@ -0,0 +1,3 @@
## @vtlvariable name="claimManagementData" type="com.rbkmoney.cm.dudoser.domain.ClaimData"
Добрый день. У нас для вас новости. По вашей заявке "$!claimManagementData.id" поступил ответ "$!claimManagementData.comment"
Всегда на связи, ваш RBK.money!

View File

@ -0,0 +1,3 @@
## @vtlvariable name="claimManagementData" type="com.rbkmoney.cm.dudoser.domain.ClaimData"
Добрый день. У нас для вас новости. Статус вашей заявки "$!claimManagementData.id" изменен на '$!claimManagementData.status".
Хорошего дня! Всегда на связи, ваш RBK.money!

View File

@ -0,0 +1,76 @@
package com.rbkmoney.cm.dudoser.config;
import com.rbkmoney.cm.dudoser.CMDudoserApplication;
import com.rbkmoney.easyway.*;
import lombok.extern.slf4j.Slf4j;
import org.junit.ClassRule;
import org.junit.runner.Description;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.ConfigFileApplicationContextInitializer;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.util.TestPropertyValues;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.testcontainers.containers.FailureDetectingExternalResource;
import java.util.function.Consumer;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = CMDudoserApplication.class, webEnvironment = RANDOM_PORT)
@ContextConfiguration(initializers = AbstractKafkaConfig.Initializer.class)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
@Slf4j
public abstract class AbstractKafkaConfig extends AbstractTestUtils {
private static TestContainers testContainers = TestContainersBuilder.builderWithTestContainers(TestContainersParameters::new)
.addKafkaTestContainer()
.build();
@ClassRule
public static final FailureDetectingExternalResource resource = new FailureDetectingExternalResource() {
@Override
protected void starting(Description description) {
testContainers.startTestContainers();
}
@Override
protected void failed(Throwable e, Description description) {
log.warn("Test Container running was failed ", e);
}
@Override
protected void finished(Description description) {
testContainers.stopTestContainers();
}
};
public static class Initializer extends ConfigFileApplicationContextInitializer {
@Override
public void initialize(ConfigurableApplicationContext configurableApplicationContext) {
super.initialize(configurableApplicationContext);
TestPropertyValues.of(
testContainers.getEnvironmentProperties(
getEnvironmentPropertiesConsumer()
)
)
.applyTo(configurableApplicationContext);
}
}
private static Consumer<EnvironmentProperties> getEnvironmentPropertiesConsumer() {
return environmentProperties -> {
environmentProperties.put("kafka.topics.claim-event-sink.enabled", "true");
environmentProperties.put("kafka.error-handler.sleep-time-seconds", "1");
testContainers.getKafkaTestContainer().ifPresent(
c -> environmentProperties.put("kafka.bootstrap.servers", c.getBootstrapServers())
);
};
}
}

View File

@ -0,0 +1,147 @@
package com.rbkmoney.cm.dudoser.service;
import com.rbkmoney.cm.dudoser.config.AbstractKafkaConfig;
import com.rbkmoney.cm.dudoser.domain.Message;
import com.rbkmoney.cm.dudoser.exception.MailSendException;
import com.rbkmoney.damsel.claim_management.*;
import com.rbkmoney.kafka.common.serialization.ThriftSerializer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static io.github.benas.randombeans.api.EnhancedRandom.random;
import static org.mockito.Mockito.*;
@Slf4j
public class ConsumerTests extends AbstractKafkaConfig {
@Value("${kafka.topics.claim-event-sink.id}")
public String topic;
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;
@MockBean
private RetryableSenderService retryableSenderService;
@MockBean
private MessageBuilderService<ClaimStatusChanged> statusMessageBuilder;
@Test
public void correctFlowTest() {
ClaimStatusChanged claimStatusChanged = getClaimStatusChanged();
Event event = new Event();
event.setUserInfo(getUserInfo());
event.setOccuredAt(LocalDateTime.now().toString());
event.setChange(Change.status_changed(claimStatusChanged));
when(statusMessageBuilder.build(eq(claimStatusChanged), anyString(), anyLong())).thenReturn(Message.builder().build());
doNothing().when(retryableSenderService).sendToMail(any());
try {
DefaultKafkaProducerFactory<String, Event> producerFactory = createProducerFactory();
KafkaTemplate<String, Event> kafkaTemplate = new KafkaTemplate<>(producerFactory);
TimeUnit.SECONDS.sleep(1);
kafkaTemplate.send(
topic,
random(String.class),
event
)
.get();
TimeUnit.SECONDS.sleep(1);
} catch (ExecutionException | InterruptedException ex) {
ex.printStackTrace();
}
verify(statusMessageBuilder, times(1)).build(eq(claimStatusChanged), anyString(), anyLong());
// проверяем что ивент из кафки был корректно отправлен сообщением на почтовый сервер
verify(retryableSenderService, times(1)).sendToMail(any());
}
@Test
public void errorRetryTest() {
int countRetries = 5;
AtomicInteger atomicInt = new AtomicInteger(0);
ClaimStatusChanged claimStatusChanged = getClaimStatusChanged();
Event event = new Event();
event.setUserInfo(getUserInfo());
event.setOccuredAt(LocalDateTime.now().toString());
event.setChange(Change.status_changed(claimStatusChanged));
when(statusMessageBuilder.build(eq(claimStatusChanged), anyString(), anyLong())).thenReturn(Message.builder().build());
// на countRetries попытке мок корректно обработает вызов, в предыдущих попытках будет вынуждать кафку ретраить обработку ивента
doAnswer(
invocation -> {
int increment = atomicInt.getAndIncrement();
if (increment < countRetries - 1) {
throw new MailSendException();
}
return true;
}
).when(retryableSenderService).sendToMail(any());
try {
DefaultKafkaProducerFactory<String, Event> producerFactory = createProducerFactory();
KafkaTemplate<String, Event> kafkaTemplate = new KafkaTemplate<>(producerFactory);
TimeUnit.SECONDS.sleep(1);
kafkaTemplate.send(
topic,
random(String.class),
event
)
.get();
TimeUnit.SECONDS.sleep(6);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
verify(statusMessageBuilder, times(countRetries)).build(eq(claimStatusChanged), anyString(), anyLong());
// проверяем, что кафка пыталась обработать ивент такое количество раз, которое задано моку
verify(retryableSenderService, times(countRetries)).sendToMail(any());
}
private UserInfo getUserInfo() {
UserInfo userInfo = new UserInfo();
userInfo.setType(UserType.internal_user(new InternalUser()));
userInfo.setId(UUID.randomUUID().toString());
userInfo.setUsername("asd");
userInfo.setEmail("asd@sad.com");
return userInfo;
}
private ClaimStatusChanged getClaimStatusChanged() {
return new ClaimStatusChanged(UUID.randomUUID().toString(), 1, ClaimStatus.pending(new ClaimPending()), 1, LocalDateTime.now().toString());
}
private <T> DefaultKafkaProducerFactory<String, T> createProducerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "client_id");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, new ThriftSerializer<Event>().getClass().getName());
return new DefaultKafkaProducerFactory<>(props);
}
}

View File

@ -0,0 +1,200 @@
package com.rbkmoney.cm.dudoser.service;
import com.rbkmoney.cm.dudoser.CMDudoserApplication;
import com.rbkmoney.cm.dudoser.handler.ClaimHandler;
import com.rbkmoney.cm.dudoser.listener.ClaimEventSinkListener;
import com.rbkmoney.damsel.claim_management.*;
import com.rbkmoney.damsel.messages.Conversation;
import com.rbkmoney.damsel.messages.Message;
import org.apache.thrift.TException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.List;
import static io.github.benas.randombeans.api.EnhancedRandom.random;
import static io.github.benas.randombeans.api.EnhancedRandom.randomListOf;
import static org.mockito.Mockito.*;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = CMDudoserApplication.class, webEnvironment = RANDOM_PORT)
public class ListenerTest {
@MockBean
private RetryableSenderService retryableSenderService;
@MockBean
private ClaimService claimService;
@MockBean
private ConversationService conversationService;
@Autowired
private ClaimHandler claimHandler;
private String email = "no-reply@rbk.com";
private ClaimEventSinkListener listener;
@Before
public void setUp() {
doNothing().when(retryableSenderService).sendToMail(any());
when(conversationService.getConversation(anyString())).thenReturn(getConversation());
when(claimService.getEmailByClaim(anyString(), anyLong())).thenReturn(email);
listener = new ClaimEventSinkListener(claimHandler);
}
@Test
public void testExternalUser() throws Exception {
//если тип пользователя External, то сервис пропускает любой тип получаемого change
sendMail(listener, getEvent(getExternalUser(), getClaimCreated()), 0);
sendMail(listener, getEvent(getExternalUser(), getClaimStatus(getClaimPending())), 0);
sendMail(listener, getEvent(getExternalUser(), getClaimUpdated(getClaimModification(getCommentModification()))), 0);
}
@Test
public void testInternalUser() throws Exception {
//change типа created пропускается
sendMail(listener, getEvent(getInternalUser(), getClaimCreated()), 0);
sendMail(listener, getEvent(getInternalUser(), getClaimStatus(getClaimAccepted())), 1);
sendMail(listener, getEvent(getInternalUser(), getClaimStatus(getClaimDenied())), 1);
sendMail(listener, getEvent(getInternalUser(), getClaimStatus(getClaimPending())), 1);
//modification типа НЕ comment пропускается
sendMail(listener, getEvent(getInternalUser(), getClaimUpdated(getClaimModification(getNotCommentModification()))), 0);
sendMail(listener, getEvent(getInternalUser(), getClaimUpdated(getClaimModification(getCommentModification()))), 1);
Change claimUpdated = getClaimUpdated(
getClaimModification(getCommentModification()),
getClaimModification(getNotCommentModification()),
getClaimModification(getCommentModification())
);
sendMail(listener, getEvent(getInternalUser(), claimUpdated), 2);
}
private void sendMail(ClaimEventSinkListener listener, Event event, int timesSending) throws TException {
reset(retryableSenderService);
listener.handle(event, () -> {
});
verify(retryableSenderService, times(timesSending)).sendToMail(any());
}
private Event getEvent(UserType userType, Change change) {
Event event = random(Event.class, "change", "user_info");
event.setUserInfo(getUserInfo(userType));
event.setChange(change);
return event;
}
private Change getClaimCreated() {
ClaimCreated changed = random(ClaimCreated.class, "changeset");
changed.setChangeset(List.of(getClaimModification(getCommentModification())));
return Change.created(changed);
}
private Change getClaimUpdated(Modification... modifications) {
ClaimUpdated changed = random(ClaimUpdated.class, "changeset");
changed.setChangeset(List.of(modifications));
return Change.updated(changed);
}
private Change getClaimStatus(ClaimStatus claimStatus) {
ClaimStatusChanged changed = random(ClaimStatusChanged.class, "status");
changed.setStatus(claimStatus);
return Change.status_changed(changed);
}
private Claim getClaim(String email) {
ModificationUnit modificationUnit = getModificationUnit(getExternalUser(), getCommentModification());
modificationUnit.getUserInfo().setEmail(email);
List<ModificationUnit> changeset = List.of(
getModificationUnit(getInternalUser(), getCommentModification()),
modificationUnit,
getModificationUnit(getInternalUser(), getCommentModification()),
getModificationUnit(getInternalUser(), getStatusModification(getClaimPending())),
getModificationUnit(getExternalUser(), getNotCommentModification())
);
return getClaim(getClaimPending(), changeset);
}
private Conversation getConversation() {
List<Message> messages = randomListOf(5, Message.class);
Conversation conversation = random(Conversation.class, "messages");
conversation.setMessages(messages);
return conversation;
}
private Claim getClaim(ClaimStatus status, List<ModificationUnit> changeset) {
Claim claim = random(Claim.class, "status", "changeset", "metadata");
claim.setStatus(status);
claim.setChangeset(changeset);
return claim;
}
private ClaimModification getCommentModification() {
CommentModificationUnit modificationUnit = new CommentModificationUnit();
modificationUnit.setId("asd");
modificationUnit.setModification(CommentModification.creation(new CommentCreated()));
return ClaimModification.comment_modification(modificationUnit);
}
private ClaimModification getStatusModification(ClaimStatus claimStatus) {
StatusModificationUnit modificationUnit = new StatusModificationUnit();
modificationUnit.setStatus(claimStatus);
modificationUnit.setModification(StatusModification.change(new StatusChanged()));
return ClaimModification.status_modification(modificationUnit);
}
private ClaimModification getNotCommentModification() {
FileModificationUnit modificationUnit = new FileModificationUnit();
modificationUnit.setId("asd");
modificationUnit.setModification(FileModification.creation(new FileCreated()));
return ClaimModification.file_modification(modificationUnit);
}
private ModificationUnit getModificationUnit(UserType userType, ClaimModification claimModification) {
ModificationUnit modification = random(ModificationUnit.class, "modification", "user_info");
modification.setUserInfo(getUserInfo(userType));
modification.setModification(getClaimModification(claimModification));
return modification;
}
private UserInfo getUserInfo(UserType userType) {
UserInfo userInfo = random(UserInfo.class, "type");
userInfo.setType(userType);
return userInfo;
}
private Modification getClaimModification(ClaimModification modification) {
return Modification.claim_modification(modification);
}
private ClaimStatus getClaimPending() {
return ClaimStatus.pending(new ClaimPending());
}
private ClaimStatus getClaimAccepted() {
return ClaimStatus.accepted(new ClaimAccepted());
}
private ClaimStatus getClaimDenied() {
return ClaimStatus.denied(new ClaimDenied());
}
private UserType getExternalUser() {
return UserType.external_user(new ExternalUser());
}
private UserType getInternalUser() {
return UserType.internal_user(new InternalUser());
}
}

View File

@ -0,0 +1,141 @@
package com.rbkmoney.cm.dudoser.service;
import com.icegreen.greenmail.util.GreenMail;
import com.icegreen.greenmail.util.ServerSetup;
import com.rbkmoney.cm.dudoser.CMDudoserApplication;
import com.rbkmoney.cm.dudoser.domain.Message;
import com.rbkmoney.cm.dudoser.exception.MailSendException;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.mail.javamail.JavaMailSenderImpl;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import javax.mail.BodyPart;
import javax.mail.internet.MimeMessage;
import javax.mail.internet.MimeMultipart;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = CMDudoserApplication.class, webEnvironment = RANDOM_PORT)
@TestPropertySource(properties = {
"mail.host=localhost",
"mail.port=2525",
"mail.username=username",
"mail.password=secret",
})
public class MailSenderTest {
// отправляем сообщение на фейковый почтовой сервер
@Rule
public SmtpServerRule smtpServerRule = new SmtpServerRule(2525);
@Autowired
private MailSenderService mailSenderService;
@Test
public void sendingMessageFlowTest() throws Exception {
Message message = createMessage();
mailSenderService.send(message);
MimeMessage[] receivedMessages = smtpServerRule.getMessages();
assertEquals(1, receivedMessages.length);
MimeMessage receivedMessage = receivedMessages[0];
assertEquals(message.getSubject(), receivedMessage.getSubject());
assertEquals(message.getTo(), receivedMessage.getAllRecipients()[0].toString());
assertTrue(getTextFromMimeMultipart((MimeMultipart) receivedMessage.getContent()).contains(message.getContent()));
}
@Test(expected = MailSendException.class)
public void incorrectAddressTest() {
Message message = createMessage();
message.setTo("asd, asd");
mailSenderService.send(message);
}
@Test(expected = MailSendException.class)
public void connectionRefusedTest() {
JavaMailSenderImpl sender = getJavaMailSender(2524);
Message message = createMessage();
MailSenderService senderService = new MailSenderService(sender);
senderService.send(message);
}
private String getTextFromMimeMultipart(MimeMultipart mimeMultipart) throws Exception {
String result = "";
int partCount = mimeMultipart.getCount();
for (int i = 0; i < partCount; i++) {
BodyPart bodyPart = mimeMultipart.getBodyPart(i);
if (bodyPart.isMimeType("text/plain")) {
result = result + "\n" + bodyPart.getContent();
break; // without break same text appears twice in my tests
} else if (bodyPart.isMimeType("text/html")) {
String html = (String) bodyPart.getContent();
// result = result + "\n" + org.jsoup.Jsoup.parse(html).text();
result = html;
} else if (bodyPart.getContent() instanceof MimeMultipart) {
result = result + getTextFromMimeMultipart((MimeMultipart) bodyPart.getContent());
}
}
return result;
}
private Message createMessage() {
return Message.builder()
.from("no-reply@rbk.com")
.to("info@rbk.com")
.subject("Spring Mail Integration Testing with JUnit and GreenMail Example")
.content("We show how to write Integration Tests using Spring and GreenMail.")
.build();
}
private JavaMailSenderImpl getJavaMailSender(int port) {
JavaMailSenderImpl sender = new JavaMailSenderImpl();
sender.setHost("localhost");
sender.setPort(port);
sender.setUsername("username");
sender.setPassword("secret");
return sender;
}
public static class SmtpServerRule extends ExternalResource {
private GreenMail smtpServer;
private int port;
public SmtpServerRule(int port) {
this.port = port;
}
@Override
protected void before() throws Throwable {
super.before();
smtpServer = new GreenMail(new ServerSetup(port, null, "smtp"));
smtpServer.start();
}
public MimeMessage[] getMessages() {
return smtpServer.getReceivedMessages();
}
@Override
protected void after() {
super.after();
smtpServer.stop();
}
}
}

View File

@ -0,0 +1,52 @@
package com.rbkmoney.cm.dudoser.service;
import com.rbkmoney.cm.dudoser.CMDudoserApplication;
import com.rbkmoney.cm.dudoser.domain.Message;
import com.rbkmoney.cm.dudoser.exception.MailSendException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.atomic.AtomicInteger;
import static org.mockito.Mockito.*;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = CMDudoserApplication.class, webEnvironment = RANDOM_PORT)
public class RetryableSenderTest {
@MockBean
private MailSenderService mailSenderService;
@Autowired
private RetryableSenderService retryableSenderService;
@Value("${mail.retry.max.attempts:3}")
private int maxAttempts;
@Test
public void test() {
AtomicInteger atomicInt = new AtomicInteger(0);
// на countRetries попытке мок корректно обработает вызов, в предыдущих попытках будет вынуждать сервис ретраить обработку ивента
when(mailSenderService.send(any())).thenAnswer(
invocation -> {
int increment = atomicInt.getAndIncrement();
if (increment < maxAttempts - 1) {
throw new MailSendException();
}
return true;
}
);
try {
retryableSenderService.sendToMail(Message.builder().build());
} finally {
verify(mailSenderService, times(maxAttempts)).send(any());
}
}
}

View File

@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include resource="org/springframework/boot/logging/logback/defaults.xml"/>
<include resource="org/springframework/boot/logging/logback/console-appender.xml"/>
<root level="info">
<appender-ref ref="CONSOLE"/>
</root>
</configuration>