diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..6de9d8f
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,57 @@
+# Created by .ignore support plugin (hsz.mobi)
+.eunit
+deps
+*.o
+*.beam
+*.plt
+erl_crash.dump
+ebin/*.beam
+rel/example_project
+.concrete/DEV_MODE
+.rebar
+target/
+pom.xml.tag
+pom.xml.releaseBackup
+pom.xml.versionsBackup
+pom.xml.next
+release.properties
+dependency-reduced-pom.xml
+buildNumber.properties
+.mvn/timing.properties
+# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm
+# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
+
+.idea/
+
+*.iws
+*.ipr
+*.iml
+
+
+# IntelliJ
+/out/
+
+# mpeltonen/sbt-idea plugin
+.idea_modules/
+
+# JIRA plugin
+atlassian-ide-plugin.xml
+
+# Crashlytics plugin (for Android Studio and IntelliJ)
+com_crashlytics_export_strings.xml
+crashlytics.properties
+crashlytics-build.properties
+fabric.properties
+*.class
+
+# Mobile Tools for Java (J2ME)
+.mtj.tmp/
+
+# Package Files #
+*.jar
+*.war
+*.ear
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
+env.list
diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000..ca5a761
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,4 @@
+[submodule "build_utils"]
+ path = build_utils
+ url = git@github.com:rbkmoney/build_utils.git
+ branch = master
diff --git a/Jenkinsfile b/Jenkinsfile
new file mode 100644
index 0000000..13c60bd
--- /dev/null
+++ b/Jenkinsfile
@@ -0,0 +1,18 @@
+#!groovy
+build('cm-dudoser', 'java-maven') {
+ checkoutRepo()
+ loadBuildUtils()
+
+ def javaServicePipeline
+ runStage('load JavaService pipeline') {
+ javaServicePipeline = load("build_utils/jenkins_lib/pipeJavaService.groovy")
+ }
+
+ def serviceName = env.REPO_NAME
+ def mvnArgs = '-DjvmArgs="-Xmx256m"'
+ def useJava11 = true
+ def registry = 'dr2.rbkmoney.com'
+ def registryCredsId = 'jenkins_harbor'
+
+ javaServicePipeline(serviceName, useJava11, mvnArgs, registry, registryCredsId)
+}
diff --git a/README.md b/README.md
index 078421d..a3dd0e3 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,2 @@
-# cm-hooker
-сервис отправки email-сообщений мерчантам при изменении статуса claim
+# cm-dudoser
+Сервис по отправки уведомлений на почту мерчанта по получаемым событиям в клейме
diff --git a/build_utils b/build_utils
new file mode 160000
index 0000000..8ad24ac
--- /dev/null
+++ b/build_utils
@@ -0,0 +1 @@
+Subproject commit 8ad24ac7dc831a280cf8622086d73867d0a04b93
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..e39235a
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,254 @@
+
+
+ 4.0.0
+
+
+ com.rbkmoney
+ spring-boot-starter-parent
+ 2.1.1.RELEASE
+
+
+ cm-dudoser
+ 0.0.1-SNAPSHOT
+ jar
+
+ cm-dudoser
+ claim-management-dudoser
+
+
+ UTF-8
+ UTF-8
+ 11
+ 8022
+ ${server.port}
+ bc95d0d6dc13c693acd2b274531a7d604b877bf3
+ ${env.REGISTRY}
+ 0.3.6
+ ${project.basedir}/target/jacoco.exec
+ 2.1.0
+ 0.1.3
+ 0.6.8
+
+
+
+
+
+ com.rbkmoney
+ spring-boot-starter-metrics-statsd
+ 1.1.0
+
+
+ com.rbkmoney.woody
+ woody-thrift
+ 1.1.15
+
+
+ com.rbkmoney
+ damsel
+ 1.387-6a4cdd6
+
+
+ com.rbkmoney
+ shared-resources
+ ${shared.resources.version}
+
+
+ com.rbkmoney
+ custom-actuator-endpoints
+ 0.0.1
+
+
+ com.rbkmoney.geck
+ common
+ ${geck.version}
+
+
+ com.rbkmoney
+ messages-proto
+ 1.10-391d225
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+ org.hibernate
+ hibernate-validator
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+ org.springframework.retry
+ spring-retry
+
+
+ org.springframework
+ spring-aspects
+
+
+ org.springframework.boot
+ spring-boot-starter-mail
+
+
+
+
+ javax.servlet
+ javax.servlet-api
+ 4.0.1
+
+
+ org.projectlombok
+ lombok
+ 1.18.10
+ provided
+
+
+ org.apache.velocity
+ velocity
+ 1.7
+
+
+ org.apache.velocity
+ velocity-tools
+ 2.0
+
+
+ com.rbkmoney
+ kafka-common-lib
+ ${kafka.common.lib.version}
+
+
+ org.apache.kafka
+ kafka-clients
+ ${kafka.clients.version}
+
+
+ org.springframework.kafka
+ spring-kafka
+ 2.2.8.RELEASE
+
+
+ org.apache.kafka
+ kafka-clients
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ 2.1.8.RELEASE
+ test
+
+
+ org.testcontainers
+ postgresql
+ 1.12.2
+ test
+
+
+ org.testcontainers
+ kafka
+ 1.10.2
+ test
+
+
+ com.icegreen
+ greenmail
+ 1.5.5
+ test
+
+
+ com.rbkmoney
+ easyway
+ 0.1.0
+ test
+
+
+
+
+
+
+ ${project.build.directory}/maven-shared-archive-resources
+ ${project.build.directory}
+
+ Dockerfile
+
+ true
+
+
+ ${project.build.directory}/maven-shared-archive-resources
+ true
+
+ Dockerfile
+
+
+
+ src/main/resources
+ true
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ 2.1.1.RELEASE
+
+
+ org.apache.maven.plugins
+ maven-remote-resources-plugin
+ 1.6.0
+
+
+ org.apache.maven.shared
+ maven-filtering
+ 1.3
+
+
+
+
+ com.rbkmoney:shared-resources:${shared.resources.version}
+
+ false
+ false
+
+
+
+
+ process
+
+
+
+
+
+ org.jacoco
+ jacoco-maven-plugin
+ 0.8.5
+
+ ${sonar.jacoco.reportPaths}
+ true
+
+
+
+ agent
+
+ prepare-agent
+
+
+
+
+
+
+
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/CMDudoserApplication.java b/src/main/java/com/rbkmoney/cm/dudoser/CMDudoserApplication.java
new file mode 100644
index 0000000..944c620
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/CMDudoserApplication.java
@@ -0,0 +1,14 @@
+package com.rbkmoney.cm.dudoser;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.web.servlet.ServletComponentScan;
+
+@ServletComponentScan
+@SpringBootApplication
+public class CMDudoserApplication extends SpringApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(CMDudoserApplication.class, args);
+ }
+}
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/config/ClientConfig.java b/src/main/java/com/rbkmoney/cm/dudoser/config/ClientConfig.java
new file mode 100644
index 0000000..bc5d26c
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/config/ClientConfig.java
@@ -0,0 +1,35 @@
+package com.rbkmoney.cm.dudoser.config;
+
+import com.rbkmoney.damsel.claim_management.ClaimManagementSrv;
+import com.rbkmoney.damsel.messages.MessageServiceSrv;
+import com.rbkmoney.woody.thrift.impl.http.THSpawnClientBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.io.Resource;
+
+import java.io.IOException;
+
+@Slf4j
+@Configuration
+public class ClientConfig {
+
+ @Bean
+ public ClaimManagementSrv.Iface claimManagementClient(@Value("${claimmanagement.client.adapter.url}") Resource resource,
+ @Value("${claimmanagement.client.adapter.networkTimeout}") int timeout) throws IOException {
+ return new THSpawnClientBuilder()
+ .withAddress(resource.getURI())
+ .withNetworkTimeout(timeout)
+ .build(ClaimManagementSrv.Iface.class);
+ }
+
+ @Bean
+ public MessageServiceSrv.Iface messageServiceClient(@Value("${conversations.client.adapter.url}") Resource resource,
+ @Value("${conversations.client.adapter.networkTimeout}") int timeout) throws IOException {
+ return new THSpawnClientBuilder()
+ .withAddress(resource.getURI())
+ .withNetworkTimeout(timeout)
+ .build(MessageServiceSrv.Iface.class);
+ }
+}
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/config/KafkaConfig.java b/src/main/java/com/rbkmoney/cm/dudoser/config/KafkaConfig.java
new file mode 100644
index 0000000..ad0f527
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/config/KafkaConfig.java
@@ -0,0 +1,99 @@
+package com.rbkmoney.cm.dudoser.config;
+
+import com.rbkmoney.cm.dudoser.config.properties.KafkaConsumerProperties;
+import com.rbkmoney.cm.dudoser.config.properties.KafkaSslProperties;
+import com.rbkmoney.cm.dudoser.deserializer.ClaimEventSinkDeserializer;
+import com.rbkmoney.damsel.claim_management.Event;
+import com.rbkmoney.kafka.common.exception.handler.SeekToCurrentWithSleepErrorHandler;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
+import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.kafka.listener.ErrorHandler;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+@Slf4j
+@Configuration
+@EnableConfigurationProperties({KafkaSslProperties.class, KafkaConsumerProperties.class})
+@RequiredArgsConstructor
+public class KafkaConfig {
+
+ @Value("${kafka.bootstrap.servers}")
+ private String bootstrapServers;
+
+ @Value("${kafka.error-handler.sleep-time-seconds}")
+ private int errorHandlerSleepTimeSeconds;
+
+ @Value("${kafka.error-handler.maxAttempts}")
+ private int errorHandlerMaxAttempts;
+
+ @Bean
+ public Map consumerConfigs(KafkaSslProperties kafkaSslProperties,
+ KafkaConsumerProperties kafkaConsumerProperties) {
+ Map props = new HashMap<>();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ClaimEventSinkDeserializer.class);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProperties.getGroupId());
+ props.put(ConsumerConfig.CLIENT_ID_CONFIG, kafkaConsumerProperties.getClientId());
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConsumerProperties.isEnableAutoCommit());
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConsumerProperties.getAutoOffsetReset());
+ props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerProperties.getMaxPollRecords());
+ props.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, kafkaConsumerProperties.getConnectionsMaxIdleMs());
+ props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConsumerProperties.getSessionTimeoutMs());
+ configureSsl(props, kafkaSslProperties);
+
+ return props;
+ }
+
+ @Bean
+ public ConsumerFactory consumerFactory(KafkaSslProperties kafkaSslProperties,
+ KafkaConsumerProperties kafkaConsumerProperties) {
+ return new DefaultKafkaConsumerFactory<>(consumerConfigs(kafkaSslProperties, kafkaConsumerProperties));
+ }
+
+ @Bean
+ public KafkaListenerContainerFactory> kafkaListenerContainerFactory(ConsumerFactory consumerFactory,
+ KafkaConsumerProperties kafkaConsumerProperties) {
+ ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(consumerFactory);
+ factory.getContainerProperties().setAckOnError(false);
+ factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
+ factory.setErrorHandler(kafkaErrorHandler());
+ factory.setConcurrency(kafkaConsumerProperties.getConcurrency());
+ return factory;
+ }
+
+ private void configureSsl(Map props, KafkaSslProperties kafkaSslProperties) {
+ if (kafkaSslProperties.isEnabled()) {
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name());
+ props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, new File(kafkaSslProperties.getTrustStoreLocation()).getAbsolutePath());
+ props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaSslProperties.getTrustStorePassword());
+ props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaSslProperties.getKeyStoreType());
+ props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaSslProperties.getTrustStoreType());
+ props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, new File(kafkaSslProperties.getKeyStoreLocation()).getAbsolutePath());
+ props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, kafkaSslProperties.getKeyStorePassword());
+ props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaSslProperties.getKeyPassword());
+ }
+ }
+
+ private ErrorHandler kafkaErrorHandler() {
+ return new SeekToCurrentWithSleepErrorHandler(errorHandlerSleepTimeSeconds, errorHandlerMaxAttempts);
+ }
+}
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/config/KafkaConsumerBeanEnableConfig.java b/src/main/java/com/rbkmoney/cm/dudoser/config/KafkaConsumerBeanEnableConfig.java
new file mode 100644
index 0000000..b228d63
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/config/KafkaConsumerBeanEnableConfig.java
@@ -0,0 +1,19 @@
+package com.rbkmoney.cm.dudoser.config;
+
+import com.rbkmoney.cm.dudoser.handler.ClaimHandler;
+import com.rbkmoney.cm.dudoser.listener.ClaimEventSinkListener;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+
+@EnableKafka
+@Configuration
+public class KafkaConsumerBeanEnableConfig {
+
+ @Bean
+ @ConditionalOnProperty(value = "kafka.topics.claim-event-sink.enabled", havingValue = "true")
+ public ClaimEventSinkListener paymentEventsKafkaListener(ClaimHandler claimHandler) {
+ return new ClaimEventSinkListener(claimHandler);
+ }
+}
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/config/MailConfig.java b/src/main/java/com/rbkmoney/cm/dudoser/config/MailConfig.java
new file mode 100644
index 0000000..2f10308
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/config/MailConfig.java
@@ -0,0 +1,48 @@
+package com.rbkmoney.cm.dudoser.config;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.mail.javamail.JavaMailSender;
+import org.springframework.mail.javamail.JavaMailSenderImpl;
+
+import java.util.Properties;
+
+@Configuration
+public class MailConfig {
+
+ @Bean
+ public Properties mailProperties(@Value("${mail.protocol}") String protocol,
+ @Value("${mail.smtp.auth}") boolean smtpsAuth,
+ @Value("${mail.smtp.starttls.enable}") boolean starttls,
+ @Value("${mail.smtp.timeout}") int timeout,
+ @Value("${mail.host}") String host,
+ @Value("${mail.port}") int port,
+ @Value("${mail.username}") String username) {
+ Properties properties = new Properties();
+ properties.put("mail.smtp.auth", smtpsAuth);
+ properties.put("mail.smtp.starttls.enable", starttls);
+ properties.put("mail.smtp.connectiontimeout", timeout);
+ properties.put("mail.smtp.timeout", timeout);
+ properties.put("mail.transport.protocol", protocol);
+ properties.put("mail.smtp.host", host);
+ properties.put("mail.smtp.port", port);
+ properties.put("mail.username", username);
+ return properties;
+ }
+
+ @Bean
+ public JavaMailSender javaMailSender(Properties mailProperties,
+ @Value("${mail.host}") String host,
+ @Value("${mail.port}") int port,
+ @Value("${mail.username}") String username,
+ @Value("${mail.password}") String password) {
+ JavaMailSenderImpl sender = new JavaMailSenderImpl();
+ sender.setHost(host);
+ sender.setPort(port);
+ sender.setUsername(username);
+ sender.setPassword(password);
+ sender.setJavaMailProperties(mailProperties);
+ return sender;
+ }
+}
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/config/RetryConfig.java b/src/main/java/com/rbkmoney/cm/dudoser/config/RetryConfig.java
new file mode 100644
index 0000000..58b5709
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/config/RetryConfig.java
@@ -0,0 +1,36 @@
+package com.rbkmoney.cm.dudoser.config;
+
+import com.rbkmoney.cm.dudoser.exception.MailSendException;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.retry.annotation.EnableRetry;
+import org.springframework.retry.backoff.FixedBackOffPolicy;
+import org.springframework.retry.policy.SimpleRetryPolicy;
+import org.springframework.retry.support.RetryTemplate;
+
+import java.util.Collections;
+
+@EnableRetry
+@Configuration
+public class RetryConfig {
+
+ @Value("${mail.retry.backoff.period:1000}")
+ private long backOffPeriod;
+
+ @Value("${mail.retry.max.attempts:3}")
+ private int maxAttempts;
+
+ @Bean
+ public RetryTemplate mailRetryTemplate() {
+ FixedBackOffPolicy policy = new FixedBackOffPolicy();
+ policy.setBackOffPeriod(backOffPeriod);
+
+ RetryTemplate retryTemplate = new RetryTemplate();
+ retryTemplate.setRetryPolicy(
+ new SimpleRetryPolicy(maxAttempts, Collections.singletonMap(MailSendException.class, true))
+ );
+ retryTemplate.setBackOffPolicy(policy);
+ return retryTemplate;
+ }
+}
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/config/TemplateConfig.java b/src/main/java/com/rbkmoney/cm/dudoser/config/TemplateConfig.java
new file mode 100644
index 0000000..0005b83
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/config/TemplateConfig.java
@@ -0,0 +1,26 @@
+package com.rbkmoney.cm.dudoser.config;
+
+import org.apache.velocity.app.VelocityEngine;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class TemplateConfig {
+
+ @Bean
+ public VelocityEngine claimManagementTemplateEngine() {
+ String resourceLoader = "org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader";
+
+ VelocityEngine engine = getVelocityEngine(resourceLoader);
+
+ engine.init();
+ return engine;
+ }
+
+ private VelocityEngine getVelocityEngine(String resourceLoader) {
+ VelocityEngine engine = new VelocityEngine();
+ engine.setProperty("resource.loader", "class");
+ engine.setProperty("class.resource.loader.class", resourceLoader);
+ return engine;
+ }
+}
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/config/properties/KafkaConsumerProperties.java b/src/main/java/com/rbkmoney/cm/dudoser/config/properties/KafkaConsumerProperties.java
new file mode 100644
index 0000000..1003a43
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/config/properties/KafkaConsumerProperties.java
@@ -0,0 +1,25 @@
+package com.rbkmoney.cm.dudoser.config.properties;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+@Getter
+@Setter
+@ToString
+@Component
+@ConfigurationProperties(prefix = "kafka.consumer")
+public class KafkaConsumerProperties {
+
+ private String autoOffsetReset;
+ private boolean enableAutoCommit;
+ private String clientId;
+ private String groupId;
+ private Integer maxPollRecords;
+ private Integer connectionsMaxIdleMs;
+ private Integer sessionTimeoutMs;
+ private Integer concurrency;
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/config/properties/KafkaSslProperties.java b/src/main/java/com/rbkmoney/cm/dudoser/config/properties/KafkaSslProperties.java
new file mode 100644
index 0000000..d3b464f
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/config/properties/KafkaSslProperties.java
@@ -0,0 +1,23 @@
+package com.rbkmoney.cm.dudoser.config.properties;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+@Getter
+@Setter
+@Component
+@ConfigurationProperties(prefix = "kafka.ssl")
+public class KafkaSslProperties {
+
+ private String trustStorePassword;
+ private String trustStoreLocation;
+ private String keyStorePassword;
+ private String keyPassword;
+ private String keyStoreLocation;
+ private boolean enabled;
+ private String keyStoreType;
+ private String trustStoreType;
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/deserializer/ClaimEventSinkDeserializer.java b/src/main/java/com/rbkmoney/cm/dudoser/deserializer/ClaimEventSinkDeserializer.java
new file mode 100644
index 0000000..2e12e18
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/deserializer/ClaimEventSinkDeserializer.java
@@ -0,0 +1,13 @@
+package com.rbkmoney.cm.dudoser.deserializer;
+
+import com.rbkmoney.damsel.claim_management.Event;
+import com.rbkmoney.kafka.common.serialization.AbstractThriftDeserializer;
+
+public class ClaimEventSinkDeserializer extends AbstractThriftDeserializer {
+
+ @Override
+ public Event deserialize(String s, byte[] bytes) {
+ return super.deserialize(bytes, new Event());
+ }
+
+}
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/domain/ClaimData.java b/src/main/java/com/rbkmoney/cm/dudoser/domain/ClaimData.java
new file mode 100644
index 0000000..daeb0a6
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/domain/ClaimData.java
@@ -0,0 +1,15 @@
+package com.rbkmoney.cm.dudoser.domain;
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder
+public class ClaimData {
+
+ private TemplateType templateType;
+ private String id;
+ private String status;
+ private String comment;
+
+}
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/domain/ClaimStatus.java b/src/main/java/com/rbkmoney/cm/dudoser/domain/ClaimStatus.java
new file mode 100644
index 0000000..9731558
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/domain/ClaimStatus.java
@@ -0,0 +1,19 @@
+package com.rbkmoney.cm.dudoser.domain;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor
+@Getter
+public enum ClaimStatus {
+
+ PENDING("В ожидании"),
+ REVIEW("На рассмотрении"),
+ PENDING_ACCEPTANCE("В ожидании подтверждения"),
+ ACCEPTED("Подтверждена"),
+ DENIED("Отклонена"),
+ REVOKED("Отозвана");
+
+ private final String cyrillicValue;
+
+}
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/domain/Message.java b/src/main/java/com/rbkmoney/cm/dudoser/domain/Message.java
new file mode 100644
index 0000000..2240be3
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/domain/Message.java
@@ -0,0 +1,18 @@
+package com.rbkmoney.cm.dudoser.domain;
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder
+public class Message {
+
+ private String from;
+ private String to;
+ private String subject;
+ private String content;
+
+ private String partyId;
+ private long claimId;
+
+}
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/domain/TemplateType.java b/src/main/java/com/rbkmoney/cm/dudoser/domain/TemplateType.java
new file mode 100644
index 0000000..1905ad2
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/domain/TemplateType.java
@@ -0,0 +1,8 @@
+package com.rbkmoney.cm.dudoser.domain;
+
+public enum TemplateType {
+
+ STATUSCHANGE,
+ COMMENT
+
+}
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/exception/MailSendException.java b/src/main/java/com/rbkmoney/cm/dudoser/exception/MailSendException.java
new file mode 100644
index 0000000..e95a59c
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/exception/MailSendException.java
@@ -0,0 +1,23 @@
+package com.rbkmoney.cm.dudoser.exception;
+
+public class MailSendException extends RuntimeException {
+
+ public MailSendException() {
+ }
+
+ public MailSendException(String message) {
+ super(message);
+ }
+
+ public MailSendException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public MailSendException(Throwable cause) {
+ super(cause);
+ }
+
+ public MailSendException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/exception/NotFoundException.java b/src/main/java/com/rbkmoney/cm/dudoser/exception/NotFoundException.java
new file mode 100644
index 0000000..5b9f836
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/exception/NotFoundException.java
@@ -0,0 +1,23 @@
+package com.rbkmoney.cm.dudoser.exception;
+
+public class NotFoundException extends RuntimeException {
+
+ public NotFoundException() {
+ }
+
+ public NotFoundException(String message) {
+ super(message);
+ }
+
+ public NotFoundException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public NotFoundException(Throwable cause) {
+ super(cause);
+ }
+
+ public NotFoundException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/exception/ThriftClientException.java b/src/main/java/com/rbkmoney/cm/dudoser/exception/ThriftClientException.java
new file mode 100644
index 0000000..7542a8a
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/exception/ThriftClientException.java
@@ -0,0 +1,23 @@
+package com.rbkmoney.cm.dudoser.exception;
+
+public class ThriftClientException extends RuntimeException {
+
+ public ThriftClientException() {
+ }
+
+ public ThriftClientException(String message) {
+ super(message);
+ }
+
+ public ThriftClientException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ThriftClientException(Throwable cause) {
+ super(cause);
+ }
+
+ public ThriftClientException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/handler/ClaimHandler.java b/src/main/java/com/rbkmoney/cm/dudoser/handler/ClaimHandler.java
new file mode 100644
index 0000000..98a0802
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/handler/ClaimHandler.java
@@ -0,0 +1,83 @@
+package com.rbkmoney.cm.dudoser.handler;
+
+import com.rbkmoney.cm.dudoser.domain.Message;
+import com.rbkmoney.cm.dudoser.service.MessageBuilderService;
+import com.rbkmoney.cm.dudoser.service.RetryableSenderService;
+import com.rbkmoney.damsel.claim_management.*;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Component
+@RequiredArgsConstructor
+@Slf4j
+public class ClaimHandler {
+
+ private final MessageBuilderService statusMessageBuilder;
+ private final MessageBuilderService commentChangeMessageBuilder;
+ private final RetryableSenderService retryableSenderService;
+
+ public void handle(Event event) {
+ try {
+ handleEvent(event);
+ } catch (Exception ex) {
+ log.error("Some problem when handling", ex);
+ throw ex;
+ }
+ }
+
+ private void handleEvent(Event event) {
+ Change change = event.getChange();
+
+ if (change.isSetStatusChanged()) {
+ ClaimStatusChanged claimStatusChanged = change.getStatusChanged();
+
+ String partyId = claimStatusChanged.getPartyId();
+ long claimId = claimStatusChanged.getId();
+
+ log.info("Handle status change event with id {} for party {} get started", claimId, partyId);
+
+ Message message = statusMessageBuilder.build(claimStatusChanged, partyId, claimId);
+
+ retryableSenderService.sendToMail(message);
+
+ log.info("Handle status change event with party id '{}' and claim id '{}' finished", partyId, claimId);
+ } else if (containsCommentModifications(change)) {
+ ClaimUpdated claimUpdated = change.getUpdated();
+
+ String partyId = claimUpdated.getPartyId();
+ long claimId = claimUpdated.getId();
+
+ List commentModifications = change.getUpdated().getChangeset().stream()
+ .filter(Modification::isSetClaimModification)
+ .map(Modification::getClaimModification)
+ .filter(ClaimModification::isSetCommentModification)
+ .map(ClaimModification::getCommentModification)
+ .collect(Collectors.toList());
+
+ for (CommentModificationUnit commentModification : commentModifications) {
+ log.info("Handle comment modification update change event get started, claimId={}, partyId={}, commentId={}", claimId, partyId, commentModification.getId());
+
+ Message message = commentChangeMessageBuilder.build(commentModification, partyId, claimId);
+
+ retryableSenderService.sendToMail(message);
+
+ log.info("Handle comment modification update change event finished, claimId={}, partyId={}, commentId={}", claimId, partyId, commentModification.getId());
+ }
+ }
+ }
+
+ private boolean containsCommentModifications(Change change) {
+ if (change.isSetUpdated()) {
+ return change.getUpdated().getChangeset().stream()
+ .filter(Modification::isSetClaimModification)
+ .map(Modification::getClaimModification)
+ .anyMatch(ClaimModification::isSetCommentModification);
+ }
+
+ return false;
+ }
+}
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/listener/ClaimEventSinkListener.java b/src/main/java/com/rbkmoney/cm/dudoser/listener/ClaimEventSinkListener.java
new file mode 100644
index 0000000..494d735
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/listener/ClaimEventSinkListener.java
@@ -0,0 +1,38 @@
+package com.rbkmoney.cm.dudoser.listener;
+
+import com.rbkmoney.cm.dudoser.handler.ClaimHandler;
+import com.rbkmoney.damsel.claim_management.Event;
+import com.rbkmoney.damsel.claim_management.InternalUser;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.thrift.TException;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.Acknowledgment;
+
+@Slf4j
+@RequiredArgsConstructor
+public class ClaimEventSinkListener {
+
+ private final ClaimHandler claimHandler;
+
+ @KafkaListener(topics = "${kafka.topics.claim-event-sink.id}", containerFactory = "kafkaListenerContainerFactory")
+ public void handle(Event event, Acknowledgment ack) throws TException {
+ log.info("Handle claim management Event get started, occuredAt={}", event.getOccuredAt());
+
+ if (event.getUserInfo() != null && isInternalUser(event)) {
+ claimHandler.handle(event);
+ }
+
+ log.info("Handle claim management Event finished, occuredAt={}", event.getOccuredAt());
+
+ ack.acknowledge();
+ }
+
+ private boolean isInternalUser(Event event) {
+ return event.getUserInfo().getType().equals(internalUser());
+ }
+
+ private com.rbkmoney.damsel.claim_management.UserType internalUser() {
+ return com.rbkmoney.damsel.claim_management.UserType.internal_user(new InternalUser());
+ }
+}
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/service/ClaimService.java b/src/main/java/com/rbkmoney/cm/dudoser/service/ClaimService.java
new file mode 100644
index 0000000..7ad2074
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/service/ClaimService.java
@@ -0,0 +1,64 @@
+package com.rbkmoney.cm.dudoser.service;
+
+import com.rbkmoney.cm.dudoser.exception.NotFoundException;
+import com.rbkmoney.cm.dudoser.exception.ThriftClientException;
+import com.rbkmoney.damsel.claim_management.*;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.thrift.TException;
+import org.springframework.stereotype.Service;
+
+import java.util.Optional;
+
+@Service
+@RequiredArgsConstructor
+@Slf4j
+public class ClaimService {
+
+ private final ClaimManagementSrv.Iface claimManagementClient;
+
+ public String getEmailByClaim(String partyId, long claimId) {
+ Claim claim = getClaim(partyId, claimId);
+
+ String emailTo = claim.getChangeset().stream()
+ .filter(this::isExternalUser)
+ .findFirst()
+ .map(ModificationUnit::getUserInfo)
+ .map(UserInfo::getEmail)
+ .orElse(null);
+
+ if (emailTo == null) {
+ throw new NotFoundException(String.format("ExternalUser info from Claim can not be null, partyId=%s, claimId=%s", partyId, claimId));
+ }
+
+ return emailTo;
+ }
+
+ private Claim getClaim(String partyId, long claimId) {
+ try {
+ log.info("Trying to get Claim from thrift client, partyId={}, claimId={}", partyId, claimId);
+
+ Claim claim = claimManagementClient.getClaim(partyId, claimId);
+
+ if (claim == null || claim.getChangeset() == null || claim.getChangeset().isEmpty()) {
+ throw new NotFoundException(String.format("Changeset from Claim can not be null, partyId=%s, claimId=%s", partyId, claimId));
+ }
+
+ return claim;
+ } catch (TException ex) {
+ throw new ThriftClientException(String.format("Failed to get Claim from thrift client, partyId=%s, claimId=%s", partyId, claimId), ex);
+ }
+ }
+
+ private boolean isExternalUser(ModificationUnit modificationUnit) {
+ return Optional.ofNullable(modificationUnit)
+ .map(ModificationUnit::getUserInfo)
+ .map(UserInfo::getType)
+ .map(userType -> userType.equals(externalUser()))
+ .orElse(false);
+ }
+
+ private com.rbkmoney.damsel.claim_management.UserType externalUser() {
+ return com.rbkmoney.damsel.claim_management.UserType.external_user(new ExternalUser());
+ }
+}
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/service/ConversationService.java b/src/main/java/com/rbkmoney/cm/dudoser/service/ConversationService.java
new file mode 100644
index 0000000..6f75983
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/service/ConversationService.java
@@ -0,0 +1,38 @@
+package com.rbkmoney.cm.dudoser.service;
+
+import com.rbkmoney.cm.dudoser.exception.NotFoundException;
+import com.rbkmoney.cm.dudoser.exception.ThriftClientException;
+import com.rbkmoney.damsel.messages.Conversation;
+import com.rbkmoney.damsel.messages.ConversationFilter;
+import com.rbkmoney.damsel.messages.GetConversationResponse;
+import com.rbkmoney.damsel.messages.MessageServiceSrv;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.thrift.TException;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+@Service
+@RequiredArgsConstructor
+@Slf4j
+public class ConversationService {
+
+ private final MessageServiceSrv.Iface messageServiceClient;
+
+ public Conversation getConversation(String conversationId) {
+ try {
+ log.info("Trying to get Conversation from thrift client, conversationId={}", conversationId);
+
+ GetConversationResponse response = messageServiceClient.getConversations(List.of(conversationId), new ConversationFilter());
+
+ if (response == null || response.getConversations() == null || response.getConversations().size() != 1) {
+ throw new NotFoundException(String.format("Conversation's size must be = 1, conversationId=%s", conversationId));
+ }
+
+ return response.getConversations().get(0);
+ } catch (TException ex) {
+ throw new ThriftClientException(String.format("Failed to get Conversation from thrift client, conversationId=%s", conversationId), ex);
+ }
+ }
+}
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/service/MailSenderService.java b/src/main/java/com/rbkmoney/cm/dudoser/service/MailSenderService.java
new file mode 100644
index 0000000..4688674
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/service/MailSenderService.java
@@ -0,0 +1,47 @@
+package com.rbkmoney.cm.dudoser.service;
+
+import com.rbkmoney.cm.dudoser.domain.Message;
+import com.rbkmoney.cm.dudoser.exception.MailSendException;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.mail.MailException;
+import org.springframework.mail.javamail.JavaMailSender;
+import org.springframework.mail.javamail.MimeMessageHelper;
+import org.springframework.stereotype.Service;
+
+import javax.mail.MessagingException;
+import javax.mail.internet.MimeMessage;
+import java.nio.charset.StandardCharsets;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class MailSenderService {
+
+ private final JavaMailSender mailSender;
+
+ public boolean send(Message message) {
+ try {
+ log.info("Trying to send message to mail, partyId={}, claimId={}, email={}", message.getPartyId(), message.getClaimId(), message.getTo());
+
+ MimeMessage mimeMessage = getMimeMessage(message);
+
+ mailSender.send(mimeMessage);
+
+ return true;
+ } catch (MessagingException | MailException ex) {
+ throw new MailSendException(String.format("Received exception while sending message to mail, partyId=%s, claimId=%s, email=%s", message.getPartyId(), message.getClaimId(), message.getTo()), ex);
+ }
+ }
+
+ private MimeMessage getMimeMessage(Message message) throws MessagingException {
+ MimeMessage mimeMessage = mailSender.createMimeMessage();
+
+ MimeMessageHelper helper = new MimeMessageHelper(mimeMessage, true, StandardCharsets.UTF_8.name());
+ helper.setFrom(message.getFrom());
+ helper.setTo(message.getTo());
+ helper.setSubject(message.getSubject());
+ helper.setText(message.getContent(), false);
+ return mimeMessage;
+ }
+}
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/service/MessageBuilderService.java b/src/main/java/com/rbkmoney/cm/dudoser/service/MessageBuilderService.java
new file mode 100644
index 0000000..cbeb879
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/service/MessageBuilderService.java
@@ -0,0 +1,9 @@
+package com.rbkmoney.cm.dudoser.service;
+
+import com.rbkmoney.cm.dudoser.domain.Message;
+
+public interface MessageBuilderService {
+
+ Message build(T change, String partyId, long claimId);
+
+}
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/service/RetryableSenderService.java b/src/main/java/com/rbkmoney/cm/dudoser/service/RetryableSenderService.java
new file mode 100644
index 0000000..9ecc380
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/service/RetryableSenderService.java
@@ -0,0 +1,20 @@
+package com.rbkmoney.cm.dudoser.service;
+
+import com.rbkmoney.cm.dudoser.domain.Message;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.retry.support.RetryTemplate;
+import org.springframework.stereotype.Service;
+
+@Service
+@RequiredArgsConstructor
+@Slf4j
+public class RetryableSenderService {
+
+ private final MailSenderService mailSenderService;
+ private final RetryTemplate mailRetryTemplate;
+
+ public void sendToMail(Message message) {
+ mailRetryTemplate.execute(context -> mailSenderService.send(message));
+ }
+}
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/service/TemplateService.java b/src/main/java/com/rbkmoney/cm/dudoser/service/TemplateService.java
new file mode 100644
index 0000000..2ed1317
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/service/TemplateService.java
@@ -0,0 +1,48 @@
+package com.rbkmoney.cm.dudoser.service;
+
+import com.rbkmoney.cm.dudoser.domain.ClaimData;
+import com.rbkmoney.cm.dudoser.exception.NotFoundException;
+import lombok.RequiredArgsConstructor;
+import org.apache.velocity.Template;
+import org.apache.velocity.VelocityContext;
+import org.apache.velocity.app.VelocityEngine;
+import org.springframework.stereotype.Component;
+
+import java.io.StringWriter;
+
+@Component
+@RequiredArgsConstructor
+public class TemplateService {
+
+ private static final String TITLE = "claimManagementData";
+ private static final String STATUS_CHANGED_TEMPLATE = "vm/StatusChangedEntity.vm";
+ private static final String COMMENT_CHANGED_TEMPLATE = "vm/CommentChangedEntity.vm";
+
+ private final VelocityEngine claimManagementTemplateEngine;
+
+ public String process(ClaimData data) {
+ VelocityContext headerContext = new VelocityContext();
+ headerContext.put(TITLE, data);
+
+ Template template = buildTemplate(data);
+
+ return build(template, headerContext);
+ }
+
+ private Template buildTemplate(ClaimData data) {
+ switch (data.getTemplateType()) {
+ case STATUSCHANGE:
+ return claimManagementTemplateEngine.getTemplate(STATUS_CHANGED_TEMPLATE);
+ case COMMENT:
+ return claimManagementTemplateEngine.getTemplate(COMMENT_CHANGED_TEMPLATE);
+ default:
+ throw new NotFoundException("templateType not found");
+ }
+ }
+
+ private String build(Template template, VelocityContext context) {
+ StringWriter writer = new StringWriter();
+ template.merge(context, writer);
+ return writer.toString();
+ }
+}
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/service/impl/AbstractMessageBuilderService.java b/src/main/java/com/rbkmoney/cm/dudoser/service/impl/AbstractMessageBuilderService.java
new file mode 100644
index 0000000..2b7a5cd
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/service/impl/AbstractMessageBuilderService.java
@@ -0,0 +1,34 @@
+package com.rbkmoney.cm.dudoser.service.impl;
+
+import com.rbkmoney.cm.dudoser.domain.Message;
+import com.rbkmoney.cm.dudoser.service.ClaimService;
+import com.rbkmoney.cm.dudoser.service.MessageBuilderService;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor
+public abstract class AbstractMessageBuilderService implements MessageBuilderService {
+
+ private final ClaimService claimService;
+ private final String emailFrom;
+ private final String subject;
+
+ @Override
+ public Message build(T change, String partyId, long claimId) {
+ String emailTo = claimService.getEmailByClaim(partyId, claimId);
+
+ return build(emailFrom, emailTo, getContent(change, claimId), subject, partyId, claimId);
+ }
+
+ protected abstract String getContent(T change, long claimId);
+
+ private Message build(String emailFrom, String emailTo, String content, String subject, String partyId, long claimId) {
+ return Message.builder()
+ .from(emailFrom)
+ .to(emailTo)
+ .subject(subject)
+ .content(content)
+ .partyId(partyId)
+ .claimId(claimId)
+ .build();
+ }
+}
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/service/impl/CommentChangeMessageBuilderServiceImpl.java b/src/main/java/com/rbkmoney/cm/dudoser/service/impl/CommentChangeMessageBuilderServiceImpl.java
new file mode 100644
index 0000000..1bfcb38
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/service/impl/CommentChangeMessageBuilderServiceImpl.java
@@ -0,0 +1,44 @@
+package com.rbkmoney.cm.dudoser.service.impl;
+
+import com.rbkmoney.cm.dudoser.domain.ClaimData;
+import com.rbkmoney.cm.dudoser.domain.TemplateType;
+import com.rbkmoney.cm.dudoser.service.ClaimService;
+import com.rbkmoney.cm.dudoser.service.ConversationService;
+import com.rbkmoney.cm.dudoser.service.TemplateService;
+import com.rbkmoney.damsel.claim_management.CommentModificationUnit;
+import com.rbkmoney.damsel.messages.Conversation;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+@Service
+@Qualifier("commentChangeMessageBuilder")
+public class CommentChangeMessageBuilderServiceImpl extends AbstractMessageBuilderService {
+
+ private final ConversationService conversationService;
+ private final TemplateService templateService;
+
+ public CommentChangeMessageBuilderServiceImpl(ClaimService claimService,
+ @Value("${mail.from}") String emailFrom,
+ @Value("${mail.subject.comment}") String subject,
+ ConversationService conversationService,
+ TemplateService templateService) {
+ super(claimService, emailFrom, subject);
+ this.conversationService = conversationService;
+ this.templateService = templateService;
+ }
+
+ protected String getContent(CommentModificationUnit commentModification, long claimId) {
+ Conversation conversation = conversationService.getConversation(commentModification.getId());
+
+ com.rbkmoney.damsel.messages.Message message = conversation.getMessages().get(conversation.getMessages().size() - 1);
+
+ ClaimData claimData = ClaimData.builder()
+ .templateType(TemplateType.COMMENT)
+ .id(String.valueOf(claimId))
+ .comment(message.getText())
+ .build();
+
+ return templateService.process(claimData);
+ }
+}
diff --git a/src/main/java/com/rbkmoney/cm/dudoser/service/impl/StatusChangeMessageBuilderServiceImpl.java b/src/main/java/com/rbkmoney/cm/dudoser/service/impl/StatusChangeMessageBuilderServiceImpl.java
new file mode 100644
index 0000000..c4d15a4
--- /dev/null
+++ b/src/main/java/com/rbkmoney/cm/dudoser/service/impl/StatusChangeMessageBuilderServiceImpl.java
@@ -0,0 +1,42 @@
+package com.rbkmoney.cm.dudoser.service.impl;
+
+import com.rbkmoney.cm.dudoser.domain.ClaimData;
+import com.rbkmoney.cm.dudoser.domain.TemplateType;
+import com.rbkmoney.cm.dudoser.service.ClaimService;
+import com.rbkmoney.cm.dudoser.service.TemplateService;
+import com.rbkmoney.damsel.claim_management.ClaimStatus;
+import com.rbkmoney.damsel.claim_management.ClaimStatusChanged;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import static com.rbkmoney.cm.dudoser.domain.ClaimStatus.valueOf;
+
+@Service
+@Qualifier("statusMessageBuilder")
+public class StatusChangeMessageBuilderServiceImpl extends AbstractMessageBuilderService {
+
+ private final TemplateService templateService;
+
+ public StatusChangeMessageBuilderServiceImpl(ClaimService claimService,
+ @Value("${mail.from}") String emailFrom,
+ @Value("${mail.subject.comment}") String subject,
+ TemplateService templateService) {
+ super(claimService, emailFrom, subject);
+ this.templateService = templateService;
+ }
+
+ protected String getContent(ClaimStatusChanged claimStatusChanged, long claimId) {
+ ClaimData claimData = ClaimData.builder()
+ .templateType(TemplateType.STATUSCHANGE)
+ .id(String.valueOf(claimId))
+ .status(convertStatus(claimStatusChanged.getStatus()))
+ .build();
+
+ return templateService.process(claimData);
+ }
+
+ private String convertStatus(ClaimStatus tClaimStatus) {
+ return valueOf(tClaimStatus.getSetField().getFieldName().toUpperCase()).getCyrillicValue();
+ }
+}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
new file mode 100644
index 0000000..cad8f93
--- /dev/null
+++ b/src/main/resources/application.yml
@@ -0,0 +1,80 @@
+server:
+ port: @server.port@
+management:
+ security:
+ flag: false
+ metrics:
+ export:
+ statsd:
+ flavor: ETSY
+ enabled: false
+ endpoint:
+ health:
+ show-details: ALWAYS
+spring:
+ application:
+ name: @project.name@
+ output:
+ ansi:
+ enabled: ALWAYS
+info:
+ version: @project.version@
+ stage: dev
+kafka:
+ bootstrap:
+ servers: "localhost:9092"
+ topics:
+ claim-event-sink:
+ id: "claim-event-sink"
+ enabled: false
+ ssl:
+ enabled: false
+ trust-store-location: "test"
+ trust-store-password: "test"
+ key-store-location: "test"
+ key-store-password: "test"
+ key-password: "test"
+ key-store-type: PKCS12
+ trust-store-type: PKCS12
+ consumer:
+ concurrency: 5
+ client-id: claim-management
+ group-id: claim-management-group-1
+ enable-auto-commit: false
+ auto-offset-reset: latest
+ max-poll-records: 20
+ connections-max-idle-ms: 300000
+ session-timeout-ms: 300000
+ error-handler:
+ sleep-time-seconds: 5
+ maxAttempts: -1
+mail:
+ host: mr1.linode.rbkmoney.net
+ port: 25
+ username: ""
+ password: ""
+ from: no-reply@rbkmoney.com
+ protocol: smtp
+ retry:
+ max:
+ attempts: 3
+ backoff:
+ period: 1000
+ subject:
+ status: "Изменение статуса вашей заявки на подключение к RBK.money"
+ comment: "Добавлен новый комментарий по вашей заявки на подключение к RBK.money"
+ smtp:
+ auth: false
+ timeout: 30000
+ starttls:
+ enable: true
+claimmanagement:
+ client:
+ adapter:
+ url: http://localhost:8022/v1/claimmanagement
+ networkTimeout: 30000
+conversations:
+ client:
+ adapter:
+ url: http://localhost:8022/v1/conversations
+ networkTimeout: 30000
diff --git a/src/main/resources/vm/CommentChangedEntity.vm b/src/main/resources/vm/CommentChangedEntity.vm
new file mode 100644
index 0000000..1cbedc8
--- /dev/null
+++ b/src/main/resources/vm/CommentChangedEntity.vm
@@ -0,0 +1,3 @@
+## @vtlvariable name="claimManagementData" type="com.rbkmoney.cm.dudoser.domain.ClaimData"
+Добрый день. У нас для вас новости. По вашей заявке "$!claimManagementData.id" поступил ответ "$!claimManagementData.comment"
+Всегда на связи, ваш RBK.money!
diff --git a/src/main/resources/vm/StatusChangedEntity.vm b/src/main/resources/vm/StatusChangedEntity.vm
new file mode 100644
index 0000000..f4dd9a8
--- /dev/null
+++ b/src/main/resources/vm/StatusChangedEntity.vm
@@ -0,0 +1,3 @@
+## @vtlvariable name="claimManagementData" type="com.rbkmoney.cm.dudoser.domain.ClaimData"
+Добрый день. У нас для вас новости. Статус вашей заявки "$!claimManagementData.id" изменен на '$!claimManagementData.status".
+Хорошего дня! Всегда на связи, ваш RBK.money!
diff --git a/src/test/java/com/rbkmoney/cm/dudoser/config/AbstractKafkaConfig.java b/src/test/java/com/rbkmoney/cm/dudoser/config/AbstractKafkaConfig.java
new file mode 100644
index 0000000..96934e0
--- /dev/null
+++ b/src/test/java/com/rbkmoney/cm/dudoser/config/AbstractKafkaConfig.java
@@ -0,0 +1,76 @@
+package com.rbkmoney.cm.dudoser.config;
+
+import com.rbkmoney.cm.dudoser.CMDudoserApplication;
+import com.rbkmoney.easyway.*;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.ClassRule;
+import org.junit.runner.Description;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.ConfigFileApplicationContextInitializer;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.util.TestPropertyValues;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.testcontainers.containers.FailureDetectingExternalResource;
+
+import java.util.function.Consumer;
+
+import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = CMDudoserApplication.class, webEnvironment = RANDOM_PORT)
+@ContextConfiguration(initializers = AbstractKafkaConfig.Initializer.class)
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
+@Slf4j
+public abstract class AbstractKafkaConfig extends AbstractTestUtils {
+
+ private static TestContainers testContainers = TestContainersBuilder.builderWithTestContainers(TestContainersParameters::new)
+ .addKafkaTestContainer()
+ .build();
+
+ @ClassRule
+ public static final FailureDetectingExternalResource resource = new FailureDetectingExternalResource() {
+
+ @Override
+ protected void starting(Description description) {
+ testContainers.startTestContainers();
+ }
+
+ @Override
+ protected void failed(Throwable e, Description description) {
+ log.warn("Test Container running was failed ", e);
+ }
+
+ @Override
+ protected void finished(Description description) {
+ testContainers.stopTestContainers();
+ }
+ };
+
+ public static class Initializer extends ConfigFileApplicationContextInitializer {
+
+ @Override
+ public void initialize(ConfigurableApplicationContext configurableApplicationContext) {
+ super.initialize(configurableApplicationContext);
+ TestPropertyValues.of(
+ testContainers.getEnvironmentProperties(
+ getEnvironmentPropertiesConsumer()
+ )
+ )
+ .applyTo(configurableApplicationContext);
+ }
+ }
+
+ private static Consumer getEnvironmentPropertiesConsumer() {
+ return environmentProperties -> {
+ environmentProperties.put("kafka.topics.claim-event-sink.enabled", "true");
+ environmentProperties.put("kafka.error-handler.sleep-time-seconds", "1");
+ testContainers.getKafkaTestContainer().ifPresent(
+ c -> environmentProperties.put("kafka.bootstrap.servers", c.getBootstrapServers())
+ );
+
+ };
+ }
+}
diff --git a/src/test/java/com/rbkmoney/cm/dudoser/service/ConsumerTests.java b/src/test/java/com/rbkmoney/cm/dudoser/service/ConsumerTests.java
new file mode 100644
index 0000000..0e9840c
--- /dev/null
+++ b/src/test/java/com/rbkmoney/cm/dudoser/service/ConsumerTests.java
@@ -0,0 +1,147 @@
+package com.rbkmoney.cm.dudoser.service;
+
+import com.rbkmoney.cm.dudoser.config.AbstractKafkaConfig;
+import com.rbkmoney.cm.dudoser.domain.Message;
+import com.rbkmoney.cm.dudoser.exception.MailSendException;
+import com.rbkmoney.damsel.claim_management.*;
+import com.rbkmoney.kafka.common.serialization.ThriftSerializer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static io.github.benas.randombeans.api.EnhancedRandom.random;
+import static org.mockito.Mockito.*;
+
+@Slf4j
+public class ConsumerTests extends AbstractKafkaConfig {
+
+ @Value("${kafka.topics.claim-event-sink.id}")
+ public String topic;
+
+ @Value("${kafka.bootstrap.servers}")
+ private String bootstrapServers;
+
+ @MockBean
+ private RetryableSenderService retryableSenderService;
+
+ @MockBean
+ private MessageBuilderService statusMessageBuilder;
+
+ @Test
+ public void correctFlowTest() {
+ ClaimStatusChanged claimStatusChanged = getClaimStatusChanged();
+
+ Event event = new Event();
+ event.setUserInfo(getUserInfo());
+ event.setOccuredAt(LocalDateTime.now().toString());
+ event.setChange(Change.status_changed(claimStatusChanged));
+
+ when(statusMessageBuilder.build(eq(claimStatusChanged), anyString(), anyLong())).thenReturn(Message.builder().build());
+ doNothing().when(retryableSenderService).sendToMail(any());
+
+ try {
+ DefaultKafkaProducerFactory producerFactory = createProducerFactory();
+
+ KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory);
+
+ TimeUnit.SECONDS.sleep(1);
+
+ kafkaTemplate.send(
+ topic,
+ random(String.class),
+ event
+ )
+ .get();
+
+ TimeUnit.SECONDS.sleep(1);
+ } catch (ExecutionException | InterruptedException ex) {
+ ex.printStackTrace();
+ }
+
+ verify(statusMessageBuilder, times(1)).build(eq(claimStatusChanged), anyString(), anyLong());
+ // проверяем что ивент из кафки был корректно отправлен сообщением на почтовый сервер
+ verify(retryableSenderService, times(1)).sendToMail(any());
+ }
+
+ @Test
+ public void errorRetryTest() {
+ int countRetries = 5;
+ AtomicInteger atomicInt = new AtomicInteger(0);
+ ClaimStatusChanged claimStatusChanged = getClaimStatusChanged();
+
+ Event event = new Event();
+ event.setUserInfo(getUserInfo());
+ event.setOccuredAt(LocalDateTime.now().toString());
+ event.setChange(Change.status_changed(claimStatusChanged));
+
+ when(statusMessageBuilder.build(eq(claimStatusChanged), anyString(), anyLong())).thenReturn(Message.builder().build());
+ // на countRetries попытке мок корректно обработает вызов, в предыдущих попытках будет вынуждать кафку ретраить обработку ивента
+ doAnswer(
+ invocation -> {
+ int increment = atomicInt.getAndIncrement();
+ if (increment < countRetries - 1) {
+ throw new MailSendException();
+ }
+ return true;
+ }
+ ).when(retryableSenderService).sendToMail(any());
+
+ try {
+ DefaultKafkaProducerFactory producerFactory = createProducerFactory();
+
+ KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory);
+
+ TimeUnit.SECONDS.sleep(1);
+
+ kafkaTemplate.send(
+ topic,
+ random(String.class),
+ event
+ )
+ .get();
+
+ TimeUnit.SECONDS.sleep(6);
+ } catch (InterruptedException | ExecutionException e) {
+ e.printStackTrace();
+ }
+
+ verify(statusMessageBuilder, times(countRetries)).build(eq(claimStatusChanged), anyString(), anyLong());
+ // проверяем, что кафка пыталась обработать ивент такое количество раз, которое задано моку
+ verify(retryableSenderService, times(countRetries)).sendToMail(any());
+ }
+
+ private UserInfo getUserInfo() {
+ UserInfo userInfo = new UserInfo();
+ userInfo.setType(UserType.internal_user(new InternalUser()));
+ userInfo.setId(UUID.randomUUID().toString());
+ userInfo.setUsername("asd");
+ userInfo.setEmail("asd@sad.com");
+ return userInfo;
+ }
+
+ private ClaimStatusChanged getClaimStatusChanged() {
+ return new ClaimStatusChanged(UUID.randomUUID().toString(), 1, ClaimStatus.pending(new ClaimPending()), 1, LocalDateTime.now().toString());
+ }
+
+ private DefaultKafkaProducerFactory createProducerFactory() {
+ Map props = new HashMap<>();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, "client_id");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, new ThriftSerializer().getClass().getName());
+ return new DefaultKafkaProducerFactory<>(props);
+ }
+}
diff --git a/src/test/java/com/rbkmoney/cm/dudoser/service/ListenerTest.java b/src/test/java/com/rbkmoney/cm/dudoser/service/ListenerTest.java
new file mode 100644
index 0000000..84c5359
--- /dev/null
+++ b/src/test/java/com/rbkmoney/cm/dudoser/service/ListenerTest.java
@@ -0,0 +1,200 @@
+package com.rbkmoney.cm.dudoser.service;
+
+import com.rbkmoney.cm.dudoser.CMDudoserApplication;
+import com.rbkmoney.cm.dudoser.handler.ClaimHandler;
+import com.rbkmoney.cm.dudoser.listener.ClaimEventSinkListener;
+import com.rbkmoney.damsel.claim_management.*;
+import com.rbkmoney.damsel.messages.Conversation;
+import com.rbkmoney.damsel.messages.Message;
+import org.apache.thrift.TException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.List;
+
+import static io.github.benas.randombeans.api.EnhancedRandom.random;
+import static io.github.benas.randombeans.api.EnhancedRandom.randomListOf;
+import static org.mockito.Mockito.*;
+import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = CMDudoserApplication.class, webEnvironment = RANDOM_PORT)
+public class ListenerTest {
+
+ @MockBean
+ private RetryableSenderService retryableSenderService;
+
+ @MockBean
+ private ClaimService claimService;
+
+ @MockBean
+ private ConversationService conversationService;
+
+ @Autowired
+ private ClaimHandler claimHandler;
+
+ private String email = "no-reply@rbk.com";
+ private ClaimEventSinkListener listener;
+
+ @Before
+ public void setUp() {
+ doNothing().when(retryableSenderService).sendToMail(any());
+ when(conversationService.getConversation(anyString())).thenReturn(getConversation());
+ when(claimService.getEmailByClaim(anyString(), anyLong())).thenReturn(email);
+ listener = new ClaimEventSinkListener(claimHandler);
+ }
+
+ @Test
+ public void testExternalUser() throws Exception {
+ //если тип пользователя External, то сервис пропускает любой тип получаемого change
+ sendMail(listener, getEvent(getExternalUser(), getClaimCreated()), 0);
+ sendMail(listener, getEvent(getExternalUser(), getClaimStatus(getClaimPending())), 0);
+ sendMail(listener, getEvent(getExternalUser(), getClaimUpdated(getClaimModification(getCommentModification()))), 0);
+ }
+
+ @Test
+ public void testInternalUser() throws Exception {
+ //change типа created пропускается
+ sendMail(listener, getEvent(getInternalUser(), getClaimCreated()), 0);
+
+ sendMail(listener, getEvent(getInternalUser(), getClaimStatus(getClaimAccepted())), 1);
+ sendMail(listener, getEvent(getInternalUser(), getClaimStatus(getClaimDenied())), 1);
+ sendMail(listener, getEvent(getInternalUser(), getClaimStatus(getClaimPending())), 1);
+
+ //modification типа НЕ comment пропускается
+ sendMail(listener, getEvent(getInternalUser(), getClaimUpdated(getClaimModification(getNotCommentModification()))), 0);
+
+ sendMail(listener, getEvent(getInternalUser(), getClaimUpdated(getClaimModification(getCommentModification()))), 1);
+
+ Change claimUpdated = getClaimUpdated(
+ getClaimModification(getCommentModification()),
+ getClaimModification(getNotCommentModification()),
+ getClaimModification(getCommentModification())
+ );
+
+ sendMail(listener, getEvent(getInternalUser(), claimUpdated), 2);
+ }
+
+ private void sendMail(ClaimEventSinkListener listener, Event event, int timesSending) throws TException {
+ reset(retryableSenderService);
+ listener.handle(event, () -> {
+ });
+ verify(retryableSenderService, times(timesSending)).sendToMail(any());
+ }
+
+ private Event getEvent(UserType userType, Change change) {
+ Event event = random(Event.class, "change", "user_info");
+ event.setUserInfo(getUserInfo(userType));
+ event.setChange(change);
+ return event;
+ }
+
+ private Change getClaimCreated() {
+ ClaimCreated changed = random(ClaimCreated.class, "changeset");
+ changed.setChangeset(List.of(getClaimModification(getCommentModification())));
+ return Change.created(changed);
+ }
+
+ private Change getClaimUpdated(Modification... modifications) {
+ ClaimUpdated changed = random(ClaimUpdated.class, "changeset");
+ changed.setChangeset(List.of(modifications));
+ return Change.updated(changed);
+ }
+
+ private Change getClaimStatus(ClaimStatus claimStatus) {
+ ClaimStatusChanged changed = random(ClaimStatusChanged.class, "status");
+ changed.setStatus(claimStatus);
+ return Change.status_changed(changed);
+ }
+
+ private Claim getClaim(String email) {
+ ModificationUnit modificationUnit = getModificationUnit(getExternalUser(), getCommentModification());
+ modificationUnit.getUserInfo().setEmail(email);
+ List changeset = List.of(
+ getModificationUnit(getInternalUser(), getCommentModification()),
+ modificationUnit,
+ getModificationUnit(getInternalUser(), getCommentModification()),
+ getModificationUnit(getInternalUser(), getStatusModification(getClaimPending())),
+ getModificationUnit(getExternalUser(), getNotCommentModification())
+ );
+ return getClaim(getClaimPending(), changeset);
+ }
+
+ private Conversation getConversation() {
+ List messages = randomListOf(5, Message.class);
+
+ Conversation conversation = random(Conversation.class, "messages");
+ conversation.setMessages(messages);
+ return conversation;
+ }
+
+ private Claim getClaim(ClaimStatus status, List changeset) {
+ Claim claim = random(Claim.class, "status", "changeset", "metadata");
+ claim.setStatus(status);
+ claim.setChangeset(changeset);
+ return claim;
+ }
+
+ private ClaimModification getCommentModification() {
+ CommentModificationUnit modificationUnit = new CommentModificationUnit();
+ modificationUnit.setId("asd");
+ modificationUnit.setModification(CommentModification.creation(new CommentCreated()));
+ return ClaimModification.comment_modification(modificationUnit);
+ }
+
+ private ClaimModification getStatusModification(ClaimStatus claimStatus) {
+ StatusModificationUnit modificationUnit = new StatusModificationUnit();
+ modificationUnit.setStatus(claimStatus);
+ modificationUnit.setModification(StatusModification.change(new StatusChanged()));
+ return ClaimModification.status_modification(modificationUnit);
+ }
+
+ private ClaimModification getNotCommentModification() {
+ FileModificationUnit modificationUnit = new FileModificationUnit();
+ modificationUnit.setId("asd");
+ modificationUnit.setModification(FileModification.creation(new FileCreated()));
+ return ClaimModification.file_modification(modificationUnit);
+ }
+
+ private ModificationUnit getModificationUnit(UserType userType, ClaimModification claimModification) {
+ ModificationUnit modification = random(ModificationUnit.class, "modification", "user_info");
+ modification.setUserInfo(getUserInfo(userType));
+ modification.setModification(getClaimModification(claimModification));
+ return modification;
+ }
+
+ private UserInfo getUserInfo(UserType userType) {
+ UserInfo userInfo = random(UserInfo.class, "type");
+ userInfo.setType(userType);
+ return userInfo;
+ }
+
+ private Modification getClaimModification(ClaimModification modification) {
+ return Modification.claim_modification(modification);
+ }
+
+ private ClaimStatus getClaimPending() {
+ return ClaimStatus.pending(new ClaimPending());
+ }
+
+ private ClaimStatus getClaimAccepted() {
+ return ClaimStatus.accepted(new ClaimAccepted());
+ }
+
+ private ClaimStatus getClaimDenied() {
+ return ClaimStatus.denied(new ClaimDenied());
+ }
+
+ private UserType getExternalUser() {
+ return UserType.external_user(new ExternalUser());
+ }
+
+ private UserType getInternalUser() {
+ return UserType.internal_user(new InternalUser());
+ }
+}
diff --git a/src/test/java/com/rbkmoney/cm/dudoser/service/MailSenderTest.java b/src/test/java/com/rbkmoney/cm/dudoser/service/MailSenderTest.java
new file mode 100644
index 0000000..0ee00c6
--- /dev/null
+++ b/src/test/java/com/rbkmoney/cm/dudoser/service/MailSenderTest.java
@@ -0,0 +1,141 @@
+package com.rbkmoney.cm.dudoser.service;
+
+import com.icegreen.greenmail.util.GreenMail;
+import com.icegreen.greenmail.util.ServerSetup;
+import com.rbkmoney.cm.dudoser.CMDudoserApplication;
+import com.rbkmoney.cm.dudoser.domain.Message;
+import com.rbkmoney.cm.dudoser.exception.MailSendException;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExternalResource;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.mail.javamail.JavaMailSenderImpl;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import javax.mail.BodyPart;
+import javax.mail.internet.MimeMessage;
+import javax.mail.internet.MimeMultipart;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = CMDudoserApplication.class, webEnvironment = RANDOM_PORT)
+@TestPropertySource(properties = {
+ "mail.host=localhost",
+ "mail.port=2525",
+ "mail.username=username",
+ "mail.password=secret",
+})
+public class MailSenderTest {
+
+ // отправляем сообщение на фейковый почтовой сервер
+ @Rule
+ public SmtpServerRule smtpServerRule = new SmtpServerRule(2525);
+
+ @Autowired
+ private MailSenderService mailSenderService;
+
+ @Test
+ public void sendingMessageFlowTest() throws Exception {
+ Message message = createMessage();
+
+ mailSenderService.send(message);
+
+ MimeMessage[] receivedMessages = smtpServerRule.getMessages();
+
+ assertEquals(1, receivedMessages.length);
+
+ MimeMessage receivedMessage = receivedMessages[0];
+
+ assertEquals(message.getSubject(), receivedMessage.getSubject());
+ assertEquals(message.getTo(), receivedMessage.getAllRecipients()[0].toString());
+ assertTrue(getTextFromMimeMultipart((MimeMultipart) receivedMessage.getContent()).contains(message.getContent()));
+ }
+
+ @Test(expected = MailSendException.class)
+ public void incorrectAddressTest() {
+ Message message = createMessage();
+ message.setTo("asd, asd");
+
+ mailSenderService.send(message);
+ }
+
+ @Test(expected = MailSendException.class)
+ public void connectionRefusedTest() {
+ JavaMailSenderImpl sender = getJavaMailSender(2524);
+
+ Message message = createMessage();
+
+ MailSenderService senderService = new MailSenderService(sender);
+ senderService.send(message);
+ }
+
+ private String getTextFromMimeMultipart(MimeMultipart mimeMultipart) throws Exception {
+ String result = "";
+ int partCount = mimeMultipart.getCount();
+ for (int i = 0; i < partCount; i++) {
+ BodyPart bodyPart = mimeMultipart.getBodyPart(i);
+ if (bodyPart.isMimeType("text/plain")) {
+ result = result + "\n" + bodyPart.getContent();
+ break; // without break same text appears twice in my tests
+ } else if (bodyPart.isMimeType("text/html")) {
+ String html = (String) bodyPart.getContent();
+ // result = result + "\n" + org.jsoup.Jsoup.parse(html).text();
+ result = html;
+ } else if (bodyPart.getContent() instanceof MimeMultipart) {
+ result = result + getTextFromMimeMultipart((MimeMultipart) bodyPart.getContent());
+ }
+ }
+ return result;
+ }
+
+ private Message createMessage() {
+ return Message.builder()
+ .from("no-reply@rbk.com")
+ .to("info@rbk.com")
+ .subject("Spring Mail Integration Testing with JUnit and GreenMail Example")
+ .content("We show how to write Integration Tests using Spring and GreenMail.")
+ .build();
+ }
+
+ private JavaMailSenderImpl getJavaMailSender(int port) {
+ JavaMailSenderImpl sender = new JavaMailSenderImpl();
+ sender.setHost("localhost");
+ sender.setPort(port);
+ sender.setUsername("username");
+ sender.setPassword("secret");
+ return sender;
+ }
+
+ public static class SmtpServerRule extends ExternalResource {
+
+ private GreenMail smtpServer;
+ private int port;
+
+ public SmtpServerRule(int port) {
+ this.port = port;
+ }
+
+ @Override
+ protected void before() throws Throwable {
+ super.before();
+ smtpServer = new GreenMail(new ServerSetup(port, null, "smtp"));
+ smtpServer.start();
+ }
+
+ public MimeMessage[] getMessages() {
+ return smtpServer.getReceivedMessages();
+ }
+
+ @Override
+ protected void after() {
+ super.after();
+ smtpServer.stop();
+ }
+ }
+}
diff --git a/src/test/java/com/rbkmoney/cm/dudoser/service/RetryableSenderTest.java b/src/test/java/com/rbkmoney/cm/dudoser/service/RetryableSenderTest.java
new file mode 100644
index 0000000..b6643cc
--- /dev/null
+++ b/src/test/java/com/rbkmoney/cm/dudoser/service/RetryableSenderTest.java
@@ -0,0 +1,52 @@
+package com.rbkmoney.cm.dudoser.service;
+
+import com.rbkmoney.cm.dudoser.CMDudoserApplication;
+import com.rbkmoney.cm.dudoser.domain.Message;
+import com.rbkmoney.cm.dudoser.exception.MailSendException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.Mockito.*;
+import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = CMDudoserApplication.class, webEnvironment = RANDOM_PORT)
+public class RetryableSenderTest {
+
+ @MockBean
+ private MailSenderService mailSenderService;
+
+ @Autowired
+ private RetryableSenderService retryableSenderService;
+
+ @Value("${mail.retry.max.attempts:3}")
+ private int maxAttempts;
+
+ @Test
+ public void test() {
+ AtomicInteger atomicInt = new AtomicInteger(0);
+ // на countRetries попытке мок корректно обработает вызов, в предыдущих попытках будет вынуждать сервис ретраить обработку ивента
+ when(mailSenderService.send(any())).thenAnswer(
+ invocation -> {
+ int increment = atomicInt.getAndIncrement();
+ if (increment < maxAttempts - 1) {
+ throw new MailSendException();
+ }
+ return true;
+ }
+ );
+
+ try {
+ retryableSenderService.sendToMail(Message.builder().build());
+ } finally {
+ verify(mailSenderService, times(maxAttempts)).send(any());
+ }
+ }
+}
diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..be96068
--- /dev/null
+++ b/src/test/resources/logback-test.xml
@@ -0,0 +1,10 @@
+
+
+
+
+
+
+
+
+
+