Kaфka consumer (#73)

* commit for CI

* updated libs, refactored for lombok

* CI fix

* added build_utils

* added port

* added sonar coverage, fixed sonar bugs, refactored error-extraction

* fixed error conversion logic

* Refactored for Kafka

* tests fixes

* added Kafka tests

* fix review

* fix review

* fix review

* fix review

* fix review

* fix review

* concurency = 5

* Added check duplicates

* Added constraint columns

* Fixed pom

* Kafka application properties config (#74)

* fix review

* test repaired

* test repaired

* kafka-common-lib added

* review fix

* Added update of old records

* Updated kafka stuff

* Fixed condition for updateIfExists
This commit is contained in:
Pospolita Nikita 2019-06-13 19:22:14 +03:00 committed by Inal Arsanukaev
parent 5548c3a754
commit c5208c5a23
48 changed files with 800 additions and 97 deletions

@ -1 +1 @@
Subproject commit 870b70a63af18fc7a02c9ff26b06132d2b1993cb
Subproject commit ea4aa042f482551d624fd49a570d28488f479e93

40
pom.xml
View File

@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>hooker</artifactId>
<version>2.0.25-SNAPSHOT</version>
<version>2.0.26-SNAPSHOT</version>
<packaging>jar</packaging>
<name>hooker</name>
@ -36,6 +36,8 @@
<db.user>postgres</db.user>
<db.password>postgres</db.password>
<db.schema>hook</db.schema>
<geck.version>0.6.9</geck.version>
</properties>
<dependencies>
@ -92,10 +94,25 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.rbkmoney.woody</groupId>
<artifactId>woody-thrift</artifactId>
<version>1.1.15</version>
</dependency>
<dependency>
<groupId>com.rbkmoney.geck</groupId>
<artifactId>common</artifactId>
<version>${geck.version}</version>
</dependency>
<dependency>
<groupId>com.rbkmoney.geck</groupId>
<artifactId>filter</artifactId>
<version>${geck.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.4.RELEASE</version>
<version>2.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
@ -135,6 +152,11 @@
<artifactId>eventstock-client</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>kafka-common-lib</artifactId>
<version>0.0.5</version>
</dependency>
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>damsel</artifactId>
@ -213,6 +235,11 @@
<version>2.1.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.rbkmoney</groupId>
<artifactId>machinegun-proto</artifactId>
<version>1.12-ebae56f</version>
</dependency>
</dependencies>
<build>
<resources>
@ -245,6 +272,15 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<configuration>
<nonFilteredFileExtensions>
<nonFilteredFileExtension>p12</nonFilteredFileExtension>
</nonFilteredFileExtensions>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-remote-resources-plugin</artifactId>

View File

@ -49,12 +49,12 @@ public class EventStockPollerConfig {
List<EventPublisher> eventPublishers = new ArrayList<>();
for (int i = 0; i < workersCount; ++i) {
eventPublishers.add(new PollingEventPublisherBuilder()
.withURI(bmUri.getURI())
.withEventHandler(eventStockHandlers.get(i))
.withMaxPoolSize(maxPoolSize)
.withPollDelay(pollDelay)
.withMaxQuerySize(maxQuerySize)
.build());
.withURI(bmUri.getURI())
.withEventHandler(eventStockHandlers.get(i))
.withMaxPoolSize(maxPoolSize)
.withPollDelay(pollDelay)
.withMaxQuerySize(maxQuerySize)
.build());
}
return eventPublishers;
}

View File

@ -0,0 +1,131 @@
package com.rbkmoney.hooker.configuration;
import com.rbkmoney.kafka.common.retry.ConfigurableRetryPolicy;
import com.rbkmoney.machinegun.eventsink.MachineEvent;
import com.rbkmoney.hooker.configuration.properties.KafkaSslProperties;
import com.rbkmoney.hooker.serde.SinkEventDeserializer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;
import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Configuration
@EnableConfigurationProperties(KafkaSslProperties.class)
@RequiredArgsConstructor
public class KafkaConfig {
@Value("${kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${kafka.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Value("${kafka.consumer.group-id}")
private String groupId;
@Value("${kafka.client-id}")
private String clientId;
@Value("${kafka.consumer.max-poll-records}")
private int maxPollRecords;
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.consumer.concurrency}")
private int concurrency;
private final KafkaSslProperties kafkaSslProperties;
@Value("${retry-policy.maxAttempts}")
int maxAttempts;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SinkEventDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
configureSsl(props);
return props;
}
private void configureSsl(Map<String, Object> props) {
if (kafkaSslProperties.isEnabled()) {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name());
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, new File(kafkaSslProperties.getTrustStoreLocation()).getAbsolutePath());
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaSslProperties.getTrustStorePassword());
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaSslProperties.getKeyStoreType());
props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaSslProperties.getTrustStoreType());
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, new File(kafkaSslProperties.getKeyStoreLocation()).getAbsolutePath());
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, kafkaSslProperties.getKeyStorePassword());
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaSslProperties.getKeyPassword());
}
}
@Bean
public ConsumerFactory<String, MachineEvent> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> kafkaListenerContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory,
RetryTemplate kafkaRetryTemplate
) {
ConcurrentKafkaListenerContainerFactory<String, MachineEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.setErrorHandler(kafkaErrorHandler());
factory.setConcurrency(concurrency);
factory.setRetryTemplate(kafkaRetryTemplate);
return factory;
}
private ErrorHandler kafkaErrorHandler() {
return (thrownException, data) -> {
if (data != null) {
log.error("Error while processing: data-key: {}, data-offset: {}, data-partition: {}",
data.key(), data.offset(), data.partition(), thrownException);
} else {
log.error("Error while processing", thrownException);
}
};
}
@Bean
public RetryTemplate kafkaRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(
new ConfigurableRetryPolicy(maxAttempts, Collections.singletonMap(RuntimeException.class, true))
);
retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy());
return retryTemplate;
}
}

View File

@ -0,0 +1,29 @@
package com.rbkmoney.hooker.configuration;
import com.rbkmoney.kafka.common.retry.ConfigurableRetryPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;
import java.util.Collections;
@Configuration
public class RetryConfig {
@Value("${retry-policy.maxAttempts}")
int maxAttempts;
@Bean
RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(
new ConfigurableRetryPolicy(maxAttempts, Collections.singletonMap(RuntimeException.class, true))
);
retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy());
return retryTemplate;
}
}

View File

@ -0,0 +1,23 @@
package com.rbkmoney.hooker.configuration.properties;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Getter
@Setter
@Component
@ConfigurationProperties(prefix = "kafka.ssl")
public class KafkaSslProperties {
private String trustStorePassword;
private String trustStoreLocation;
private String keyStorePassword;
private String keyPassword;
private String keyStoreLocation;
private boolean enabled;
private String keyStoreType;
private String trustStoreType;
}

View File

@ -0,0 +1,28 @@
package com.rbkmoney.hooker.converter;
import com.rbkmoney.damsel.payment_processing.EventPayload;
import com.rbkmoney.hooker.exception.ParseException;
import com.rbkmoney.kafka.common.converter.BinaryConverter;
import com.rbkmoney.kafka.common.converter.BinaryConverterImpl;
import com.rbkmoney.machinegun.eventsink.MachineEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class SourceEventParser {
private final BinaryConverter<EventPayload> converter = new BinaryConverterImpl();
public EventPayload parseEvent(MachineEvent message) {
try {
byte[] bin = message.getData().getBin();
return converter.convert(bin, EventPayload.class);
} catch (Exception e) {
log.error("Exception when parse message e: ", e);
throw new ParseException();
}
}
}

