From c5208c5a235b693ff7324619cd45b6b815cf7ecf Mon Sep 17 00:00:00 2001 From: Pospolita Nikita Date: Thu, 13 Jun 2019 19:22:14 +0300 Subject: [PATCH] =?UTF-8?q?Ka=D1=84ka=20consumer=20(#73)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * commit for CI * updated libs, refactored for lombok * CI fix * added build_utils * added port * added sonar coverage, fixed sonar bugs, refactored error-extraction * fixed error conversion logic * Refactored for Kafka * tests fixes * added Kafka tests * fix review * fix review * fix review * fix review * fix review * fix review * concurency = 5 * Added check duplicates * Added constraint columns * Fixed pom * Kafka application properties config (#74) * fix review * test repaired * test repaired * kafka-common-lib added * review fix * Added update of old records * Updated kafka stuff * Fixed condition for updateIfExists --- build_utils | 2 +- pom.xml | 40 +++++- .../configuration/EventStockPollerConfig.java | 12 +- .../hooker/configuration/KafkaConfig.java | 131 ++++++++++++++++++ .../hooker/configuration/RetryConfig.java | 29 ++++ .../properties/KafkaSslProperties.java | 23 +++ .../hooker/converter/SourceEventParser.java | 28 ++++ .../hooker/dao/InvoicingMessageDao.java | 1 + .../hooker/dao/impl/CustomerDaoImpl.java | 10 +- .../dao/impl/InvoicingMessageDaoImpl.java | 41 +++++- .../hooker/endpoint/HookerServlet.java | 1 - .../hooker/exception/ParseException.java | 23 +++ .../com/rbkmoney/hooker/handler/Handler.java | 4 +- .../handler/poller/EventStockHandler.java | 5 +- .../AbstractCustomerEventHandler.java | 12 +- .../CustomerBindingFailedHandler.java | 4 +- .../CustomerBindingStartedHandler.java | 4 +- .../CustomerBindingSucceededHandler.java | 4 +- .../impl/customer/CustomerCreatedHandler.java | 9 +- .../impl/customer/CustomerDeletedHandler.java | 6 +- .../impl/customer/CustomerReadyHandler.java | 4 +- .../NeedReadCustomerEventHandler.java | 19 +-- .../AbstractInvoiceEventHandler.java | 11 +- .../impl/invoicing/InvoiceCreatedHandler.java | 13 +- .../InvoicePaymentRefundStartedHandler.java | 4 +- ...oicePaymentRefundStatusChangedHandler.java | 4 +- .../InvoicePaymentStartedHandler.java | 6 +- .../InvoicePaymentStatusChangedHandler.java | 4 +- .../InvoiceStatusChangedHandler.java | 4 +- .../NeedReadInvoiceEventHandler.java | 23 +-- .../listener/KafkaMachineEventListener.java | 24 ++++ .../hooker/listener/MachineEventHandler.java | 10 ++ .../listener/MachineEventHandlerImpl.java | 59 ++++++++ .../hooker/model/CustomerMessage.java | 4 +- .../hooker/model/InvoicingMessage.java | 6 +- .../hooker/serde/SinkEventDeserializer.java | 15 ++ .../hooker/service/HandlerManager.java | 19 +++ .../rbkmoney/hooker/service/SleepService.java | 20 +++ src/main/resources/application.yaml | 27 +++- src/main/resources/cert/truststore.p12 | Bin 0 -> 1114 bytes .../migration/V12__add_constraint_columns.sql | 5 + .../hooker/AbstractIntegrationTest.java | 20 ++- .../rbkmoney/hooker/CustomerDataflowTest.java | 6 +- .../dao/CustomerMessageDaoImplTest.java | 1 - .../dao/InvoicingMessageDaoImplTest.java | 13 +- .../KafkaMachineEventListenerKafkaTest.java | 98 +++++++++++++ .../listener/MachineEventHandlerImplTest.java | 87 ++++++++++++ .../com/rbkmoney/hooker/utils/BuildUtils.java | 2 +- 48 files changed, 800 insertions(+), 97 deletions(-) create mode 100644 src/main/java/com/rbkmoney/hooker/configuration/KafkaConfig.java create mode 100644 src/main/java/com/rbkmoney/hooker/configuration/RetryConfig.java create mode 100644 src/main/java/com/rbkmoney/hooker/configuration/properties/KafkaSslProperties.java create mode 100644 src/main/java/com/rbkmoney/hooker/converter/SourceEventParser.java create mode 100644 src/main/java/com/rbkmoney/hooker/exception/ParseException.java create mode 100644 src/main/java/com/rbkmoney/hooker/listener/KafkaMachineEventListener.java create mode 100644 src/main/java/com/rbkmoney/hooker/listener/MachineEventHandler.java create mode 100644 src/main/java/com/rbkmoney/hooker/listener/MachineEventHandlerImpl.java create mode 100644 src/main/java/com/rbkmoney/hooker/serde/SinkEventDeserializer.java create mode 100644 src/main/java/com/rbkmoney/hooker/service/HandlerManager.java create mode 100644 src/main/java/com/rbkmoney/hooker/service/SleepService.java create mode 100644 src/main/resources/cert/truststore.p12 create mode 100644 src/main/resources/db/migration/V12__add_constraint_columns.sql create mode 100644 src/test/java/com/rbkmoney/hooker/kafka/KafkaMachineEventListenerKafkaTest.java create mode 100644 src/test/java/com/rbkmoney/hooker/listener/MachineEventHandlerImplTest.java diff --git a/build_utils b/build_utils index 870b70a..ea4aa04 160000 --- a/build_utils +++ b/build_utils @@ -1 +1 @@ -Subproject commit 870b70a63af18fc7a02c9ff26b06132d2b1993cb +Subproject commit ea4aa042f482551d624fd49a570d28488f479e93 diff --git a/pom.xml b/pom.xml index 150aec3..c71172b 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 hooker - 2.0.25-SNAPSHOT + 2.0.26-SNAPSHOT jar hooker @@ -36,6 +36,8 @@ postgres postgres hook + + 0.6.9 @@ -92,10 +94,25 @@ + + com.rbkmoney.woody + woody-thrift + 1.1.15 + + + com.rbkmoney.geck + common + ${geck.version} + + + com.rbkmoney.geck + filter + ${geck.version} + org.springframework.kafka spring-kafka - 2.2.4.RELEASE + 2.2.5.RELEASE org.springframework.boot @@ -135,6 +152,11 @@ eventstock-client 1.2.1 + + com.rbkmoney + kafka-common-lib + 0.0.5 + com.rbkmoney damsel @@ -213,6 +235,11 @@ 2.1.0 test + + com.rbkmoney + machinegun-proto + 1.12-ebae56f + @@ -245,6 +272,15 @@ org.apache.maven.plugins maven-dependency-plugin + + org.apache.maven.plugins + maven-resources-plugin + + + p12 + + + org.apache.maven.plugins maven-remote-resources-plugin diff --git a/src/main/java/com/rbkmoney/hooker/configuration/EventStockPollerConfig.java b/src/main/java/com/rbkmoney/hooker/configuration/EventStockPollerConfig.java index f36907e..ddf751c 100644 --- a/src/main/java/com/rbkmoney/hooker/configuration/EventStockPollerConfig.java +++ b/src/main/java/com/rbkmoney/hooker/configuration/EventStockPollerConfig.java @@ -49,12 +49,12 @@ public class EventStockPollerConfig { List eventPublishers = new ArrayList<>(); for (int i = 0; i < workersCount; ++i) { eventPublishers.add(new PollingEventPublisherBuilder() - .withURI(bmUri.getURI()) - .withEventHandler(eventStockHandlers.get(i)) - .withMaxPoolSize(maxPoolSize) - .withPollDelay(pollDelay) - .withMaxQuerySize(maxQuerySize) - .build()); + .withURI(bmUri.getURI()) + .withEventHandler(eventStockHandlers.get(i)) + .withMaxPoolSize(maxPoolSize) + .withPollDelay(pollDelay) + .withMaxQuerySize(maxQuerySize) + .build()); } return eventPublishers; } diff --git a/src/main/java/com/rbkmoney/hooker/configuration/KafkaConfig.java b/src/main/java/com/rbkmoney/hooker/configuration/KafkaConfig.java new file mode 100644 index 0000000..918f634 --- /dev/null +++ b/src/main/java/com/rbkmoney/hooker/configuration/KafkaConfig.java @@ -0,0 +1,131 @@ +package com.rbkmoney.hooker.configuration; + +import com.rbkmoney.kafka.common.retry.ConfigurableRetryPolicy; +import com.rbkmoney.machinegun.eventsink.MachineEvent; +import com.rbkmoney.hooker.configuration.properties.KafkaSslProperties; +import com.rbkmoney.hooker.serde.SinkEventDeserializer; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.ErrorHandler; +import org.springframework.retry.backoff.ExponentialBackOffPolicy; +import org.springframework.retry.support.RetryTemplate; + +import java.io.File; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +@Slf4j +@Configuration +@EnableConfigurationProperties(KafkaSslProperties.class) +@RequiredArgsConstructor +public class KafkaConfig { + + @Value("${kafka.consumer.auto-offset-reset}") + private String autoOffsetReset; + @Value("${kafka.consumer.enable-auto-commit}") + private boolean enableAutoCommit; + @Value("${kafka.consumer.group-id}") + private String groupId; + @Value("${kafka.client-id}") + private String clientId; + @Value("${kafka.consumer.max-poll-records}") + private int maxPollRecords; + + @Value("${kafka.bootstrap-servers}") + private String bootstrapServers; + @Value("${kafka.consumer.concurrency}") + private int concurrency; + + private final KafkaSslProperties kafkaSslProperties; + + @Value("${retry-policy.maxAttempts}") + int maxAttempts; + + @Bean + public Map consumerConfigs() { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SinkEventDeserializer.class); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); + + configureSsl(props); + + return props; + } + + private void configureSsl(Map props) { + if (kafkaSslProperties.isEnabled()) { + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name()); + props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, new File(kafkaSslProperties.getTrustStoreLocation()).getAbsolutePath()); + props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaSslProperties.getTrustStorePassword()); + props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaSslProperties.getKeyStoreType()); + props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaSslProperties.getTrustStoreType()); + props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, new File(kafkaSslProperties.getKeyStoreLocation()).getAbsolutePath()); + props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, kafkaSslProperties.getKeyStorePassword()); + props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaSslProperties.getKeyPassword()); + } + } + + @Bean + public ConsumerFactory consumerFactory() { + return new DefaultKafkaConsumerFactory<>(consumerConfigs()); + } + + @Bean + public KafkaListenerContainerFactory> kafkaListenerContainerFactory( + ConsumerFactory consumerFactory, + RetryTemplate kafkaRetryTemplate + ) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + factory.getContainerProperties().setAckOnError(false); + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); + factory.setErrorHandler(kafkaErrorHandler()); + factory.setConcurrency(concurrency); + factory.setRetryTemplate(kafkaRetryTemplate); + return factory; + } + + private ErrorHandler kafkaErrorHandler() { + return (thrownException, data) -> { + if (data != null) { + log.error("Error while processing: data-key: {}, data-offset: {}, data-partition: {}", + data.key(), data.offset(), data.partition(), thrownException); + } else { + log.error("Error while processing", thrownException); + } + }; + } + + @Bean + public RetryTemplate kafkaRetryTemplate() { + RetryTemplate retryTemplate = new RetryTemplate(); + retryTemplate.setRetryPolicy( + new ConfigurableRetryPolicy(maxAttempts, Collections.singletonMap(RuntimeException.class, true)) + ); + retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy()); + + return retryTemplate; + } +} diff --git a/src/main/java/com/rbkmoney/hooker/configuration/RetryConfig.java b/src/main/java/com/rbkmoney/hooker/configuration/RetryConfig.java new file mode 100644 index 0000000..0fbbebf --- /dev/null +++ b/src/main/java/com/rbkmoney/hooker/configuration/RetryConfig.java @@ -0,0 +1,29 @@ +package com.rbkmoney.hooker.configuration; + +import com.rbkmoney.kafka.common.retry.ConfigurableRetryPolicy; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.retry.backoff.ExponentialBackOffPolicy; +import org.springframework.retry.support.RetryTemplate; + +import java.util.Collections; + +@Configuration +public class RetryConfig { + + @Value("${retry-policy.maxAttempts}") + int maxAttempts; + + @Bean + RetryTemplate retryTemplate() { + RetryTemplate retryTemplate = new RetryTemplate(); + retryTemplate.setRetryPolicy( + new ConfigurableRetryPolicy(maxAttempts, Collections.singletonMap(RuntimeException.class, true)) + ); + retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy()); + + return retryTemplate; + } + +} diff --git a/src/main/java/com/rbkmoney/hooker/configuration/properties/KafkaSslProperties.java b/src/main/java/com/rbkmoney/hooker/configuration/properties/KafkaSslProperties.java new file mode 100644 index 0000000..edc2fdf --- /dev/null +++ b/src/main/java/com/rbkmoney/hooker/configuration/properties/KafkaSslProperties.java @@ -0,0 +1,23 @@ +package com.rbkmoney.hooker.configuration.properties; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +@Getter +@Setter +@Component +@ConfigurationProperties(prefix = "kafka.ssl") +public class KafkaSslProperties { + + private String trustStorePassword; + private String trustStoreLocation; + private String keyStorePassword; + private String keyPassword; + private String keyStoreLocation; + private boolean enabled; + private String keyStoreType; + private String trustStoreType; + +} diff --git a/src/main/java/com/rbkmoney/hooker/converter/SourceEventParser.java b/src/main/java/com/rbkmoney/hooker/converter/SourceEventParser.java new file mode 100644 index 0000000..9950a54 --- /dev/null +++ b/src/main/java/com/rbkmoney/hooker/converter/SourceEventParser.java @@ -0,0 +1,28 @@ +package com.rbkmoney.hooker.converter; + +import com.rbkmoney.damsel.payment_processing.EventPayload; +import com.rbkmoney.hooker.exception.ParseException; +import com.rbkmoney.kafka.common.converter.BinaryConverter; +import com.rbkmoney.kafka.common.converter.BinaryConverterImpl; +import com.rbkmoney.machinegun.eventsink.MachineEvent; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +@RequiredArgsConstructor +public class SourceEventParser { + + private final BinaryConverter converter = new BinaryConverterImpl(); + + public EventPayload parseEvent(MachineEvent message) { + try { + byte[] bin = message.getData().getBin(); + return converter.convert(bin, EventPayload.class); + } catch (Exception e) { + log.error("Exception when parse message e: ", e); + throw new ParseException(); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/rbkmoney/hooker/dao/InvoicingMessageDao.java b/src/main/java/com/rbkmoney/hooker/dao/InvoicingMessageDao.java index 1f3c393..9472ddb 100644 --- a/src/main/java/com/rbkmoney/hooker/dao/InvoicingMessageDao.java +++ b/src/main/java/com/rbkmoney/hooker/dao/InvoicingMessageDao.java @@ -6,4 +6,5 @@ public interface InvoicingMessageDao extends MessageDao { InvoicingMessage getInvoice(String invoiceId) throws DaoException; InvoicingMessage getPayment(String invoiceId, String paymentId) throws DaoException; InvoicingMessage getRefund(String invoiceId, String paymentId, String refundId) throws DaoException; + boolean updateIfExists(InvoicingMessage message); } diff --git a/src/main/java/com/rbkmoney/hooker/dao/impl/CustomerDaoImpl.java b/src/main/java/com/rbkmoney/hooker/dao/impl/CustomerDaoImpl.java index b307f27..079aa2a 100644 --- a/src/main/java/com/rbkmoney/hooker/dao/impl/CustomerDaoImpl.java +++ b/src/main/java/com/rbkmoney/hooker/dao/impl/CustomerDaoImpl.java @@ -40,6 +40,8 @@ public class CustomerDaoImpl extends NamedParameterJdbcDaoSupport implements Cus public static final String EVENT_ID = "event_id"; public static final String TYPE = "type"; public static final String OCCURED_AT = "occured_at"; + public static final String SEQUENCE_ID = "sequence_id"; + public static final String CHANGE_ID = "change_id"; public static final String PARTY_ID = "party_id"; public static final String EVENT_TYPE = "event_type"; public static final String CUSTOMER_ID = "customer_id"; @@ -99,6 +101,8 @@ public class CustomerDaoImpl extends NamedParameterJdbcDaoSupport implements Cus message.setEventId(rs.getLong(EVENT_ID)); message.setPartyId(rs.getString(PARTY_ID)); message.setOccuredAt(rs.getString(OCCURED_AT)); + message.setSequenceId(rs.getLong(SEQUENCE_ID)); + message.setChangeId(rs.getInt(CHANGE_ID)); message.setType(rs.getString(TYPE)); message.setEventType(EventType.valueOf(rs.getString(EVENT_TYPE))); message.setCustomer(new Customer() @@ -151,14 +155,14 @@ public class CustomerDaoImpl extends NamedParameterJdbcDaoSupport implements Cus @Transactional public void create(CustomerMessage message) throws DaoException { final String sql = "INSERT INTO hook.customer_message " + - "(event_id, occured_at, type, party_id, event_type, " + + "(event_id, occured_at, sequence_id, change_id, type, party_id, event_type, " + "customer_id, customer_shop_id, customer_status, customer_email , customer_phone, customer_metadata, " + "binding_id, binding_payment_tool_token, binding_payment_session, binding_payment_tool_details_type, " + "binding_payment_card_bin, binding_payment_card_last_digits, binding_payment_card_number_mask, binding_payment_card_token_provider, binding_payment_card_system, binding_payment_terminal_provider, " + "binding_payment_digital_wallet_provider, binding_payment_digital_wallet_id, binding_payment_crypto_currency, " + "binding_client_ip, binding_client_fingerprint, binding_status, binding_error_code, binding_error_message) " + "VALUES " + - "(:event_id, :occured_at, CAST(:type as hook.customer_message_type), :party_id, CAST(:event_type as hook.eventtype), " + + "(:event_id, :occured_at, :sequence_id, :change_id, CAST(:type as hook.customer_message_type), :party_id, CAST(:event_type as hook.eventtype), " + ":customer_id, :customer_shop_id, CAST(:customer_status as hook.customer_status), :customer_email , :customer_phone, :customer_metadata, " + ":binding_id, :binding_payment_tool_token, :binding_payment_session, CAST(:binding_payment_tool_details_type as hook.payment_tool_details_type), " + ":binding_payment_card_bin, :binding_payment_card_last_digits, :binding_payment_card_number_mask, :binding_payment_card_token_provider, :binding_payment_card_system, :binding_payment_terminal_provider, " + @@ -169,6 +173,8 @@ public class CustomerDaoImpl extends NamedParameterJdbcDaoSupport implements Cus MapSqlParameterSource params = new MapSqlParameterSource() .addValue(EVENT_ID, message.getEventId()) .addValue(OCCURED_AT, message.getOccuredAt()) + .addValue(SEQUENCE_ID, message.getSequenceId()) + .addValue(CHANGE_ID, message.getChangeId()) .addValue(TYPE, message.getType()) .addValue(PARTY_ID, message.getPartyId()) .addValue(EVENT_TYPE, message.getEventType().name()) diff --git a/src/main/java/com/rbkmoney/hooker/dao/impl/InvoicingMessageDaoImpl.java b/src/main/java/com/rbkmoney/hooker/dao/impl/InvoicingMessageDaoImpl.java index ec08d9e..6e69623 100644 --- a/src/main/java/com/rbkmoney/hooker/dao/impl/InvoicingMessageDaoImpl.java +++ b/src/main/java/com/rbkmoney/hooker/dao/impl/InvoicingMessageDaoImpl.java @@ -42,6 +42,8 @@ public class InvoicingMessageDaoImpl extends NamedParameterJdbcDaoSupport implem public static final String ID = "id"; public static final String EVENT_ID = "event_id"; public static final String EVENT_TIME = "event_time"; + public static final String SEQUENCE_ID = "sequence_id"; + public static final String CHANGE_ID = "change_id"; public static final String TYPE = "type"; public static final String PARTY_ID = "party_id"; public static final String EVENT_TYPE = "event_type"; @@ -154,6 +156,8 @@ public class InvoicingMessageDaoImpl extends NamedParameterJdbcDaoSupport implem message.setId(rs.getLong(ID)); message.setEventId(rs.getLong(EVENT_ID)); message.setEventTime(rs.getString(EVENT_TIME)); + message.setSequenceId(rs.getLong(SEQUENCE_ID)); + message.setChangeId(rs.getInt(CHANGE_ID)); message.setType(rs.getString(TYPE)); message.setPartyId(rs.getString(PARTY_ID)); message.setEventType(EventType.valueOf(rs.getString(EVENT_TYPE))); @@ -306,7 +310,7 @@ public class InvoicingMessageDaoImpl extends NamedParameterJdbcDaoSupport implem @Transactional public void create(InvoicingMessage message) throws DaoException { final String sql = "INSERT INTO hook.message" + - "(event_id, event_time, type, party_id, event_type, " + + "(event_id, event_time, sequence_id, change_id, type, party_id, event_type, " + "invoice_id, shop_id, invoice_created_at, invoice_status, invoice_reason, invoice_due_date, invoice_amount, " + "invoice_currency, invoice_content_type, invoice_content_data, invoice_product, invoice_description, " + "payment_id, payment_created_at, payment_status, payment_failure, payment_failure_reason, payment_amount, " + @@ -315,7 +319,7 @@ public class InvoicingMessageDaoImpl extends NamedParameterJdbcDaoSupport implem "payment_digital_wallet_provider, payment_digital_wallet_id, payment_crypto_currency, " + "refund_id, refund_created_at, refund_status, refund_failure, refund_failure_reason, refund_amount, refund_currency, refund_reason) " + "VALUES " + - "(:event_id, :event_time, :type, :party_id, CAST(:event_type as hook.eventtype), " + + "(:event_id, :event_time, :sequence_id, :change_id, :type, :party_id, CAST(:event_type as hook.eventtype), " + ":invoice_id, :shop_id, :invoice_created_at, :invoice_status, :invoice_reason, :invoice_due_date, :invoice_amount, " + ":invoice_currency, :invoice_content_type, :invoice_content_data, :invoice_product, :invoice_description, " + ":payment_id, :payment_created_at, :payment_status, :payment_failure, :payment_failure_reason, :payment_amount, " + @@ -327,6 +331,8 @@ public class InvoicingMessageDaoImpl extends NamedParameterJdbcDaoSupport implem MapSqlParameterSource params = new MapSqlParameterSource() .addValue(EVENT_ID, message.getEventId()) .addValue(EVENT_TIME, message.getEventTime()) + .addValue(SEQUENCE_ID, message.getSequenceId()) + .addValue(CHANGE_ID, message.getChangeId()) .addValue(TYPE, message.getType()) .addValue(PARTY_ID, message.getPartyId()) .addValue(EVENT_TYPE, message.getEventType().toString()) @@ -451,4 +457,35 @@ public class InvoicingMessageDaoImpl extends NamedParameterJdbcDaoSupport implem public InvoicingMessage getRefund(String invoiceId, String paymentId, String refundId) throws DaoException { return getAny(invoiceId, paymentId, refundId, REFUND); } + + @Override + public boolean updateIfExists(InvoicingMessage message) { + String sql = "WITH sub AS (SELECT id FROM hook.message WHERE invoice_id=:invoice_id" + + " AND type=:type" + + " AND event_type=CAST(:event_type as hook.eventtype)" + + " AND invoice_status=:invoice_status" + + " AND (payment_id IS NULL OR payment_id=:payment_id)" + + " AND (payment_status IS NULL OR payment_status=:payment_status)" + + " AND (refund_id IS NULL OR refund_id=:refund_id)" + + " AND (refund_status IS NULL OR refund_status=:refund_status) LIMIT 1) " + + " UPDATE hook.message m SET sequence_id =:sequence_id, change_id =:change_id " + + " FROM sub " + + " WHERE m.id = sub.id"; + MapSqlParameterSource params = new MapSqlParameterSource(INVOICE_ID, message.getInvoice().getId()) + .addValue(TYPE, message.getType()) + .addValue(EVENT_TYPE, message.getEventType().toString()) + .addValue(INVOICE_STATUS, message.getInvoice().getStatus()) + .addValue(PAYMENT_ID, message.getPayment() != null ? message.getPayment().getId() : null) + .addValue(PAYMENT_STATUS, message.getPayment() != null ? message.getPayment().getStatus() : null) + .addValue(REFUND_ID, message.getRefund() != null ? message.getRefund().getId() : null) + .addValue(REFUND_STATUS, message.getRefund() != null ? message.getRefund().getStatus() : null) + .addValue(SEQUENCE_ID, message.getSequenceId()) + .addValue(CHANGE_ID, message.getChangeId()); + try { + int count = getNamedParameterJdbcTemplate().update(sql, params); + return count > 0; + } catch (NestedRuntimeException e) { + throw new DaoException("InvoicingMessageDaoImpl.updateIfExists error", e); + } + } } diff --git a/src/main/java/com/rbkmoney/hooker/endpoint/HookerServlet.java b/src/main/java/com/rbkmoney/hooker/endpoint/HookerServlet.java index bdff4a8..ce9f615 100644 --- a/src/main/java/com/rbkmoney/hooker/endpoint/HookerServlet.java +++ b/src/main/java/com/rbkmoney/hooker/endpoint/HookerServlet.java @@ -7,7 +7,6 @@ import com.rbkmoney.woody.thrift.impl.http.event.HttpServiceEventLogListener; import com.rbkmoney.woody.thrift.impl.http.event.ServiceEventLogListener; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; - import javax.servlet.*; import javax.servlet.annotation.WebServlet; import java.io.IOException; diff --git a/src/main/java/com/rbkmoney/hooker/exception/ParseException.java b/src/main/java/com/rbkmoney/hooker/exception/ParseException.java new file mode 100644 index 0000000..7cf1610 --- /dev/null +++ b/src/main/java/com/rbkmoney/hooker/exception/ParseException.java @@ -0,0 +1,23 @@ +package com.rbkmoney.hooker.exception; + +public class ParseException extends RuntimeException { + + public ParseException() { + } + + public ParseException(String message) { + super(message); + } + + public ParseException(String message, Throwable cause) { + super(message, cause); + } + + public ParseException(Throwable cause) { + super(cause); + } + + public ParseException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} \ No newline at end of file diff --git a/src/main/java/com/rbkmoney/hooker/handler/Handler.java b/src/main/java/com/rbkmoney/hooker/handler/Handler.java index ec88460..49c0396 100644 --- a/src/main/java/com/rbkmoney/hooker/handler/Handler.java +++ b/src/main/java/com/rbkmoney/hooker/handler/Handler.java @@ -5,10 +5,10 @@ import com.rbkmoney.geck.filter.Filter; /** * Created by inal on 24.11.2016. */ -public interface Handler { +public interface Handler { default boolean accept(C change) { return getFilter().match(change); } - void handle(C change, P parent); + void handle(C change, Long eventId, String eventCreatedAt, String sourceId, Long sequenceId, Integer changeId); Filter getFilter(); } diff --git a/src/main/java/com/rbkmoney/hooker/handler/poller/EventStockHandler.java b/src/main/java/com/rbkmoney/hooker/handler/poller/EventStockHandler.java index 7bae651..b723ba5 100644 --- a/src/main/java/com/rbkmoney/hooker/handler/poller/EventStockHandler.java +++ b/src/main/java/com/rbkmoney/hooker/handler/poller/EventStockHandler.java @@ -59,12 +59,13 @@ public class EventStockHandler implements EventHandler { long id = processingEvent.getId(); - for (Object cc : changes) { + for (int i = 0; i < changes.size(); ++i) { + Object cc = changes.get(i); for (Handler pollingEventHandler : pollingEventHandlers) { if (pollingEventHandler.accept(cc)) { try { log.info("We got an event {}", new TBaseProcessor().process(stockEvent, JsonHandler.newPrettyJsonInstance())); - pollingEventHandler.handle(cc, stockEvent); + pollingEventHandler.handle(cc, stockEvent.getId(), stockEvent.getTime(), sourceId, (long) processingEvent.getSequence(), i); } catch (DaoException e) { log.error("DaoException when poller handling with eventId {}", id, e); if (count.decrementAndGet() > 0) { diff --git a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/AbstractCustomerEventHandler.java b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/AbstractCustomerEventHandler.java index 5131ee9..a74756e 100644 --- a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/AbstractCustomerEventHandler.java +++ b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/AbstractCustomerEventHandler.java @@ -1,24 +1,22 @@ package com.rbkmoney.hooker.handler.poller.impl.customer; -import com.rbkmoney.damsel.event_stock.StockEvent; import com.rbkmoney.damsel.payment_processing.CustomerChange; -import com.rbkmoney.damsel.payment_processing.Event; import com.rbkmoney.hooker.dao.DaoException; import com.rbkmoney.hooker.handler.Handler; +import com.rbkmoney.machinegun.eventsink.MachineEvent; /** * Created by inalarsanukaev on 07.04.17. */ -public abstract class AbstractCustomerEventHandler implements Handler { +public abstract class AbstractCustomerEventHandler implements Handler { public static final String CUSTOMER = "customer"; public static final String BINDING = "binding"; @Override - public void handle(CustomerChange c, StockEvent value) throws DaoException{ - Event event = value.getSourceEvent().getProcessingEvent(); - saveEvent(c, event); + public void handle(CustomerChange c, Long eventId, String eventCreatedAt, String sourceId, Long sequenceId, Integer changeId) throws DaoException{ + saveEvent(c, eventId, eventCreatedAt, sourceId, sequenceId, changeId); } - protected abstract void saveEvent(CustomerChange cc, Event event) throws DaoException; + protected abstract void saveEvent(CustomerChange cc, Long eventId, String eventCreatedAt, String sourceId, Long sequenceId, Integer changeId) throws DaoException; } diff --git a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/CustomerBindingFailedHandler.java b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/CustomerBindingFailedHandler.java index 74dcbb3..629b988 100644 --- a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/CustomerBindingFailedHandler.java +++ b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/CustomerBindingFailedHandler.java @@ -3,13 +3,13 @@ package com.rbkmoney.hooker.handler.poller.impl.customer; import com.rbkmoney.damsel.domain.Failure; import com.rbkmoney.damsel.domain.OperationFailure; import com.rbkmoney.damsel.payment_processing.CustomerChange; -import com.rbkmoney.damsel.payment_processing.Event; import com.rbkmoney.geck.filter.Filter; import com.rbkmoney.geck.filter.PathConditionFilter; import com.rbkmoney.geck.filter.condition.IsNullCondition; import com.rbkmoney.geck.filter.rule.PathConditionRule; import com.rbkmoney.hooker.model.CustomerMessage; import com.rbkmoney.hooker.model.EventType; +import com.rbkmoney.machinegun.eventsink.MachineEvent; import com.rbkmoney.swag_webhook_events.CustomerBindingError; import org.springframework.stereotype.Component; @@ -41,7 +41,7 @@ public class CustomerBindingFailedHandler extends NeedReadCustomerEventHandler { } @Override - protected void modifyMessage(CustomerChange cc, Event event, CustomerMessage message) { + protected void modifyMessage(CustomerChange cc, CustomerMessage message) { OperationFailure failure = cc.getCustomerBindingChanged().getPayload().getStatusChanged().getStatus().getFailed().getFailure(); String errCode = null; String errMess = null; diff --git a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/CustomerBindingStartedHandler.java b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/CustomerBindingStartedHandler.java index c5e3d26..fe69b02 100644 --- a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/CustomerBindingStartedHandler.java +++ b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/CustomerBindingStartedHandler.java @@ -1,13 +1,13 @@ package com.rbkmoney.hooker.handler.poller.impl.customer; import com.rbkmoney.damsel.payment_processing.CustomerChange; -import com.rbkmoney.damsel.payment_processing.Event; import com.rbkmoney.geck.filter.Filter; import com.rbkmoney.geck.filter.PathConditionFilter; import com.rbkmoney.geck.filter.condition.IsNullCondition; import com.rbkmoney.geck.filter.rule.PathConditionRule; import com.rbkmoney.hooker.model.CustomerMessage; import com.rbkmoney.hooker.model.EventType; +import com.rbkmoney.machinegun.eventsink.MachineEvent; import com.rbkmoney.swag_webhook_events.ClientInfo; import com.rbkmoney.swag_webhook_events.CustomerBinding; import com.rbkmoney.swag_webhook_events.PaymentResource; @@ -48,7 +48,7 @@ public class CustomerBindingStartedHandler extends NeedReadCustomerEventHandler } @Override - protected void modifyMessage(CustomerChange cc, Event event, CustomerMessage message) { + protected void modifyMessage(CustomerChange cc, CustomerMessage message) { com.rbkmoney.damsel.payment_processing.CustomerBinding bindingOrigin = cc.getCustomerBindingChanged().getPayload().getStarted().getBinding(); PaymentResource paymentResource = new PaymentResource() .paymentSession(bindingOrigin.getPaymentResource().getPaymentSessionId()) diff --git a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/CustomerBindingSucceededHandler.java b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/CustomerBindingSucceededHandler.java index 5cdb22c..b91aef7 100644 --- a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/CustomerBindingSucceededHandler.java +++ b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/CustomerBindingSucceededHandler.java @@ -1,13 +1,13 @@ package com.rbkmoney.hooker.handler.poller.impl.customer; import com.rbkmoney.damsel.payment_processing.CustomerChange; -import com.rbkmoney.damsel.payment_processing.Event; import com.rbkmoney.geck.filter.Filter; import com.rbkmoney.geck.filter.PathConditionFilter; import com.rbkmoney.geck.filter.condition.IsNullCondition; import com.rbkmoney.geck.filter.rule.PathConditionRule; import com.rbkmoney.hooker.model.CustomerMessage; import com.rbkmoney.hooker.model.EventType; +import com.rbkmoney.machinegun.eventsink.MachineEvent; import com.rbkmoney.swag_webhook_events.CustomerBinding; import org.springframework.stereotype.Component; @@ -39,7 +39,7 @@ public class CustomerBindingSucceededHandler extends NeedReadCustomerEventHandle } @Override - protected void modifyMessage(CustomerChange cc, Event event, CustomerMessage message) { + protected void modifyMessage(CustomerChange cc, CustomerMessage message) { message.getCustomerBinding().setStatus(CustomerBinding.StatusEnum.SUCCEEDED); } } diff --git a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/CustomerCreatedHandler.java b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/CustomerCreatedHandler.java index 1d24cc9..7a816c9 100644 --- a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/CustomerCreatedHandler.java +++ b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/CustomerCreatedHandler.java @@ -1,7 +1,6 @@ package com.rbkmoney.hooker.handler.poller.impl.customer; import com.rbkmoney.damsel.payment_processing.CustomerChange; -import com.rbkmoney.damsel.payment_processing.Event; import com.rbkmoney.geck.filter.Filter; import com.rbkmoney.geck.filter.PathConditionFilter; import com.rbkmoney.geck.filter.condition.IsNullCondition; @@ -39,11 +38,13 @@ public class CustomerCreatedHandler extends AbstractCustomerEventHandler { } @Override - protected void saveEvent(CustomerChange cc, Event event) throws DaoException { + protected void saveEvent(CustomerChange cc, Long eventId, String eventCreatedAt, String sourceId, Long sequenceId, Integer changeId) throws DaoException { com.rbkmoney.damsel.payment_processing.CustomerCreated customerCreatedOrigin = cc.getCustomerCreated(); CustomerMessage customerMessage = new CustomerMessage(); - customerMessage.setEventId(event.getId()); - customerMessage.setOccuredAt(event.getCreatedAt()); + customerMessage.setEventId(eventId); + customerMessage.setOccuredAt(eventCreatedAt); + customerMessage.setSequenceId(sequenceId); + customerMessage.setChangeId(changeId); customerMessage.setType(CUSTOMER); customerMessage.setPartyId(customerCreatedOrigin.getOwnerId()); customerMessage.setEventType(eventType); diff --git a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/CustomerDeletedHandler.java b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/CustomerDeletedHandler.java index 2f80863..c9125de 100644 --- a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/CustomerDeletedHandler.java +++ b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/CustomerDeletedHandler.java @@ -1,13 +1,14 @@ package com.rbkmoney.hooker.handler.poller.impl.customer; import com.rbkmoney.damsel.payment_processing.CustomerChange; -import com.rbkmoney.damsel.payment_processing.Event; import com.rbkmoney.geck.filter.Filter; import com.rbkmoney.geck.filter.PathConditionFilter; import com.rbkmoney.geck.filter.condition.IsNullCondition; import com.rbkmoney.geck.filter.rule.PathConditionRule; import com.rbkmoney.hooker.model.CustomerMessage; import com.rbkmoney.hooker.model.EventType; +import com.rbkmoney.machinegun.eventsink.MachineEvent; +import com.rbkmoney.swag_webhook_events.Customer; import org.springframework.stereotype.Component; /** @@ -38,7 +39,6 @@ public class CustomerDeletedHandler extends NeedReadCustomerEventHandler { } @Override - protected void modifyMessage(CustomerChange cc, Event event, CustomerMessage message) { - + protected void modifyMessage(CustomerChange cc, CustomerMessage message) { } } diff --git a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/CustomerReadyHandler.java b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/CustomerReadyHandler.java index c63dab5..2e1aa98 100644 --- a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/CustomerReadyHandler.java +++ b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/CustomerReadyHandler.java @@ -1,13 +1,13 @@ package com.rbkmoney.hooker.handler.poller.impl.customer; import com.rbkmoney.damsel.payment_processing.CustomerChange; -import com.rbkmoney.damsel.payment_processing.Event; import com.rbkmoney.geck.filter.Filter; import com.rbkmoney.geck.filter.PathConditionFilter; import com.rbkmoney.geck.filter.condition.IsNullCondition; import com.rbkmoney.geck.filter.rule.PathConditionRule; import com.rbkmoney.hooker.model.CustomerMessage; import com.rbkmoney.hooker.model.EventType; +import com.rbkmoney.machinegun.eventsink.MachineEvent; import com.rbkmoney.swag_webhook_events.Customer; import org.springframework.stereotype.Component; @@ -39,7 +39,7 @@ public class CustomerReadyHandler extends NeedReadCustomerEventHandler { } @Override - protected void modifyMessage(CustomerChange cc, Event event, CustomerMessage message) { + protected void modifyMessage(CustomerChange cc, CustomerMessage message) { message.getCustomer().setStatus(Customer.StatusEnum.READY); } } diff --git a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/NeedReadCustomerEventHandler.java b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/NeedReadCustomerEventHandler.java index 27ca1cb..1dda180 100644 --- a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/NeedReadCustomerEventHandler.java +++ b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/customer/NeedReadCustomerEventHandler.java @@ -1,11 +1,11 @@ package com.rbkmoney.hooker.handler.poller.impl.customer; import com.rbkmoney.damsel.payment_processing.CustomerChange; -import com.rbkmoney.damsel.payment_processing.Event; import com.rbkmoney.hooker.dao.CustomerDao; import com.rbkmoney.hooker.dao.DaoException; import com.rbkmoney.hooker.model.CustomerMessage; import com.rbkmoney.hooker.model.EventType; +import com.rbkmoney.machinegun.eventsink.MachineEvent; import org.springframework.beans.factory.annotation.Autowired; /** @@ -17,18 +17,19 @@ public abstract class NeedReadCustomerEventHandler extends AbstractCustomerEvent CustomerDao customerDao; @Override - protected void saveEvent(CustomerChange cc, Event event) throws DaoException { - final String customerId = event.getSource().getCustomerId(); + protected void saveEvent(CustomerChange cc, Long eventId, String eventCreatedAt, String sourceId, Long sequenceId, Integer changeId) throws DaoException { //getAny any saved message for related invoice - CustomerMessage message = getCustomerMessage(customerId); + CustomerMessage message = getCustomerMessage(sourceId); if (message == null) { - throw new DaoException("CustomerMessage for customer with id " + customerId + " not exist"); + throw new DaoException("CustomerMessage for customer with id " + sourceId + " not exist"); } message.setEventType(getEventType()); message.setType(getMessageType()); - message.setEventId(event.getId()); - message.setOccuredAt(event.getCreatedAt()); - modifyMessage(cc, event, message); + message.setEventId(eventId); + message.setOccuredAt(eventCreatedAt); + message.setSequenceId(sequenceId); + message.setChangeId(changeId); + modifyMessage(cc, message); customerDao.create(message); } @@ -41,5 +42,5 @@ public abstract class NeedReadCustomerEventHandler extends AbstractCustomerEvent protected abstract EventType getEventType(); - protected abstract void modifyMessage(CustomerChange cc, Event event, CustomerMessage message); + protected abstract void modifyMessage(CustomerChange cc, CustomerMessage message); } diff --git a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/AbstractInvoiceEventHandler.java b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/AbstractInvoiceEventHandler.java index 52a75f9..a9a06db 100644 --- a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/AbstractInvoiceEventHandler.java +++ b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/AbstractInvoiceEventHandler.java @@ -1,7 +1,5 @@ package com.rbkmoney.hooker.handler.poller.impl.invoicing; -import com.rbkmoney.damsel.event_stock.StockEvent; -import com.rbkmoney.damsel.payment_processing.Event; import com.rbkmoney.damsel.payment_processing.InvoiceChange; import com.rbkmoney.hooker.dao.DaoException; import com.rbkmoney.hooker.handler.Handler; @@ -9,17 +7,16 @@ import com.rbkmoney.hooker.handler.Handler; /** * Created by inalarsanukaev on 07.04.17. */ -public abstract class AbstractInvoiceEventHandler implements Handler { +public abstract class AbstractInvoiceEventHandler implements Handler { public static final String INVOICE = "invoice"; public static final String PAYMENT = "payment"; public static final String REFUND = "refund"; @Override - public void handle(InvoiceChange ic, StockEvent value) throws DaoException{ - Event event = value.getSourceEvent().getProcessingEvent(); - saveEvent(ic, event); + public void handle(InvoiceChange ic, Long eventId, String eventCreatedAt, String sourceId, Long sequenceId, Integer changeId) throws DaoException { + saveEvent(ic, eventId, eventCreatedAt, sourceId, sequenceId, changeId); } - protected abstract void saveEvent(InvoiceChange ic, Event event) throws DaoException; + protected abstract void saveEvent(InvoiceChange ic, Long eventId, String eventCreatedAt, String sourceId, Long sequenceId, Integer changeId) throws DaoException; } diff --git a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/InvoiceCreatedHandler.java b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/InvoiceCreatedHandler.java index 7f89ef3..2a89ca5 100644 --- a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/InvoiceCreatedHandler.java +++ b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/InvoiceCreatedHandler.java @@ -4,7 +4,6 @@ import com.rbkmoney.damsel.domain.Invoice; import com.rbkmoney.damsel.domain.InvoiceCart; import com.rbkmoney.damsel.domain.InvoiceLine; import com.rbkmoney.damsel.msgpack.Value; -import com.rbkmoney.damsel.payment_processing.Event; import com.rbkmoney.damsel.payment_processing.InvoiceChange; import com.rbkmoney.geck.filter.Filter; import com.rbkmoney.geck.filter.PathConditionFilter; @@ -35,12 +34,14 @@ public class InvoiceCreatedHandler extends AbstractInvoiceEventHandler { @Override @Transactional - public void saveEvent(InvoiceChange ic, Event event) throws DaoException { + public void saveEvent(InvoiceChange ic, Long eventId, String eventCreatedAt, String sourceId, Long sequenceId, Integer changeId) throws DaoException { Invoice invoiceOrigin = ic.getInvoiceCreated().getInvoice(); ////// InvoicingMessage message = new InvoicingMessage(); - message.setEventId(event.getId()); - message.setEventTime(event.getCreatedAt()); + message.setEventId(eventId); + message.setEventTime(eventCreatedAt); + message.setSequenceId(sequenceId); + message.setChangeId(changeId); message.setType(INVOICE); message.setPartyId(invoiceOrigin.getOwnerId()); message.setEventType(eventType); @@ -77,7 +78,9 @@ public class InvoiceCreatedHandler extends AbstractInvoiceEventHandler { invoice.getCart().add(icp); } } - messageDao.create(message); + if (!messageDao.updateIfExists(message)) { + messageDao.create(message); + } } @Override diff --git a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/InvoicePaymentRefundStartedHandler.java b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/InvoicePaymentRefundStartedHandler.java index 49a37c9..63a24ed 100644 --- a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/InvoicePaymentRefundStartedHandler.java +++ b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/InvoicePaymentRefundStartedHandler.java @@ -2,7 +2,6 @@ package com.rbkmoney.hooker.handler.poller.impl.invoicing; import com.rbkmoney.damsel.domain.FinalCashFlowPosting; import com.rbkmoney.damsel.domain.InvoicePaymentRefund; -import com.rbkmoney.damsel.payment_processing.Event; import com.rbkmoney.damsel.payment_processing.InvoiceChange; import com.rbkmoney.damsel.payment_processing.InvoicePaymentRefundCreated; import com.rbkmoney.geck.filter.Filter; @@ -12,6 +11,7 @@ import com.rbkmoney.geck.filter.rule.PathConditionRule; import com.rbkmoney.hooker.model.EventType; import com.rbkmoney.hooker.model.InvoicingMessage; import com.rbkmoney.hooker.model.Refund; +import com.rbkmoney.machinegun.eventsink.MachineEvent; import org.springframework.stereotype.Component; import java.util.List; @@ -48,7 +48,7 @@ public class InvoicePaymentRefundStartedHandler extends NeedReadInvoiceEventHand } @Override - protected void modifyMessage(InvoiceChange ic, Event event, InvoicingMessage message) { + protected void modifyMessage(InvoiceChange ic, InvoicingMessage message) { InvoicePaymentRefundCreated refundCreated = ic.getInvoicePaymentChange().getPayload().getInvoicePaymentRefundChange().getPayload().getInvoicePaymentRefundCreated(); Refund refund = new Refund(); message.setRefund(refund); diff --git a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/InvoicePaymentRefundStatusChangedHandler.java b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/InvoicePaymentRefundStatusChangedHandler.java index 01ecc55..18ac595 100644 --- a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/InvoicePaymentRefundStatusChangedHandler.java +++ b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/InvoicePaymentRefundStatusChangedHandler.java @@ -2,7 +2,6 @@ package com.rbkmoney.hooker.handler.poller.impl.invoicing; import com.rbkmoney.damsel.domain.InvoicePaymentRefundStatus; import com.rbkmoney.damsel.domain.OperationFailure; -import com.rbkmoney.damsel.payment_processing.Event; import com.rbkmoney.damsel.payment_processing.InvoiceChange; import com.rbkmoney.geck.filter.Filter; import com.rbkmoney.geck.filter.PathConditionFilter; @@ -12,6 +11,7 @@ import com.rbkmoney.hooker.model.EventType; import com.rbkmoney.hooker.model.InvoicingMessage; import com.rbkmoney.hooker.model.Refund; import com.rbkmoney.hooker.utils.ErrorUtils; +import com.rbkmoney.machinegun.eventsink.MachineEvent; import org.springframework.stereotype.Component; @Component @@ -47,7 +47,7 @@ public class InvoicePaymentRefundStatusChangedHandler extends NeedReadInvoiceEve } @Override - protected void modifyMessage(InvoiceChange ic, Event event, InvoicingMessage message) { + protected void modifyMessage(InvoiceChange ic, InvoicingMessage message) { InvoicePaymentRefundStatus refundStatus = ic.getInvoicePaymentChange().getPayload().getInvoicePaymentRefundChange().getPayload().getInvoicePaymentRefundStatusChanged().getStatus(); Refund refund = message.getRefund(); refund.setStatus(refundStatus.getSetField().getFieldName()); diff --git a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/InvoicePaymentStartedHandler.java b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/InvoicePaymentStartedHandler.java index 58db50d..a88a648 100644 --- a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/InvoicePaymentStartedHandler.java +++ b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/InvoicePaymentStartedHandler.java @@ -4,7 +4,6 @@ import com.rbkmoney.damsel.domain.DisposablePaymentResource; import com.rbkmoney.damsel.domain.InvoicePayment; import com.rbkmoney.damsel.domain.PaymentTool; import com.rbkmoney.damsel.domain.RecurrentPayer; -import com.rbkmoney.damsel.payment_processing.Event; import com.rbkmoney.damsel.payment_processing.InvoiceChange; import com.rbkmoney.geck.filter.Filter; import com.rbkmoney.geck.filter.PathConditionFilter; @@ -13,11 +12,10 @@ import com.rbkmoney.geck.filter.rule.PathConditionRule; import com.rbkmoney.hooker.model.*; import com.rbkmoney.hooker.model.Payment; import com.rbkmoney.hooker.utils.PaymentToolUtils; +import com.rbkmoney.machinegun.eventsink.MachineEvent; import com.rbkmoney.swag_webhook_events.*; import org.springframework.stereotype.Component; -import static com.rbkmoney.hooker.utils.PaymentToolUtils.getPaymentToolDetails; - @Component public class InvoicePaymentStartedHandler extends NeedReadInvoiceEventHandler { @@ -44,7 +42,7 @@ public class InvoicePaymentStartedHandler extends NeedReadInvoiceEventHandler { } @Override - protected void modifyMessage(InvoiceChange ic, Event event, InvoicingMessage message) { + protected void modifyMessage(InvoiceChange ic, InvoicingMessage message) { InvoicePayment paymentOrigin = ic.getInvoicePaymentChange().getPayload().getInvoicePaymentStarted().getPayment(); Payment payment = new Payment(); message.setPayment(payment); diff --git a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/InvoicePaymentStatusChangedHandler.java b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/InvoicePaymentStatusChangedHandler.java index 76146ae..eb0b494 100644 --- a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/InvoicePaymentStatusChangedHandler.java +++ b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/InvoicePaymentStatusChangedHandler.java @@ -4,7 +4,6 @@ import com.rbkmoney.damsel.domain.Cash; import com.rbkmoney.damsel.domain.InvoicePaymentCaptured; import com.rbkmoney.damsel.domain.InvoicePaymentStatus; import com.rbkmoney.damsel.domain.OperationFailure; -import com.rbkmoney.damsel.payment_processing.Event; import com.rbkmoney.damsel.payment_processing.InvoiceChange; import com.rbkmoney.geck.filter.Filter; import com.rbkmoney.geck.filter.PathConditionFilter; @@ -14,6 +13,7 @@ import com.rbkmoney.hooker.model.EventType; import com.rbkmoney.hooker.model.InvoicingMessage; import com.rbkmoney.hooker.model.Payment; import com.rbkmoney.hooker.utils.ErrorUtils; +import com.rbkmoney.machinegun.eventsink.MachineEvent; import org.springframework.stereotype.Component; @Component @@ -47,7 +47,7 @@ public class InvoicePaymentStatusChangedHandler extends NeedReadInvoiceEventHand } @Override - protected void modifyMessage(InvoiceChange ic, Event event, InvoicingMessage message) { + protected void modifyMessage(InvoiceChange ic, InvoicingMessage message) { InvoicePaymentStatus paymentOriginStatus = ic.getInvoicePaymentChange().getPayload().getInvoicePaymentStatusChanged().getStatus(); Payment payment = message.getPayment(); payment.setStatus(paymentOriginStatus.getSetField().getFieldName()); diff --git a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/InvoiceStatusChangedHandler.java b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/InvoiceStatusChangedHandler.java index 5328e7a..4ecc562 100644 --- a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/InvoiceStatusChangedHandler.java +++ b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/InvoiceStatusChangedHandler.java @@ -1,7 +1,6 @@ package com.rbkmoney.hooker.handler.poller.impl.invoicing; import com.rbkmoney.damsel.domain.InvoiceStatus; -import com.rbkmoney.damsel.payment_processing.Event; import com.rbkmoney.damsel.payment_processing.InvoiceChange; import com.rbkmoney.geck.filter.Filter; import com.rbkmoney.geck.filter.PathConditionFilter; @@ -10,6 +9,7 @@ import com.rbkmoney.geck.filter.rule.PathConditionRule; import com.rbkmoney.hooker.dao.InvoicingMessageDao; import com.rbkmoney.hooker.model.EventType; import com.rbkmoney.hooker.model.InvoicingMessage; +import com.rbkmoney.machinegun.eventsink.MachineEvent; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -47,7 +47,7 @@ public class InvoiceStatusChangedHandler extends NeedReadInvoiceEventHandler { } @Override - protected void modifyMessage(InvoiceChange ic, Event event, InvoicingMessage message) { + protected void modifyMessage(InvoiceChange ic, InvoicingMessage message) { InvoiceStatus statusOrigin = ic.getInvoiceStatusChanged().getStatus(); message.getInvoice().setStatus(statusOrigin.getSetField().getFieldName()); if (statusOrigin.isSetCancelled()) { diff --git a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/NeedReadInvoiceEventHandler.java b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/NeedReadInvoiceEventHandler.java index 0048b03..dc86ffe 100644 --- a/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/NeedReadInvoiceEventHandler.java +++ b/src/main/java/com/rbkmoney/hooker/handler/poller/impl/invoicing/NeedReadInvoiceEventHandler.java @@ -1,6 +1,5 @@ package com.rbkmoney.hooker.handler.poller.impl.invoicing; -import com.rbkmoney.damsel.payment_processing.Event; import com.rbkmoney.damsel.payment_processing.InvoiceChange; import com.rbkmoney.hooker.dao.DaoException; import com.rbkmoney.hooker.dao.InvoicingMessageDao; @@ -16,20 +15,22 @@ public abstract class NeedReadInvoiceEventHandler extends AbstractInvoiceEventHa InvoicingMessageDao messageDao; @Override - protected void saveEvent(InvoiceChange ic, Event event) throws DaoException { - final String invoiceId = event.getSource().getInvoiceId(); + protected void saveEvent(InvoiceChange ic, Long eventId, String eventCreatedAt, String sourceId, Long sequenceId, Integer changeId) throws DaoException { //getAny any saved message for related invoice - InvoicingMessage message = getMessage(invoiceId, ic); + InvoicingMessage message = getMessage(sourceId, ic); if (message == null) { - throw new DaoException("InvoicingMessage for invoice with id " + invoiceId + " not exist"); + throw new DaoException("InvoicingMessage for invoice with id " + sourceId + " not exist"); } message.setEventType(getEventType()); message.setType(getMessageType()); - message.setEventId(event.getId()); - message.setEventTime(event.getCreatedAt()); - modifyMessage(ic, event, message); - - messageDao.create(message); + message.setEventId(eventId); + message.setEventTime(eventCreatedAt); + message.setSequenceId(sequenceId); + message.setChangeId(changeId); + modifyMessage(ic, message); + if (!messageDao.updateIfExists(message)) { + messageDao.create(message); + } } protected abstract InvoicingMessage getMessage(String invoiceId, InvoiceChange ic); @@ -38,7 +39,7 @@ public abstract class NeedReadInvoiceEventHandler extends AbstractInvoiceEventHa protected abstract EventType getEventType(); - protected abstract void modifyMessage(InvoiceChange ic, Event event, InvoicingMessage message); + protected abstract void modifyMessage(InvoiceChange ic, InvoicingMessage message); } diff --git a/src/main/java/com/rbkmoney/hooker/listener/KafkaMachineEventListener.java b/src/main/java/com/rbkmoney/hooker/listener/KafkaMachineEventListener.java new file mode 100644 index 0000000..d7ddc10 --- /dev/null +++ b/src/main/java/com/rbkmoney/hooker/listener/KafkaMachineEventListener.java @@ -0,0 +1,24 @@ +package com.rbkmoney.hooker.listener; + +import com.rbkmoney.machinegun.eventsink.SinkEvent; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@RequiredArgsConstructor +public class KafkaMachineEventListener { + + private final MachineEventHandler machineEventHandler; + + @KafkaListener(topics = "${kafka.topics.invoicing}", containerFactory = "kafkaListenerContainerFactory") + public void listen(SinkEvent message, Acknowledgment ack) { + log.debug("Got machineEvent: {}", message); + machineEventHandler.handle(message.getEvent(), ack); + log.debug("Handled machineEvent {}", message); + } + +} diff --git a/src/main/java/com/rbkmoney/hooker/listener/MachineEventHandler.java b/src/main/java/com/rbkmoney/hooker/listener/MachineEventHandler.java new file mode 100644 index 0000000..49af6b6 --- /dev/null +++ b/src/main/java/com/rbkmoney/hooker/listener/MachineEventHandler.java @@ -0,0 +1,10 @@ +package com.rbkmoney.hooker.listener; + +import com.rbkmoney.machinegun.eventsink.MachineEvent; +import org.springframework.kafka.support.Acknowledgment; + +public interface MachineEventHandler { + + void handle(MachineEvent message, Acknowledgment ack); + +} diff --git a/src/main/java/com/rbkmoney/hooker/listener/MachineEventHandlerImpl.java b/src/main/java/com/rbkmoney/hooker/listener/MachineEventHandlerImpl.java new file mode 100644 index 0000000..0b6eaa6 --- /dev/null +++ b/src/main/java/com/rbkmoney/hooker/listener/MachineEventHandlerImpl.java @@ -0,0 +1,59 @@ +package com.rbkmoney.hooker.listener; + +import com.rbkmoney.damsel.payment_processing.CustomerChange; +import com.rbkmoney.damsel.payment_processing.EventPayload; +import com.rbkmoney.damsel.payment_processing.InvoiceChange; +import com.rbkmoney.hooker.converter.SourceEventParser; +import com.rbkmoney.hooker.service.HandlerManager; +import com.rbkmoney.machinegun.eventsink.MachineEvent; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +@Slf4j +@Component +@RequiredArgsConstructor +public class MachineEventHandlerImpl implements MachineEventHandler { + + private final HandlerManager handlerManager; + private final SourceEventParser eventParser; + + @Override + @Transactional + public void handle(MachineEvent machineEvent, Acknowledgment ack) { + EventPayload payload = eventParser.parseEvent(machineEvent); + log.info("EventPayload payload: {}", payload); + if (payload.isSetInvoiceChanges()) { + for (int i = 0; i < payload.getInvoiceChanges().size(); ++i) { + InvoiceChange invoiceChange = payload.getInvoiceChanges().get(i); + try { + int j = i; + handlerManager.getHandler(invoiceChange) + .ifPresentOrElse(handler -> handler.handle(invoiceChange, null, machineEvent.getCreatedAt(), machineEvent.getSourceId(), machineEvent.getEventId(), j), + () -> log.debug("Handler for invoiceChange {} wasn't found (machineEvent {})", invoiceChange, machineEvent)); + } catch (Exception ex) { + log.error("Failed to handle invoice change, invoiceChange='{}'", invoiceChange, ex); + throw ex; + } + } + } else if (payload.isSetCustomerChanges()) { + for (int i = 0; i < payload.getCustomerChanges().size(); ++i) { + CustomerChange customerChange = payload.getCustomerChanges().get(i); + try { + int j = i; + handlerManager.getHandler(customerChange) + .ifPresentOrElse(handler -> handler.handle(customerChange, null, machineEvent.getCreatedAt(), machineEvent.getSourceId(), machineEvent.getEventId(), j), + () -> log.debug("Handler for customerChange {} wasn't found (machineEvent {})", customerChange, machineEvent)); + } catch (Exception ex) { + log.error("Failed to handle customer change, customerChange='{}'", customerChange, ex); + throw ex; + } + } + } + ack.acknowledge(); + log.debug("Ack for machineEvent {} sent", machineEvent); + } + +} diff --git a/src/main/java/com/rbkmoney/hooker/model/CustomerMessage.java b/src/main/java/com/rbkmoney/hooker/model/CustomerMessage.java index d871f67..420e5b3 100644 --- a/src/main/java/com/rbkmoney/hooker/model/CustomerMessage.java +++ b/src/main/java/com/rbkmoney/hooker/model/CustomerMessage.java @@ -14,7 +14,9 @@ import lombok.NoArgsConstructor; @AllArgsConstructor @NoArgsConstructor public class CustomerMessage extends Message { - private long eventId; + private Long eventId; + private Long sequenceId; + private Integer changeId; private String type; private String occuredAt; private String partyId; diff --git a/src/main/java/com/rbkmoney/hooker/model/InvoicingMessage.java b/src/main/java/com/rbkmoney/hooker/model/InvoicingMessage.java index 348ee05..d87c67f 100644 --- a/src/main/java/com/rbkmoney/hooker/model/InvoicingMessage.java +++ b/src/main/java/com/rbkmoney/hooker/model/InvoicingMessage.java @@ -13,7 +13,9 @@ import lombok.Setter; @Getter @Setter public class InvoicingMessage extends Message { - private long eventId; + private Long eventId; + private Long sequenceId; + private Integer changeId; private String eventTime; private String type; private String partyId; @@ -25,6 +27,8 @@ public class InvoicingMessage extends Message { public InvoicingMessage(InvoicingMessage other) { setId(other.getId()); this.eventId = other.eventId; + this.sequenceId = other.sequenceId; + this.changeId = other.changeId; this.eventTime = other.eventTime; this.type = other.type; this.partyId = other.partyId; diff --git a/src/main/java/com/rbkmoney/hooker/serde/SinkEventDeserializer.java b/src/main/java/com/rbkmoney/hooker/serde/SinkEventDeserializer.java new file mode 100644 index 0000000..576935f --- /dev/null +++ b/src/main/java/com/rbkmoney/hooker/serde/SinkEventDeserializer.java @@ -0,0 +1,15 @@ +package com.rbkmoney.hooker.serde; + +import com.rbkmoney.kafka.common.deserializer.AbstractDeserializerAdapter; +import com.rbkmoney.machinegun.eventsink.SinkEvent; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class SinkEventDeserializer extends AbstractDeserializerAdapter { + + @Override + public SinkEvent deserialize(String topic, byte[] data) { + return this.deserialize(data, new SinkEvent()); + } + +} \ No newline at end of file diff --git a/src/main/java/com/rbkmoney/hooker/service/HandlerManager.java b/src/main/java/com/rbkmoney/hooker/service/HandlerManager.java new file mode 100644 index 0000000..7b7bba3 --- /dev/null +++ b/src/main/java/com/rbkmoney/hooker/service/HandlerManager.java @@ -0,0 +1,19 @@ +package com.rbkmoney.hooker.service; + +import com.rbkmoney.hooker.handler.Handler; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.Optional; + +@Service +@RequiredArgsConstructor +public class HandlerManager { + + private final List handlers; + + public Optional getHandler(C change) { + return handlers.stream().filter(handler -> handler.accept(change)).findFirst(); + } +} \ No newline at end of file diff --git a/src/main/java/com/rbkmoney/hooker/service/SleepService.java b/src/main/java/com/rbkmoney/hooker/service/SleepService.java new file mode 100644 index 0000000..e54bcef --- /dev/null +++ b/src/main/java/com/rbkmoney/hooker/service/SleepService.java @@ -0,0 +1,20 @@ +package com.rbkmoney.hooker.service; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.concurrent.TimeUnit; + +@Slf4j +@Service +public class SleepService { + + public void safeSleep(Long timeout) { + try { + TimeUnit.MILLISECONDS.sleep(timeout); + } catch (InterruptedException e) { + log.error("InterruptedException when waite timeout ", e); + Thread.currentThread().interrupt(); + } + } +} diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index 5c25c18..0c8b67f 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -5,6 +5,28 @@ spring: name: @name@ description: @description@ flyway.schemas: hook + +kafka: + bootstrap-servers: kenny-kafka1.bst1.rbkmoney.net:9092 + client-id: newway + ssl: + enabled: false + trust-store-location: "test" + trust-store-password: "test" + key-store-location: "test" + key-store-password: "test" + key-password: "test" + key-store-type: PKCS12 + trust-store-type: PKCS12 + consumer: + concurrency: 8 + group-id: "HookerListener" + enable-auto-commit: false + auto-offset-reset: earliest + max-poll-records: 20 + topics: + invoicing: mg-invoice-100-2 + info: version: @version@ responsible: Inal Arsanukaev @@ -47,4 +69,7 @@ management: export: statsd: enabled: false - flavor: etsy \ No newline at end of file + flavor: etsy + +retry-policy: + maxAttempts: -1 diff --git a/src/main/resources/cert/truststore.p12 b/src/main/resources/cert/truststore.p12 new file mode 100644 index 0000000000000000000000000000000000000000..399486ce42f5ea8756ed360f28ad43482a0ad0da GIT binary patch literal 1114 zcmV-g1f}~hf&^9q0Ru3C1P=xYDuzgg_YDCD0ic2e00e>q{4jz8_%MP4^acqkhDe6@ z4FLxRpn?PCFoFZ&0s#Opf&UZ}aAuT!9ofxYbcqDDh&(NK1PS7= zps`DASP4jxIgHx+g6aS8waEKi8Z$Fh5j{k;krYmaMjW6n6*L}2Q4Acq5KyXmTHwLq9LVBudos%3AlDaPe{tNGq2T`Kg0{G+UymFKsyVd0QJHk#iMx585;i zY@43pzyhK?7B2reM|J<5ea%ALk879RKW%{!LL-};2KY!9DwuA}qx7pgQ?=L{$f_5^ zO9x`jY3#qswG-qF_#i{89AvvE!|<{e*Egkxsu6UE!!o%B+XqjHCEP;D?>*icfSt~8 zjnKHief)4mABkr@!z(Mig>F!_ghJQY`A#2dwHj(Nc?e{C_#U!TIfQANlHb|`q-dEy z7&HJrXNQ!**Q3hi(&H~j)7pVZ1~{6b>4MG!XJkl(pBVJq7B&wCghf(Xiervp@VX1B z#4~x$m;}9_-gCI&e__(XkP?c+P^QJoil1L3Nk+%b29N`#M1D`A++XB{qqeXfQ1g~O zocq-4fgXWy=)r9k`~y2|GUBjWZPlgyb#Z>z+SCt@`2x3;*D?gTT#Ry@9}{~5=W9@Q zuDQC{5T_%!7dfH+s4OQngICeGa$*P63@?M!8t@f4|H>&B0GK4mtPavF={ChaYpz4< zWU1_J{=e4|+}LZZ?!;J?u8gr2fkQ1`oL&!Qrx()C&XQBzCvWB;r0E^LP7H*u) z1S{$a^xSmZXy5^92gqz{Mle1wAutIB1uG5%0vZJX1QZ1%y~$^Y2MEb(-H6>#tL=Ij g@$Li^pjYQg2+|9QcukyPta* producer = createProducer(); + ProducerRecord producerRecord = new ProducerRecord<>(topic, null, sinkEvent); + try { + producer.send(producerRecord).get(); + } catch (Exception e) { + log.error("KafkaAbstractTest initialize e: ", e); + } + producer.close(); + } + + private void waitForTopicSync() throws InterruptedException { + Thread.sleep(1000L); + } + + + private MachineEvent createMessage() { + MachineEvent message = new MachineEvent(); + Value data = new Value(); + data.setBin(new byte[0]); + message.setCreatedAt(LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME)); + message.setEventId(1L); + message.setSourceNs(SOURCE_NS); + message.setSourceId(SOURCE_ID); + message.setData(data); + return message; + } + + public static Producer createProducer() { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + props.put(ProducerConfig.CLIENT_ID_CONFIG, "client_id"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, new ThriftSerializer().getClass()); + return new KafkaProducer<>(props); + } +} diff --git a/src/test/java/com/rbkmoney/hooker/listener/MachineEventHandlerImplTest.java b/src/test/java/com/rbkmoney/hooker/listener/MachineEventHandlerImplTest.java new file mode 100644 index 0000000..cbc0f45 --- /dev/null +++ b/src/test/java/com/rbkmoney/hooker/listener/MachineEventHandlerImplTest.java @@ -0,0 +1,87 @@ +package com.rbkmoney.hooker.listener; + +import com.rbkmoney.damsel.payment_processing.Event; +import com.rbkmoney.damsel.payment_processing.EventPayload; +import com.rbkmoney.damsel.payment_processing.InvoiceChange; +import com.rbkmoney.hooker.converter.SourceEventParser; +import com.rbkmoney.hooker.exception.ParseException; +import com.rbkmoney.hooker.handler.Handler; +import com.rbkmoney.hooker.service.HandlerManager; +import com.rbkmoney.machinegun.eventsink.MachineEvent; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.springframework.kafka.support.Acknowledgment; + +import java.util.ArrayList; + +import static org.mockito.ArgumentMatchers.any; + +public class MachineEventHandlerImplTest { + + @Mock + private HandlerManager handlerManager; + @Mock + private Handler handler; + @Mock + private SourceEventParser eventParser; + @Mock + private Acknowledgment ack; + + private MachineEventHandlerImpl machineEventHandler; + + @Before + public void init() { + MockitoAnnotations.initMocks(this); + machineEventHandler = new MachineEventHandlerImpl(handlerManager, eventParser); + } + + @Test + public void listenEmptyChanges() { + Mockito.when(handlerManager.getHandler(any())).thenReturn(java.util.Optional.of(handler)); + + MachineEvent message = new MachineEvent(); + Event event = new Event(); + EventPayload payload = new EventPayload(); + payload.setInvoiceChanges(new ArrayList<>()); + event.setPayload(payload); + Mockito.when(eventParser.parseEvent(message)).thenReturn(payload); + + machineEventHandler.handle(message, ack); + + Mockito.verify(handlerManager, Mockito.times(0)).getHandler(any()); + Mockito.verify(handler, Mockito.times(0)).handle(any(), any(), any(), any(), any(), any()); + Mockito.verify(ack, Mockito.times(1)).acknowledge(); + } + + @Test(expected = ParseException.class) + public void listenEmptyException() { + MachineEvent message = new MachineEvent(); + Mockito.when(eventParser.parseEvent(message)).thenThrow(new ParseException()); + machineEventHandler.handle(message, ack); + + Mockito.verify(ack, Mockito.times(0)).acknowledge(); + } + + @Test + public void listenChanges() { + MachineEvent message = new MachineEvent(); + Event event = new Event(); + EventPayload payload = new EventPayload(); + ArrayList invoiceChanges = new ArrayList<>(); + invoiceChanges.add(new InvoiceChange()); + payload.setInvoiceChanges(invoiceChanges); + event.setPayload(payload); + Mockito.when(eventParser.parseEvent(message)).thenReturn(payload); + Mockito.when(handlerManager.getHandler(any())).thenReturn(java.util.Optional.of(handler)); + + machineEventHandler.handle(message, ack); + + Mockito.verify(handlerManager, Mockito.times(1)).getHandler(any()); + Mockito.verify(handler, Mockito.times(1)).handle(any(), any(), any(), any(), any(), any()); + Mockito.verify(ack, Mockito.times(1)).acknowledge(); + } + +} diff --git a/src/test/java/com/rbkmoney/hooker/utils/BuildUtils.java b/src/test/java/com/rbkmoney/hooker/utils/BuildUtils.java index 8067ffb..daf1185 100644 --- a/src/test/java/com/rbkmoney/hooker/utils/BuildUtils.java +++ b/src/test/java/com/rbkmoney/hooker/utils/BuildUtils.java @@ -20,7 +20,7 @@ public class BuildUtils { public static InvoicingMessage buildMessage(String type, String invoiceId, String partyId, EventType eventType, String status, List cart, boolean isPayer) { InvoicingMessage message = new InvoicingMessage(); - message.setEventId(5555); + message.setEventId(5555L); message.setEventTime("time"); message.setType(type); message.setPartyId(partyId);