View File

@ -6,4 +6,5 @@ public interface InvoicingMessageDao extends MessageDao<InvoicingMessage> {
InvoicingMessage getInvoice(String invoiceId) throws DaoException;
InvoicingMessage getPayment(String invoiceId, String paymentId) throws DaoException;
InvoicingMessage getRefund(String invoiceId, String paymentId, String refundId) throws DaoException;
boolean updateIfExists(InvoicingMessage message);
}

View File

@ -40,6 +40,8 @@ public class CustomerDaoImpl extends NamedParameterJdbcDaoSupport implements Cus
public static final String EVENT_ID = "event_id";
public static final String TYPE = "type";
public static final String OCCURED_AT = "occured_at";
public static final String SEQUENCE_ID = "sequence_id";
public static final String CHANGE_ID = "change_id";
public static final String PARTY_ID = "party_id";
public static final String EVENT_TYPE = "event_type";
public static final String CUSTOMER_ID = "customer_id";
@ -99,6 +101,8 @@ public class CustomerDaoImpl extends NamedParameterJdbcDaoSupport implements Cus
message.setEventId(rs.getLong(EVENT_ID));
message.setPartyId(rs.getString(PARTY_ID));
message.setOccuredAt(rs.getString(OCCURED_AT));
message.setSequenceId(rs.getLong(SEQUENCE_ID));
message.setChangeId(rs.getInt(CHANGE_ID));
message.setType(rs.getString(TYPE));
message.setEventType(EventType.valueOf(rs.getString(EVENT_TYPE)));
message.setCustomer(new Customer()
@ -151,14 +155,14 @@ public class CustomerDaoImpl extends NamedParameterJdbcDaoSupport implements Cus
@Transactional
public void create(CustomerMessage message) throws DaoException {
final String sql = "INSERT INTO hook.customer_message " +
"(event_id, occured_at, type, party_id, event_type, " +
"(event_id, occured_at, sequence_id, change_id, type, party_id, event_type, " +
"customer_id, customer_shop_id, customer_status, customer_email , customer_phone, customer_metadata, " +
"binding_id, binding_payment_tool_token, binding_payment_session, binding_payment_tool_details_type, " +
"binding_payment_card_bin, binding_payment_card_last_digits, binding_payment_card_number_mask, binding_payment_card_token_provider, binding_payment_card_system, binding_payment_terminal_provider, " +
"binding_payment_digital_wallet_provider, binding_payment_digital_wallet_id, binding_payment_crypto_currency, " +
"binding_client_ip, binding_client_fingerprint, binding_status, binding_error_code, binding_error_message) " +
"VALUES " +
"(:event_id, :occured_at, CAST(:type as hook.customer_message_type), :party_id, CAST(:event_type as hook.eventtype), " +
"(:event_id, :occured_at, :sequence_id, :change_id, CAST(:type as hook.customer_message_type), :party_id, CAST(:event_type as hook.eventtype), " +
":customer_id, :customer_shop_id, CAST(:customer_status as hook.customer_status), :customer_email , :customer_phone, :customer_metadata, " +
":binding_id, :binding_payment_tool_token, :binding_payment_session, CAST(:binding_payment_tool_details_type as hook.payment_tool_details_type), " +
":binding_payment_card_bin, :binding_payment_card_last_digits, :binding_payment_card_number_mask, :binding_payment_card_token_provider, :binding_payment_card_system, :binding_payment_terminal_provider, " +
@ -169,6 +173,8 @@ public class CustomerDaoImpl extends NamedParameterJdbcDaoSupport implements Cus
MapSqlParameterSource params = new MapSqlParameterSource()
.addValue(EVENT_ID, message.getEventId())
.addValue(OCCURED_AT, message.getOccuredAt())
.addValue(SEQUENCE_ID, message.getSequenceId())
.addValue(CHANGE_ID, message.getChangeId())
.addValue(TYPE, message.getType())
.addValue(PARTY_ID, message.getPartyId())
.addValue(EVENT_TYPE, message.getEventType().name())

View File

@ -42,6 +42,8 @@ public class InvoicingMessageDaoImpl extends NamedParameterJdbcDaoSupport implem
public static final String ID = "id";
public static final String EVENT_ID = "event_id";
public static final String EVENT_TIME = "event_time";
public static final String SEQUENCE_ID = "sequence_id";
public static final String CHANGE_ID = "change_id";
public static final String TYPE = "type";
public static final String PARTY_ID = "party_id";
public static final String EVENT_TYPE = "event_type";
@ -154,6 +156,8 @@ public class InvoicingMessageDaoImpl extends NamedParameterJdbcDaoSupport implem
message.setId(rs.getLong(ID));
message.setEventId(rs.getLong(EVENT_ID));
message.setEventTime(rs.getString(EVENT_TIME));
message.setSequenceId(rs.getLong(SEQUENCE_ID));
message.setChangeId(rs.getInt(CHANGE_ID));
message.setType(rs.getString(TYPE));
message.setPartyId(rs.getString(PARTY_ID));
message.setEventType(EventType.valueOf(rs.getString(EVENT_TYPE)));
@ -306,7 +310,7 @@ public class InvoicingMessageDaoImpl extends NamedParameterJdbcDaoSupport implem
@Transactional
public void create(InvoicingMessage message) throws DaoException {
final String sql = "INSERT INTO hook.message" +
"(event_id, event_time, type, party_id, event_type, " +
"(event_id, event_time, sequence_id, change_id, type, party_id, event_type, " +
"invoice_id, shop_id, invoice_created_at, invoice_status, invoice_reason, invoice_due_date, invoice_amount, " +
"invoice_currency, invoice_content_type, invoice_content_data, invoice_product, invoice_description, " +
"payment_id, payment_created_at, payment_status, payment_failure, payment_failure_reason, payment_amount, " +
@ -315,7 +319,7 @@ public class InvoicingMessageDaoImpl extends NamedParameterJdbcDaoSupport implem
"payment_digital_wallet_provider, payment_digital_wallet_id, payment_crypto_currency, " +
"refund_id, refund_created_at, refund_status, refund_failure, refund_failure_reason, refund_amount, refund_currency, refund_reason) " +
"VALUES " +
"(:event_id, :event_time, :type, :party_id, CAST(:event_type as hook.eventtype), " +
"(:event_id, :event_time, :sequence_id, :change_id, :type, :party_id, CAST(:event_type as hook.eventtype), " +
":invoice_id, :shop_id, :invoice_created_at, :invoice_status, :invoice_reason, :invoice_due_date, :invoice_amount, " +
":invoice_currency, :invoice_content_type, :invoice_content_data, :invoice_product, :invoice_description, " +
":payment_id, :payment_created_at, :payment_status, :payment_failure, :payment_failure_reason, :payment_amount, " +
@ -327,6 +331,8 @@ public class InvoicingMessageDaoImpl extends NamedParameterJdbcDaoSupport implem
MapSqlParameterSource params = new MapSqlParameterSource()
.addValue(EVENT_ID, message.getEventId())
.addValue(EVENT_TIME, message.getEventTime())
.addValue(SEQUENCE_ID, message.getSequenceId())
.addValue(CHANGE_ID, message.getChangeId())
.addValue(TYPE, message.getType())
.addValue(PARTY_ID, message.getPartyId())
.addValue(EVENT_TYPE, message.getEventType().toString())
@ -451,4 +457,35 @@ public class InvoicingMessageDaoImpl extends NamedParameterJdbcDaoSupport implem
public InvoicingMessage getRefund(String invoiceId, String paymentId, String refundId) throws DaoException {
return getAny(invoiceId, paymentId, refundId, REFUND);
}
@Override
public boolean updateIfExists(InvoicingMessage message) {
String sql = "WITH sub AS (SELECT id FROM hook.message WHERE invoice_id=:invoice_id" +
" AND type=:type" +
" AND event_type=CAST(:event_type as hook.eventtype)" +
" AND invoice_status=:invoice_status" +
" AND (payment_id IS NULL OR payment_id=:payment_id)" +
" AND (payment_status IS NULL OR payment_status=:payment_status)" +
" AND (refund_id IS NULL OR refund_id=:refund_id)" +
" AND (refund_status IS NULL OR refund_status=:refund_status) LIMIT 1) " +
" UPDATE hook.message m SET sequence_id =:sequence_id, change_id =:change_id " +
" FROM sub " +
" WHERE m.id = sub.id";
MapSqlParameterSource params = new MapSqlParameterSource(INVOICE_ID, message.getInvoice().getId())
.addValue(TYPE, message.getType())
.addValue(EVENT_TYPE, message.getEventType().toString())
.addValue(INVOICE_STATUS, message.getInvoice().getStatus())
.addValue(PAYMENT_ID, message.getPayment() != null ? message.getPayment().getId() : null)
.addValue(PAYMENT_STATUS, message.getPayment() != null ? message.getPayment().getStatus() : null)
.addValue(REFUND_ID, message.getRefund() != null ? message.getRefund().getId() : null)
.addValue(REFUND_STATUS, message.getRefund() != null ? message.getRefund().getStatus() : null)
.addValue(SEQUENCE_ID, message.getSequenceId())
.addValue(CHANGE_ID, message.getChangeId());
try {
int count = getNamedParameterJdbcTemplate().update(sql, params);
return count > 0;
} catch (NestedRuntimeException e) {
throw new DaoException("InvoicingMessageDaoImpl.updateIfExists error", e);
}
}
}

View File

@ -7,7 +7,6 @@ import com.rbkmoney.woody.thrift.impl.http.event.HttpServiceEventLogListener;
import com.rbkmoney.woody.thrift.impl.http.event.ServiceEventLogListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import javax.servlet.*;
import javax.servlet.annotation.WebServlet;
import java.io.IOException;

View File

@ -0,0 +1,23 @@
package com.rbkmoney.hooker.exception;
public class ParseException extends RuntimeException {
public ParseException() {
}
public ParseException(String message) {
super(message);
}
public ParseException(String message, Throwable cause) {
super(message, cause);
}
public ParseException(Throwable cause) {
super(cause);
}
public ParseException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -5,10 +5,10 @@ import com.rbkmoney.geck.filter.Filter;
/**
* Created by inal on 24.11.2016.
*/
public interface Handler<C, P> {
public interface Handler<C> {
default boolean accept(C change) {
return getFilter().match(change);
}
void handle(C change, P parent);
void handle(C change, Long eventId, String eventCreatedAt, String sourceId, Long sequenceId, Integer changeId);
Filter getFilter();
}

View File

@ -59,12 +59,13 @@ public class EventStockHandler implements EventHandler<StockEvent> {
long id = processingEvent.getId();
for (Object cc : changes) {
for (int i = 0; i < changes.size(); ++i) {
Object cc = changes.get(i);
for (Handler pollingEventHandler : pollingEventHandlers) {
if (pollingEventHandler.accept(cc)) {
try {
log.info("We got an event {}", new TBaseProcessor().process(stockEvent, JsonHandler.newPrettyJsonInstance()));
pollingEventHandler.handle(cc, stockEvent);
pollingEventHandler.handle(cc, stockEvent.getId(), stockEvent.getTime(), sourceId, (long) processingEvent.getSequence(), i);
} catch (DaoException e) {
log.error("DaoException when poller handling with eventId {}", id, e);
if (count.decrementAndGet() > 0) {

View File

@ -1,24 +1,22 @@
package com.rbkmoney.hooker.handler.poller.impl.customer;
import com.rbkmoney.damsel.event_stock.StockEvent;
import com.rbkmoney.damsel.payment_processing.CustomerChange;
import com.rbkmoney.damsel.payment_processing.Event;
import com.rbkmoney.hooker.dao.DaoException;
import com.rbkmoney.hooker.handler.Handler;
import com.rbkmoney.machinegun.eventsink.MachineEvent;
/**
* Created by inalarsanukaev on 07.04.17.
*/
public abstract class AbstractCustomerEventHandler implements Handler<CustomerChange, StockEvent> {
public abstract class AbstractCustomerEventHandler implements Handler<CustomerChange> {
public static final String CUSTOMER = "customer";
public static final String BINDING = "binding";
@Override
public void handle(CustomerChange c, StockEvent value) throws DaoException{
Event event = value.getSourceEvent().getProcessingEvent();
saveEvent(c, event);
public void handle(CustomerChange c, Long eventId, String eventCreatedAt, String sourceId, Long sequenceId, Integer changeId) throws DaoException{
saveEvent(c, eventId, eventCreatedAt, sourceId, sequenceId, changeId);
}
protected abstract void saveEvent(CustomerChange cc, Event event) throws DaoException;
protected abstract void saveEvent(CustomerChange cc, Long eventId, String eventCreatedAt, String sourceId, Long sequenceId, Integer changeId) throws DaoException;
}

View File

@ -3,13 +3,13 @@ package com.rbkmoney.hooker.handler.poller.impl.customer;
import com.rbkmoney.damsel.domain.Failure;
import com.rbkmoney.damsel.domain.OperationFailure;
import com.rbkmoney.damsel.payment_processing.CustomerChange;
import com.rbkmoney.damsel.payment_processing.Event;
import com.rbkmoney.geck.filter.Filter;
import com.rbkmoney.geck.filter.PathConditionFilter;
import com.rbkmoney.geck.filter.condition.IsNullCondition;
import com.rbkmoney.geck.filter.rule.PathConditionRule;
import com.rbkmoney.hooker.model.CustomerMessage;
import com.rbkmoney.hooker.model.EventType;
import com.rbkmoney.machinegun.eventsink.MachineEvent;
import com.rbkmoney.swag_webhook_events.CustomerBindingError;
import org.springframework.stereotype.Component;
@ -41,7 +41,7 @@ public class CustomerBindingFailedHandler extends NeedReadCustomerEventHandler {
}
@Override
protected void modifyMessage(CustomerChange cc, Event event, CustomerMessage message) {
protected void modifyMessage(CustomerChange cc, CustomerMessage message) {
OperationFailure failure = cc.getCustomerBindingChanged().getPayload().getStatusChanged().getStatus().getFailed().getFailure();
String errCode = null;
String errMess = null;

View File

@ -1,13 +1,13 @@
package com.rbkmoney.hooker.handler.poller.impl.customer;
import com.rbkmoney.damsel.payment_processing.CustomerChange;
import com.rbkmoney.damsel.payment_processing.Event;
import com.rbkmoney.geck.filter.Filter;
import com.rbkmoney.geck.filter.PathConditionFilter;
import com.rbkmoney.geck.filter.condition.IsNullCondition;
import com.rbkmoney.geck.filter.rule.PathConditionRule;
import com.rbkmoney.hooker.model.CustomerMessage;
import com.rbkmoney.hooker.model.EventType;
import com.rbkmoney.machinegun.eventsink.MachineEvent;
import com.rbkmoney.swag_webhook_events.ClientInfo;
import com.rbkmoney.swag_webhook_events.CustomerBinding;
import com.rbkmoney.swag_webhook_events.PaymentResource;
@ -48,7 +48,7 @@ public class CustomerBindingStartedHandler extends NeedReadCustomerEventHandler
}
@Override
protected void modifyMessage(CustomerChange cc, Event event, CustomerMessage message) {
protected void modifyMessage(CustomerChange cc, CustomerMessage message) {
com.rbkmoney.damsel.payment_processing.CustomerBinding bindingOrigin = cc.getCustomerBindingChanged().getPayload().getStarted().getBinding();
PaymentResource paymentResource = new PaymentResource()
.paymentSession(bindingOrigin.getPaymentResource().getPaymentSessionId())

View File

@ -1,13 +1,13 @@
package com.rbkmoney.hooker.handler.poller.impl.customer;
import com.rbkmoney.damsel.payment_processing.CustomerChange;
import com.rbkmoney.damsel.payment_processing.Event;
import com.rbkmoney.geck.filter.Filter;
import com.rbkmoney.geck.filter.PathConditionFilter;
import com.rbkmoney.geck.filter.condition.IsNullCondition;
import com.rbkmoney.geck.filter.rule.PathConditionRule;
import com.rbkmoney.hooker.model.CustomerMessage;
import com.rbkmoney.hooker.model.EventType;
import com.rbkmoney.machinegun.eventsink.MachineEvent;
import com.rbkmoney.swag_webhook_events.CustomerBinding;
import org.springframework.stereotype.Component;
@ -39,7 +39,7 @@ public class CustomerBindingSucceededHandler extends NeedReadCustomerEventHandle
}
@Override
protected void modifyMessage(CustomerChange cc, Event event, CustomerMessage message) {
protected void modifyMessage(CustomerChange cc, CustomerMessage message) {
message.getCustomerBinding().setStatus(CustomerBinding.StatusEnum.SUCCEEDED);
}
}

View File

@ -1,7 +1,6 @@
package com.rbkmoney.hooker.handler.poller.impl.customer;
import com.rbkmoney.damsel.payment_processing.CustomerChange;
import com.rbkmoney.damsel.payment_processing.Event;
import com.rbkmoney.geck.filter.Filter;
import com.rbkmoney.geck.filter.PathConditionFilter;
import com.rbkmoney.geck.filter.condition.IsNullCondition;
@ -39,11 +38,13 @@ public class CustomerCreatedHandler extends AbstractCustomerEventHandler {
}
@Override
protected void saveEvent(CustomerChange cc, Event event) throws DaoException {
protected void saveEvent(CustomerChange cc, Long eventId, String eventCreatedAt, String sourceId, Long sequenceId, Integer changeId) throws DaoException {
com.rbkmoney.damsel.payment_processing.CustomerCreated customerCreatedOrigin = cc.getCustomerCreated();
CustomerMessage customerMessage = new CustomerMessage();
customerMessage.setEventId(event.getId());
customerMessage.setOccuredAt(event.getCreatedAt());
customerMessage.setEventId(eventId);
customerMessage.setOccuredAt(eventCreatedAt);
customerMessage.setSequenceId(sequenceId);
customerMessage.setChangeId(changeId);
customerMessage.setType(CUSTOMER);
customerMessage.setPartyId(customerCreatedOrigin.getOwnerId());
customerMessage.setEventType(eventType);

View File

@ -1,13 +1,14 @@
package com.rbkmoney.hooker.handler.poller.impl.customer;
import com.rbkmoney.damsel.payment_processing.CustomerChange;
import com.rbkmoney.damsel.payment_processing.Event;
import com.rbkmoney.geck.filter.Filter;
import com.rbkmoney.geck.filter.PathConditionFilter;
import com.rbkmoney.geck.filter.condition.IsNullCondition;
import com.rbkmoney.geck.filter.rule.PathConditionRule;
import com.rbkmoney.hooker.model.CustomerMessage;
import com.rbkmoney.hooker.model.EventType;
import com.rbkmoney.machinegun.eventsink.MachineEvent;
import com.rbkmoney.swag_webhook_events.Customer;
import org.springframework.stereotype.Component;
/**
@ -38,7 +39,6 @@ public class CustomerDeletedHandler extends NeedReadCustomerEventHandler {
}
@Override
protected void modifyMessage(CustomerChange cc, Event event, CustomerMessage message) {
protected void modifyMessage(CustomerChange cc, CustomerMessage message) {
}
}

View File

@ -1,13 +1,13 @@
package com.rbkmoney.hooker.handler.poller.impl.customer;
import com.rbkmoney.damsel.payment_processing.CustomerChange;
import com.rbkmoney.damsel.payment_processing.Event;
import com.rbkmoney.geck.filter.Filter;
import com.rbkmoney.geck.filter.PathConditionFilter;
import com.rbkmoney.geck.filter.condition.IsNullCondition;
import com.rbkmoney.geck.filter.rule.PathConditionRule;
import com.rbkmoney.hooker.model.CustomerMessage;
import com.rbkmoney.hooker.model.EventType;
import com.rbkmoney.machinegun.eventsink.MachineEvent;
import com.rbkmoney.swag_webhook_events.Customer;
import org.springframework.stereotype.Component;
@ -39,7 +39,7 @@ public class CustomerReadyHandler extends NeedReadCustomerEventHandler {
}
@Override
protected void modifyMessage(CustomerChange cc, Event event, CustomerMessage message) {
protected void modifyMessage(CustomerChange cc, CustomerMessage message) {
message.getCustomer().setStatus(Customer.StatusEnum.READY);
}
}

View File

@ -1,11 +1,11 @@
package com.rbkmoney.hooker.handler.poller.impl.customer;
import com.rbkmoney.damsel.payment_processing.CustomerChange;
import com.rbkmoney.damsel.payment_processing.Event;
import com.rbkmoney.hooker.dao.CustomerDao;
import com.rbkmoney.hooker.dao.DaoException;
import com.rbkmoney.hooker.model.CustomerMessage;
import com.rbkmoney.hooker.model.EventType;
import com.rbkmoney.machinegun.eventsink.MachineEvent;
import org.springframework.beans.factory.annotation.Autowired;
/**
@ -17,18 +17,19 @@ public abstract class NeedReadCustomerEventHandler extends AbstractCustomerEvent
CustomerDao customerDao;
@Override
protected void saveEvent(CustomerChange cc, Event event) throws DaoException {
final String customerId = event.getSource().getCustomerId();
protected void saveEvent(CustomerChange cc, Long eventId, String eventCreatedAt, String sourceId, Long sequenceId, Integer changeId) throws DaoException {
//getAny any saved message for related invoice
CustomerMessage message = getCustomerMessage(customerId);
CustomerMessage message = getCustomerMessage(sourceId);
if (message == null) {
throw new DaoException("CustomerMessage for customer with id " + customerId + " not exist");
throw new DaoException("CustomerMessage for customer with id " + sourceId + " not exist");
}
message.setEventType(getEventType());
message.setType(getMessageType());
message.setEventId(event.getId());
message.setOccuredAt(event.getCreatedAt());
modifyMessage(cc, event, message);
message.setEventId(eventId);
message.setOccuredAt(eventCreatedAt);
message.setSequenceId(sequenceId);
message.setChangeId(changeId);
modifyMessage(cc, message);
customerDao.create(message);
}
@ -41,5 +42,5 @@ public abstract class NeedReadCustomerEventHandler extends AbstractCustomerEvent
protected abstract EventType getEventType();
protected abstract void modifyMessage(CustomerChange cc, Event event, CustomerMessage message);
protected abstract void modifyMessage(CustomerChange cc, CustomerMessage message);
}

View File

@ -1,7 +1,5 @@
package com.rbkmoney.hooker.handler.poller.impl.invoicing;
import com.rbkmoney.damsel.event_stock.StockEvent;
import com.rbkmoney.damsel.payment_processing.Event;
import com.rbkmoney.damsel.payment_processing.InvoiceChange;
import com.rbkmoney.hooker.dao.DaoException;
import com.rbkmoney.hooker.handler.Handler;
@ -9,17 +7,16 @@ import com.rbkmoney.hooker.handler.Handler;
/**
* Created by inalarsanukaev on 07.04.17.
*/
public abstract class AbstractInvoiceEventHandler implements Handler<InvoiceChange, StockEvent> {
public abstract class AbstractInvoiceEventHandler implements Handler<InvoiceChange> {
public static final String INVOICE = "invoice";
public static final String PAYMENT = "payment";
public static final String REFUND = "refund";
@Override
public void handle(InvoiceChange ic, StockEvent value) throws DaoException{
Event event = value.getSourceEvent().getProcessingEvent();
saveEvent(ic, event);
public void handle(InvoiceChange ic, Long eventId, String eventCreatedAt, String sourceId, Long sequenceId, Integer changeId) throws DaoException {
saveEvent(ic, eventId, eventCreatedAt, sourceId, sequenceId, changeId);
}
protected abstract void saveEvent(InvoiceChange ic, Event event) throws DaoException;
protected abstract void saveEvent(InvoiceChange ic, Long eventId, String eventCreatedAt, String sourceId, Long sequenceId, Integer changeId) throws DaoException;
}

View File

@ -4,7 +4,6 @@ import com.rbkmoney.damsel.domain.Invoice;
import com.rbkmoney.damsel.domain.InvoiceCart;
import com.rbkmoney.damsel.domain.InvoiceLine;
import com.rbkmoney.damsel.msgpack.Value;
import com.rbkmoney.damsel.payment_processing.Event;
import com.rbkmoney.damsel.payment_processing.InvoiceChange;
import com.rbkmoney.geck.filter.Filter;
import com.rbkmoney.geck.filter.PathConditionFilter;
@ -35,12 +34,14 @@ public class InvoiceCreatedHandler extends AbstractInvoiceEventHandler {
@Override
@Transactional
public void saveEvent(InvoiceChange ic, Event event) throws DaoException {
public void saveEvent(InvoiceChange ic, Long eventId, String eventCreatedAt, String sourceId, Long sequenceId, Integer changeId) throws DaoException {
Invoice invoiceOrigin = ic.getInvoiceCreated().getInvoice();
//////
InvoicingMessage message = new InvoicingMessage();
message.setEventId(event.getId());
message.setEventTime(event.getCreatedAt());
message.setEventId(eventId);
message.setEventTime(eventCreatedAt);
message.setSequenceId(sequenceId);
message.setChangeId(changeId);
message.setType(INVOICE);
message.setPartyId(invoiceOrigin.getOwnerId());
message.setEventType(eventType);
@ -77,7 +78,9 @@ public class InvoiceCreatedHandler extends AbstractInvoiceEventHandler {
invoice.getCart().add(icp);
}
}
messageDao.create(message);
if (!messageDao.updateIfExists(message)) {
messageDao.create(message);
}
}
@Override

View File

@ -2,7 +2,6 @@ package com.rbkmoney.hooker.handler.poller.impl.invoicing;
import com.rbkmoney.damsel.domain.FinalCashFlowPosting;
import com.rbkmoney.damsel.domain.InvoicePaymentRefund;
import com.rbkmoney.damsel.payment_processing.Event;
import com.rbkmoney.damsel.payment_processing.InvoiceChange;
import com.rbkmoney.damsel.payment_processing.InvoicePaymentRefundCreated;
import com.rbkmoney.geck.filter.Filter;
@ -12,6 +11,7 @@ import com.rbkmoney.geck.filter.rule.PathConditionRule;
import com.rbkmoney.hooker.model.EventType;
import com.rbkmoney.hooker.model.InvoicingMessage;
import com.rbkmoney.hooker.model.Refund;
import com.rbkmoney.machinegun.eventsink.MachineEvent;
import org.springframework.stereotype.Component;
import java.util.List;
@ -48,7 +48,7 @@ public class InvoicePaymentRefundStartedHandler extends NeedReadInvoiceEventHand
}
@Override
protected void modifyMessage(InvoiceChange ic, Event event, InvoicingMessage message) {
protected void modifyMessage(InvoiceChange ic, InvoicingMessage message) {
InvoicePaymentRefundCreated refundCreated = ic.getInvoicePaymentChange().getPayload().getInvoicePaymentRefundChange().getPayload().getInvoicePaymentRefundCreated();
Refund refund = new Refund();
message.setRefund(refund);

View File

@ -2,7 +2,6 @@ package com.rbkmoney.hooker.handler.poller.impl.invoicing;
import com.rbkmoney.damsel.domain.InvoicePaymentRefundStatus;
import com.rbkmoney.damsel.domain.OperationFailure;
import com.rbkmoney.damsel.payment_processing.Event;
import com.rbkmoney.damsel.payment_processing.InvoiceChange;
import com.rbkmoney.geck.filter.Filter;
import com.rbkmoney.geck.filter.PathConditionFilter;
@ -12,6 +11,7 @@ import com.rbkmoney.hooker.model.EventType;
import com.rbkmoney.hooker.model.InvoicingMessage;
import com.rbkmoney.hooker.model.Refund;
import com.rbkmoney.hooker.utils.ErrorUtils;
import com.rbkmoney.machinegun.eventsink.MachineEvent;
import org.springframework.stereotype.Component;
@Component
@ -47,7 +47,7 @@ public class InvoicePaymentRefundStatusChangedHandler extends NeedReadInvoiceEve
}
@Override
protected void modifyMessage(InvoiceChange ic, Event event, InvoicingMessage message) {
protected void modifyMessage(InvoiceChange ic, InvoicingMessage message) {
InvoicePaymentRefundStatus refundStatus = ic.getInvoicePaymentChange().getPayload().getInvoicePaymentRefundChange().getPayload().getInvoicePaymentRefundStatusChanged().getStatus();
Refund refund = message.getRefund();
refund.setStatus(refundStatus.getSetField().getFieldName());

View File

@ -4,7 +4,6 @@ import com.rbkmoney.damsel.domain.DisposablePaymentResource;
import com.rbkmoney.damsel.domain.InvoicePayment;
import com.rbkmoney.damsel.domain.PaymentTool;
import com.rbkmoney.damsel.domain.RecurrentPayer;
import com.rbkmoney.damsel.payment_processing.Event;
import com.rbkmoney.damsel.payment_processing.InvoiceChange;
import com.rbkmoney.geck.filter.Filter;
import com.rbkmoney.geck.filter.PathConditionFilter;
@ -13,11 +12,10 @@ import com.rbkmoney.geck.filter.rule.PathConditionRule;
import com.rbkmoney.hooker.model.*;
import com.rbkmoney.hooker.model.Payment;
import com.rbkmoney.hooker.utils.PaymentToolUtils;
import com.rbkmoney.machinegun.eventsink.MachineEvent;
import com.rbkmoney.swag_webhook_events.*;
import org.springframework.stereotype.Component;
import static com.rbkmoney.hooker.utils.PaymentToolUtils.getPaymentToolDetails;
@Component
public class InvoicePaymentStartedHandler extends NeedReadInvoiceEventHandler {
@ -44,7 +42,7 @@ public class InvoicePaymentStartedHandler extends NeedReadInvoiceEventHandler {
}
@Override
protected void modifyMessage(InvoiceChange ic, Event event, InvoicingMessage message) {
protected void modifyMessage(InvoiceChange ic, InvoicingMessage message) {
InvoicePayment paymentOrigin = ic.getInvoicePaymentChange().getPayload().getInvoicePaymentStarted().getPayment();
Payment payment = new Payment();
message.setPayment(payment);

View File

@ -4,7 +4,6 @@ import com.rbkmoney.damsel.domain.Cash;
import com.rbkmoney.damsel.domain.InvoicePaymentCaptured;
import com.rbkmoney.damsel.domain.InvoicePaymentStatus;
import com.rbkmoney.damsel.domain.OperationFailure;
import com.rbkmoney.damsel.payment_processing.Event;
import com.rbkmoney.damsel.payment_processing.InvoiceChange;
import com.rbkmoney.geck.filter.Filter;
import com.rbkmoney.geck.filter.PathConditionFilter;
@ -14,6 +13,7 @@ import com.rbkmoney.hooker.model.EventType;
import com.rbkmoney.hooker.model.InvoicingMessage;
import com.rbkmoney.hooker.model.Payment;
import com.rbkmoney.hooker.utils.ErrorUtils;
import com.rbkmoney.machinegun.eventsink.MachineEvent;
import org.springframework.stereotype.Component;
@Component
@ -47,7 +47,7 @@ public class InvoicePaymentStatusChangedHandler extends NeedReadInvoiceEventHand
}
@Override
protected void modifyMessage(InvoiceChange ic, Event event, InvoicingMessage message) {
protected void modifyMessage(InvoiceChange ic, InvoicingMessage message) {
InvoicePaymentStatus paymentOriginStatus = ic.getInvoicePaymentChange().getPayload().getInvoicePaymentStatusChanged().getStatus();
Payment payment = message.getPayment();
payment.setStatus(paymentOriginStatus.getSetField().getFieldName());

View File

@ -1,7 +1,6 @@
package com.rbkmoney.hooker.handler.poller.impl.invoicing;
import com.rbkmoney.damsel.domain.InvoiceStatus;
import com.rbkmoney.damsel.payment_processing.Event;
import com.rbkmoney.damsel.payment_processing.InvoiceChange;
import com.rbkmoney.geck.filter.Filter;
import com.rbkmoney.geck.filter.PathConditionFilter;
@ -10,6 +9,7 @@ import com.rbkmoney.geck.filter.rule.PathConditionRule;
import com.rbkmoney.hooker.dao.InvoicingMessageDao;
import com.rbkmoney.hooker.model.EventType;
import com.rbkmoney.hooker.model.InvoicingMessage;
import com.rbkmoney.machinegun.eventsink.MachineEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -47,7 +47,7 @@ public class InvoiceStatusChangedHandler extends NeedReadInvoiceEventHandler {
}
@Override
protected void modifyMessage(InvoiceChange ic, Event event, InvoicingMessage message) {
protected void modifyMessage(InvoiceChange ic, InvoicingMessage message) {
InvoiceStatus statusOrigin = ic.getInvoiceStatusChanged().getStatus();
message.getInvoice().setStatus(statusOrigin.getSetField().getFieldName());
if (statusOrigin.isSetCancelled()) {

View File

@ -1,6 +1,5 @@
package com.rbkmoney.hooker.handler.poller.impl.invoicing;
import com.rbkmoney.damsel.payment_processing.Event;
import com.rbkmoney.damsel.payment_processing.InvoiceChange;
import com.rbkmoney.hooker.dao.DaoException;
import com.rbkmoney.hooker.dao.InvoicingMessageDao;
@ -16,20 +15,22 @@ public abstract class NeedReadInvoiceEventHandler extends AbstractInvoiceEventHa
InvoicingMessageDao messageDao;
@Override
protected void saveEvent(InvoiceChange ic, Event event) throws DaoException {
final String invoiceId = event.getSource().getInvoiceId();
protected void saveEvent(InvoiceChange ic, Long eventId, String eventCreatedAt, String sourceId, Long sequenceId, Integer changeId) throws DaoException {
//getAny any saved message for related invoice
InvoicingMessage message = getMessage(invoiceId, ic);
InvoicingMessage message = getMessage(sourceId, ic);
if (message == null) {
throw new DaoException("InvoicingMessage for invoice with id " + invoiceId + " not exist");
throw new DaoException("InvoicingMessage for invoice with id " + sourceId + " not exist");
}
message.setEventType(getEventType());
message.setType(getMessageType());
message.setEventId(event.getId());
message.setEventTime(event.getCreatedAt());
modifyMessage(ic, event, message);
messageDao.create(message);
message.setEventId(eventId);
message.setEventTime(eventCreatedAt);
message.setSequenceId(sequenceId);
message.setChangeId(changeId);
modifyMessage(ic, message);
if (!messageDao.updateIfExists(message)) {
messageDao.create(message);
}
}
protected abstract InvoicingMessage getMessage(String invoiceId, InvoiceChange ic);
@ -38,7 +39,7 @@ public abstract class NeedReadInvoiceEventHandler extends AbstractInvoiceEventHa
protected abstract EventType getEventType();
protected abstract void modifyMessage(InvoiceChange ic, Event event, InvoicingMessage message);
protected abstract void modifyMessage(InvoiceChange ic, InvoicingMessage message);
}

View File

@ -0,0 +1,24 @@
package com.rbkmoney.hooker.listener;
import com.rbkmoney.machinegun.eventsink.SinkEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaMachineEventListener {
private final MachineEventHandler machineEventHandler;
@KafkaListener(topics = "${kafka.topics.invoicing}", containerFactory = "kafkaListenerContainerFactory")
public void listen(SinkEvent message, Acknowledgment ack) {
log.debug("Got machineEvent: {}", message);
machineEventHandler.handle(message.getEvent(), ack);
log.debug("Handled machineEvent {}", message);
}
}

View File

@ -0,0 +1,10 @@
package com.rbkmoney.hooker.listener;
import com.rbkmoney.machinegun.eventsink.MachineEvent;
import org.springframework.kafka.support.Acknowledgment;
public interface MachineEventHandler {
void handle(MachineEvent message, Acknowledgment ack);
}

View File

@ -0,0 +1,59 @@
package com.rbkmoney.hooker.listener;
import com.rbkmoney.damsel.payment_processing.CustomerChange;
import com.rbkmoney.damsel.payment_processing.EventPayload;
import com.rbkmoney.damsel.payment_processing.InvoiceChange;
import com.rbkmoney.hooker.converter.SourceEventParser;
import com.rbkmoney.hooker.service.HandlerManager;
import com.rbkmoney.machinegun.eventsink.MachineEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Component
@RequiredArgsConstructor
public class MachineEventHandlerImpl implements MachineEventHandler {
private final HandlerManager handlerManager;
private final SourceEventParser eventParser;
@Override
@Transactional
public void handle(MachineEvent machineEvent, Acknowledgment ack) {
EventPayload payload = eventParser.parseEvent(machineEvent);
log.info("EventPayload payload: {}", payload);
if (payload.isSetInvoiceChanges()) {
for (int i = 0; i < payload.getInvoiceChanges().size(); ++i) {
InvoiceChange invoiceChange = payload.getInvoiceChanges().get(i);
try {
int j = i;
handlerManager.getHandler(invoiceChange)
.ifPresentOrElse(handler -> handler.handle(invoiceChange, null, machineEvent.getCreatedAt(), machineEvent.getSourceId(), machineEvent.getEventId(), j),
() -> log.debug("Handler for invoiceChange {} wasn't found (machineEvent {})", invoiceChange, machineEvent));
} catch (Exception ex) {
log.error("Failed to handle invoice change, invoiceChange='{}'", invoiceChange, ex);
throw ex;
}
}
} else if (payload.isSetCustomerChanges()) {
for (int i = 0; i < payload.getCustomerChanges().size(); ++i) {
CustomerChange customerChange = payload.getCustomerChanges().get(i);
try {
int j = i;
handlerManager.getHandler(customerChange)
.ifPresentOrElse(handler -> handler.handle(customerChange, null, machineEvent.getCreatedAt(), machineEvent.getSourceId(), machineEvent.getEventId(), j),
() -> log.debug("Handler for customerChange {} wasn't found (machineEvent {})", customerChange, machineEvent));
} catch (Exception ex) {
log.error("Failed to handle customer change, customerChange='{}'", customerChange, ex);
throw ex;
}
}
}
ack.acknowledge();
log.debug("Ack for machineEvent {} sent", machineEvent);
}
}

View File

@ -14,7 +14,9 @@ import lombok.NoArgsConstructor;
@AllArgsConstructor
@NoArgsConstructor
public class CustomerMessage extends Message {
private long eventId;
private Long eventId;
private Long sequenceId;
private Integer changeId;
private String type;
private String occuredAt;
private String partyId;

View File

@ -13,7 +13,9 @@ import lombok.Setter;
@Getter
@Setter
public class InvoicingMessage extends Message {
private long eventId;
private Long eventId;
private Long sequenceId;
private Integer changeId;
private String eventTime;
private String type;
private String partyId;
@ -25,6 +27,8 @@ public class InvoicingMessage extends Message {
public InvoicingMessage(InvoicingMessage other) {
setId(other.getId());
this.eventId = other.eventId;
this.sequenceId = other.sequenceId;
this.changeId = other.changeId;
this.eventTime = other.eventTime;
this.type = other.type;
this.partyId = other.partyId;

View File

@ -0,0 +1,15 @@
package com.rbkmoney.hooker.serde;
import com.rbkmoney.kafka.common.deserializer.AbstractDeserializerAdapter;
import com.rbkmoney.machinegun.eventsink.SinkEvent;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SinkEventDeserializer extends AbstractDeserializerAdapter<SinkEvent> {
@Override
public SinkEvent deserialize(String topic, byte[] data) {
return this.deserialize(data, new SinkEvent());
}
}

View File

@ -0,0 +1,19 @@
package com.rbkmoney.hooker.service;
import com.rbkmoney.hooker.handler.Handler;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Optional;
@Service
@RequiredArgsConstructor
public class HandlerManager {
private final List<Handler> handlers;
public <C> Optional<Handler> getHandler(C change) {
return handlers.stream().filter(handler -> handler.accept(change)).findFirst();
}
}

View File

@ -0,0 +1,20 @@
package com.rbkmoney.hooker.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
public class SleepService {
public void safeSleep(Long timeout) {
try {
TimeUnit.MILLISECONDS.sleep(timeout);
} catch (InterruptedException e) {
log.error("InterruptedException when waite timeout ", e);
Thread.currentThread().interrupt();
}
}
}

View File

@ -5,6 +5,28 @@ spring:
name: @name@
description: @description@
flyway.schemas: hook
kafka:
bootstrap-servers: kenny-kafka1.bst1.rbkmoney.net:9092
client-id: newway
ssl:
enabled: false
trust-store-location: "test"
trust-store-password: "test"
key-store-location: "test"
key-store-password: "test"
key-password: "test"
key-store-type: PKCS12
trust-store-type: PKCS12
consumer:
concurrency: 8
group-id: "HookerListener"
enable-auto-commit: false
auto-offset-reset: earliest
max-poll-records: 20
topics:
invoicing: mg-invoice-100-2
info:
version: @version@
responsible: Inal Arsanukaev
@ -47,4 +69,7 @@ management:
export:
statsd:
enabled: false
flavor: etsy
flavor: etsy
retry-policy:
maxAttempts: -1

Binary file not shown.

View File

@ -0,0 +1,5 @@
ALTER TABLE hook.message ADD COLUMN sequence_id int;
ALTER TABLE hook.message ADD COLUMN change_id int;
ALTER TABLE hook.customer_message ADD COLUMN sequence_id int;
ALTER TABLE hook.customer_message ADD COLUMN change_id int;

View File

@ -12,6 +12,7 @@ import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.PostgreSQLContainer;
import java.time.Duration;
@ -21,9 +22,16 @@ import static org.springframework.boot.test.context.SpringBootTest.WebEnvironmen
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = RANDOM_PORT)
@ContextConfiguration(classes = HookerApplication.class, initializers = AbstractIntegrationTest.Initializer.class)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@Slf4j
public abstract class AbstractIntegrationTest {
public static final String SOURCE_ID = "source_id";
public static final String SOURCE_NS = "source_ns";
private static final String CONFLUENT_PLATFORM_VERSION = "5.0.1";
@ClassRule
public static KafkaContainer kafka = new KafkaContainer(CONFLUENT_PLATFORM_VERSION).withEmbeddedZookeeper();
@ClassRule
public static PostgreSQLContainer postgres = (PostgreSQLContainer) new PostgreSQLContainer("postgres:9.6")
@ -40,7 +48,15 @@ public abstract class AbstractIntegrationTest {
"flyway.url=" + postgres.getJdbcUrl(),
"flyway.user=" + postgres.getUsername(),
"flyway.password=" + postgres.getPassword()
).applyTo(configurableApplicationContext);
).and("kafka.bootstrap-servers=" + kafka.getBootstrapServers(),
"kafka.ssl.enabled=false",
"kafka.consumer.group-id=TestListener",
"kafka.consumer.enable-auto-commit=false",
"kafka.consumer.auto-offset-reset=earliest",
"kafka.consumer.client-id=test",
"kafka.client-id=test",
"kafka.topics.invoicing=test-topic")
.applyTo(configurableApplicationContext);
Flyway flyway = Flyway.configure()
.dataSource(postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword())
.schemas("hook")

View File

@ -222,18 +222,18 @@ public class CustomerDataflowTest extends AbstractIntegrationTest {
}
public static class MockMessage {
private long eventID;
private Long eventID;
private String occuredAt;
private String topic;
private String eventType;
private Customer customer;
private CustomerBinding binding;
public long getEventID() {
public Long getEventID() {
return eventID;
}
public void setEventID(long eventID) {
public void setEventID(Long eventID) {
this.eventID = eventID;
}

View File

@ -1,7 +1,6 @@
package com.rbkmoney.hooker.dao;
import com.rbkmoney.hooker.AbstractIntegrationTest;
import com.rbkmoney.hooker.handler.poller.EventStockHandler;
import com.rbkmoney.hooker.handler.poller.impl.customer.AbstractCustomerEventHandler;
import com.rbkmoney.hooker.model.CustomerMessage;
import com.rbkmoney.hooker.model.EventType;

View File

@ -1,7 +1,6 @@
package com.rbkmoney.hooker.dao;
import com.rbkmoney.hooker.AbstractIntegrationTest;
import com.rbkmoney.hooker.handler.poller.EventStockHandler;
import com.rbkmoney.hooker.handler.poller.impl.invoicing.AbstractInvoiceEventHandler;
import com.rbkmoney.hooker.model.EventType;
import com.rbkmoney.hooker.model.InvoicingMessage;
@ -21,8 +20,7 @@ import java.util.Arrays;
import static com.rbkmoney.hooker.utils.BuildUtils.buildMessage;
import static com.rbkmoney.hooker.utils.BuildUtils.cart;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
/**
* Created by inalarsanukaev on 09.04.17.
@ -83,4 +81,13 @@ public class InvoicingMessageDaoImplTest extends AbstractIntegrationTest {
public void getMaxEventId() {
assertEquals(messageDao.getMaxEventId(workersCount, HashUtils.getIntHash("1234") % workersCount).longValue(), 5555);
}
@Test
public void testIsDuplicate(){
InvoicingMessage invoicingMessage = buildMessage(AbstractInvoiceEventHandler.PAYMENT,"1234", "56678", EventType.INVOICE_CREATED, "status", cart(), false);
assertTrue(messageDao.updateIfExists(invoicingMessage));
invoicingMessage.getPayment().setStatus("processed");
assertFalse(messageDao.updateIfExists(invoicingMessage));
}
}

View File

@ -0,0 +1,98 @@
package com.rbkmoney.hooker.kafka;
import com.rbkmoney.damsel.payment_processing.EventPayload;
import com.rbkmoney.hooker.AbstractIntegrationTest;
import com.rbkmoney.hooker.configuration.RetryConfig;
import com.rbkmoney.hooker.converter.SourceEventParser;
import com.rbkmoney.hooker.listener.KafkaMachineEventListener;
import com.rbkmoney.hooker.listener.MachineEventHandlerImpl;
import com.rbkmoney.hooker.service.HandlerManager;
import com.rbkmoney.kafka.common.serializer.ThriftSerializer;
import com.rbkmoney.machinegun.eventsink.MachineEvent;
import com.rbkmoney.machinegun.eventsink.SinkEvent;
import com.rbkmoney.machinegun.msgpack.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import org.mockito.Mockito;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import static java.util.Collections.emptyList;
import static org.mockito.ArgumentMatchers.any;
@Slf4j
@TestPropertySource(properties = "spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration")
@ContextConfiguration(classes = {KafkaAutoConfiguration.class, KafkaMachineEventListener.class, MachineEventHandlerImpl.class, RetryConfig.class})
public class KafkaMachineEventListenerKafkaTest extends AbstractIntegrationTest {
@org.springframework.beans.factory.annotation.Value("${kafka.topics.invoicing}")
public String topic;
@MockBean
HandlerManager handlerManager;
@MockBean
SourceEventParser eventParser;
@Test
public void listenEmptyChanges() throws InterruptedException {
Mockito.when(eventParser.parseEvent(any())).thenReturn(EventPayload.invoice_changes(emptyList()));
SinkEvent sinkEvent = new SinkEvent();
sinkEvent.setEvent(createMessage());
writeToTopic(sinkEvent);
waitForTopicSync();
Mockito.verify(eventParser, Mockito.times(1)).parseEvent(any());
}
private void writeToTopic(SinkEvent sinkEvent) {
Producer<String, SinkEvent> producer = createProducer();
ProducerRecord<String, SinkEvent> producerRecord = new ProducerRecord<>(topic, null, sinkEvent);
try {
producer.send(producerRecord).get();
} catch (Exception e) {
log.error("KafkaAbstractTest initialize e: ", e);
}
producer.close();
}
private void waitForTopicSync() throws InterruptedException {
Thread.sleep(1000L);
}
private MachineEvent createMessage() {
MachineEvent message = new MachineEvent();
Value data = new Value();
data.setBin(new byte[0]);
message.setCreatedAt(LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
message.setEventId(1L);
message.setSourceNs(SOURCE_NS);
message.setSourceId(SOURCE_ID);
message.setData(data);
return message;
}
public static Producer<String, SinkEvent> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ProducerConfig.CLIENT_ID_CONFIG, "client_id");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, new ThriftSerializer<SinkEvent>().getClass());
return new KafkaProducer<>(props);
}
}

View File

@ -0,0 +1,87 @@
package com.rbkmoney.hooker.listener;
import com.rbkmoney.damsel.payment_processing.Event;
import com.rbkmoney.damsel.payment_processing.EventPayload;
import com.rbkmoney.damsel.payment_processing.InvoiceChange;
import com.rbkmoney.hooker.converter.SourceEventParser;
import com.rbkmoney.hooker.exception.ParseException;
import com.rbkmoney.hooker.handler.Handler;
import com.rbkmoney.hooker.service.HandlerManager;
import com.rbkmoney.machinegun.eventsink.MachineEvent;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.springframework.kafka.support.Acknowledgment;
import java.util.ArrayList;
import static org.mockito.ArgumentMatchers.any;
public class MachineEventHandlerImplTest {
@Mock
private HandlerManager handlerManager;
@Mock
private Handler handler;
@Mock
private SourceEventParser eventParser;
@Mock
private Acknowledgment ack;
private MachineEventHandlerImpl machineEventHandler;
@Before
public void init() {
MockitoAnnotations.initMocks(this);
machineEventHandler = new MachineEventHandlerImpl(handlerManager, eventParser);
}
@Test
public void listenEmptyChanges() {
Mockito.when(handlerManager.getHandler(any())).thenReturn(java.util.Optional.of(handler));
MachineEvent message = new MachineEvent();
Event event = new Event();
EventPayload payload = new EventPayload();
payload.setInvoiceChanges(new ArrayList<>());
event.setPayload(payload);
Mockito.when(eventParser.parseEvent(message)).thenReturn(payload);
machineEventHandler.handle(message, ack);
Mockito.verify(handlerManager, Mockito.times(0)).getHandler(any());
Mockito.verify(handler, Mockito.times(0)).handle(any(), any(), any(), any(), any(), any());
Mockito.verify(ack, Mockito.times(1)).acknowledge();
}
@Test(expected = ParseException.class)
public void listenEmptyException() {
MachineEvent message = new MachineEvent();
Mockito.when(eventParser.parseEvent(message)).thenThrow(new ParseException());
machineEventHandler.handle(message, ack);
Mockito.verify(ack, Mockito.times(0)).acknowledge();
}
@Test
public void listenChanges() {
MachineEvent message = new MachineEvent();
Event event = new Event();
EventPayload payload = new EventPayload();
ArrayList<InvoiceChange> invoiceChanges = new ArrayList<>();
invoiceChanges.add(new InvoiceChange());
payload.setInvoiceChanges(invoiceChanges);
event.setPayload(payload);
Mockito.when(eventParser.parseEvent(message)).thenReturn(payload);
Mockito.when(handlerManager.getHandler(any())).thenReturn(java.util.Optional.of(handler));
machineEventHandler.handle(message, ack);
Mockito.verify(handlerManager, Mockito.times(1)).getHandler(any());
Mockito.verify(handler, Mockito.times(1)).handle(any(), any(), any(), any(), any(), any());
Mockito.verify(ack, Mockito.times(1)).acknowledge();
}
}

View File

@ -20,7 +20,7 @@ public class BuildUtils {
public static InvoicingMessage buildMessage(String type, String invoiceId, String partyId, EventType eventType, String status, List<InvoiceCartPosition> cart, boolean isPayer) {
InvoicingMessage message = new InvoicingMessage();
message.setEventId(5555);
message.setEventId(5555L);
message.setEventTime("time");
message.setType(type);
message.setPartyId(partyId);