Add check hook (#58)

This commit is contained in:
struga 2024-03-28 14:38:04 +03:00 committed by GitHub
parent f01f128519
commit 9fe3d01970
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 271 additions and 45 deletions

View File

@ -28,7 +28,6 @@ import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import java.util.HashMap;
import java.util.Map;
@Slf4j

View File

@ -1,11 +1,13 @@
package dev.vality.hooker.configuration;
import dev.vality.hooker.converter.WebhookMessageBuilder;
import dev.vality.hooker.dao.InvoicingMessageDao;
import dev.vality.hooker.dao.MessageDao;
import dev.vality.hooker.model.CustomerMessage;
import dev.vality.hooker.model.InvoicingMessage;
import dev.vality.hooker.service.CustomerMessageService;
import dev.vality.hooker.service.EventService;
import dev.vality.hooker.service.MessageService;
import dev.vality.hooker.service.InvoiceMessageService;
import dev.vality.hooker.service.WebhookKafkaProducerService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -13,19 +15,21 @@ import org.springframework.context.annotation.Configuration;
@Configuration
public class ServiceConfig {
@Bean
public MessageService<InvoicingMessage> invoicingService(MessageDao<InvoicingMessage> messageDao,
EventService<InvoicingMessage> eventService,
WebhookMessageBuilder webhookMessageBuilder,
WebhookKafkaProducerService webhookKafkaProducerService) {
return new MessageService<>(messageDao, eventService, webhookMessageBuilder, webhookKafkaProducerService);
public InvoiceMessageService invoicingService(InvoicingMessageDao messageDao,
EventService<InvoicingMessage> eventService,
WebhookMessageBuilder webhookMessageBuilder,
WebhookKafkaProducerService webhookKafkaProducerService) {
return new InvoiceMessageService(messageDao, eventService, webhookMessageBuilder, webhookKafkaProducerService);
}
@Bean
public MessageService<CustomerMessage> customerService(MessageDao<CustomerMessage> messageDao,
EventService<CustomerMessage> eventService,
WebhookMessageBuilder webhookMessageBuilder,
WebhookKafkaProducerService webhookKafkaProducerService) {
return new MessageService<>(messageDao, eventService, webhookMessageBuilder, webhookKafkaProducerService);
public CustomerMessageService<CustomerMessage> customerService(
MessageDao<CustomerMessage> messageDao,
EventService<CustomerMessage> eventService,
WebhookMessageBuilder webhookMessageBuilder,
WebhookKafkaProducerService webhookKafkaProducerService) {
return new CustomerMessageService<>(messageDao, eventService, webhookMessageBuilder,
webhookKafkaProducerService);
}
}

View File

@ -7,4 +7,7 @@ import dev.vality.hooker.model.InvoicingMessageKey;
public interface InvoicingMessageDao extends MessageDao<InvoicingMessage> {
InvoicingMessage getInvoicingMessage(InvoicingMessageKey key) throws NotFoundException, DaoException;
Boolean hasWebhooks(InvoicingMessage invoicingMessage) throws NotFoundException, DaoException;
}

View File

@ -87,7 +87,7 @@ public class CustomerDaoImpl implements CustomerDao {
@Override
public List<WebhookMessageModel<CustomerMessage>> getWebhookModels(Long messageId) {
final String sql = "select m.*, w.id as hook_id, w.url, pk.priv_key" +
final String sql = "select m.*, w.id as hook_id, w.url, pk.priv_key" +
" from hook.customer_message m" +
" join hook.webhook w on m.party_id = w.party_id " +
" and w.enabled and w.topic=CAST(:message_type as hook.message_topic)" +
@ -104,7 +104,7 @@ public class CustomerDaoImpl implements CustomerDao {
@Override
public Long getParentId(Long hookId, String customerId, Long messageId) {
final String sql = "select m.id" +
final String sql = "select m.id" +
" from hook.customer_message m " +
" join hook.webhook w on w.id=:hook_id" +
" join hook.webhook_to_events wte on wte.hook_id = w.id" +
@ -123,4 +123,5 @@ public class CustomerDaoImpl implements CustomerDao {
return parentNotExistId;
}
}
}

View File

@ -47,21 +47,21 @@ public class InvoicingDaoImpl implements InvoicingMessageDao {
"RETURNING id";
MapSqlParameterSource sqlParameterSources = new MapSqlParameterSource()
.addValue(InvoicingRowMapper.EVENT_TIME, message.getEventTime())
.addValue(InvoicingRowMapper.SEQUENCE_ID, message.getSequenceId())
.addValue(InvoicingRowMapper.CHANGE_ID, message.getChangeId())
.addValue(InvoicingRowMapper.TYPE, message.getType().getValue())
.addValue(InvoicingRowMapper.PARTY_ID, message.getPartyId())
.addValue(InvoicingRowMapper.EVENT_TYPE, message.getEventType().toString())
.addValue(InvoicingRowMapper.INVOICE_ID, message.getSourceId())
.addValue(InvoicingRowMapper.SHOP_ID, message.getShopId())
.addValue(InvoicingRowMapper.INVOICE_STATUS, message.getInvoiceStatus().getValue())
.addValue(InvoicingRowMapper.PAYMENT_ID, message.getPaymentId())
.addValue(InvoicingRowMapper.PAYMENT_STATUS,
message.getPaymentStatus() != null ? message.getPaymentStatus().getValue() : null)
.addValue(InvoicingRowMapper.REFUND_ID, message.getRefundId())
.addValue(InvoicingRowMapper.REFUND_STATUS,
message.getRefundStatus() != null ? message.getRefundStatus().getValue() : null);
.addValue(InvoicingRowMapper.EVENT_TIME, message.getEventTime())
.addValue(InvoicingRowMapper.SEQUENCE_ID, message.getSequenceId())
.addValue(InvoicingRowMapper.CHANGE_ID, message.getChangeId())
.addValue(InvoicingRowMapper.TYPE, message.getType().getValue())
.addValue(InvoicingRowMapper.PARTY_ID, message.getPartyId())
.addValue(InvoicingRowMapper.EVENT_TYPE, message.getEventType().toString())
.addValue(InvoicingRowMapper.INVOICE_ID, message.getSourceId())
.addValue(InvoicingRowMapper.SHOP_ID, message.getShopId())
.addValue(InvoicingRowMapper.INVOICE_STATUS, message.getInvoiceStatus().getValue())
.addValue(InvoicingRowMapper.PAYMENT_ID, message.getPaymentId())
.addValue(InvoicingRowMapper.PAYMENT_STATUS,
message.getPaymentStatus() != null ? message.getPaymentStatus().getValue() : null)
.addValue(InvoicingRowMapper.REFUND_ID, message.getRefundId())
.addValue(InvoicingRowMapper.REFUND_STATUS,
message.getRefundStatus() != null ? message.getRefundStatus().getValue() : null);
GeneratedKeyHolder keyHolder = new GeneratedKeyHolder();
jdbcTemplate.update(sql, sqlParameterSources, keyHolder);
return keyHolder.getKey() == null ? null : keyHolder.getKey().longValue();
@ -91,7 +91,7 @@ public class InvoicingDaoImpl implements InvoicingMessageDao {
@Override
public List<WebhookMessageModel<InvoicingMessage>> getWebhookModels(Long messageId) {
final String sql = "select m.*, w.id as hook_id, w.url, pk.priv_key" +
final String sql = "select m.*, w.id as hook_id, w.url, pk.priv_key" +
" from hook.message m" +
" join hook.webhook w on m.party_id = w.party_id " +
" and w.enabled and w.topic=CAST(:message_type as hook.message_topic)" +
@ -109,9 +109,37 @@ public class InvoicingDaoImpl implements InvoicingMessageDao {
return jdbcTemplate.query(sql, mapSqlParameterSource, webhookMessageModelRowMapper);
}
@Override
public Boolean hasWebhooks(InvoicingMessage invoicingMessage) {
final String sql = "select count(*) " +
" from hook.webhook w " +
" join hook.webhook_to_events wte on wte.hook_id = w.id " +
" where w.party_id = :id " +
" and w.enabled and w.topic=CAST(:message_type as hook.message_topic) " +
" and wte.event_type = CAST(:event_type as hook.EventType) " +
" and (wte.invoice_shop_id = :shop_id or wte.invoice_shop_id is null) " +
" and (wte.invoice_status = :invoice_status or wte.invoice_status is null) " +
" and (wte.invoice_payment_status = :payment_status or wte.invoice_payment_status is null)" +
" and (wte.invoice_payment_refund_status = :refund_status " +
"or wte.invoice_payment_refund_status is null)";
MapSqlParameterSource mapSqlParameterSource = new MapSqlParameterSource()
.addValue("id", invoicingMessage.getPartyId())
.addValue("shop_id", invoicingMessage.getShopId())
.addValue("event_type", invoicingMessage.getEventType().name())
.addValue("invoice_status", invoicingMessage.getInvoiceStatus() != null
? invoicingMessage.getInvoiceStatus().getValue() : null)
.addValue("payment_status", invoicingMessage.getPaymentStatus() != null
? invoicingMessage.getPaymentStatus().getValue() : null)
.addValue("refund_status", invoicingMessage.getRefundStatus() != null
? invoicingMessage.getRefundStatus().getValue() : null)
.addValue("message_type", Event.TopicEnum.INVOICESTOPIC.getValue());
Integer count = jdbcTemplate.queryForObject(sql, mapSqlParameterSource, Integer.class);
return count != null && count > 0;
}
@Override
public Long getParentId(Long hookId, String invoiceId, Long messageId) {
final String sql = "select m.id" +
final String sql = "select m.id" +
" from hook.message m " +
" join hook.webhook w on w.id=:hook_id" +
" join hook.webhook_to_events wte on wte.hook_id = w.id" +

View File

@ -7,7 +7,10 @@ import dev.vality.geck.filter.condition.IsNullCondition;
import dev.vality.geck.filter.rule.PathConditionRule;
import dev.vality.hooker.converter.InvoiceChangeToUserInteractionConverter;
import dev.vality.hooker.dao.InvoicingMessageDao;
import dev.vality.hooker.model.*;
import dev.vality.hooker.model.EventType;
import dev.vality.hooker.model.InvoicingMessage;
import dev.vality.hooker.model.InvoicingMessageEnum;
import dev.vality.hooker.model.InvoicingMessageKey;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

View File

@ -5,7 +5,7 @@ import dev.vality.damsel.payment_processing.EventPayload;
import dev.vality.hooker.handler.Mapper;
import dev.vality.hooker.model.CustomerMessage;
import dev.vality.hooker.model.EventInfo;
import dev.vality.hooker.service.MessageService;
import dev.vality.hooker.service.CustomerMessageService;
import dev.vality.machinegun.eventsink.MachineEvent;
import dev.vality.sink.common.parser.impl.MachineEventParser;
import lombok.RequiredArgsConstructor;
@ -23,7 +23,7 @@ public class CustomerMachineEventHandler implements MachineEventHandler {
private final MachineEventParser<EventPayload> parser;
private final List<Mapper<CustomerChange, CustomerMessage>> customerEventMappers;
private final MessageService<CustomerMessage> customerMessageService;
private final CustomerMessageService<CustomerMessage> customerMessageService;
@Override
@Transactional

View File

@ -5,8 +5,7 @@ import dev.vality.damsel.payment_processing.InvoiceChange;
import dev.vality.hooker.handler.Mapper;
import dev.vality.hooker.model.EventInfo;
import dev.vality.hooker.model.InvoicingMessage;
import dev.vality.hooker.service.HandlerManager;
import dev.vality.hooker.service.MessageService;
import dev.vality.hooker.service.InvoiceMessageService;
import dev.vality.machinegun.eventsink.MachineEvent;
import dev.vality.sink.common.parser.impl.MachineEventParser;
import lombok.RequiredArgsConstructor;
@ -24,7 +23,7 @@ public class InvoicingMachineEventHandler implements MachineEventHandler {
private final List<Mapper<InvoiceChange, InvoicingMessage>> handlers;
private final MachineEventParser<EventPayload> parser;
private final MessageService<InvoicingMessage> invoicingMessageService;
private final InvoiceMessageService invoicingMessageService;
@Override
@Transactional

View File

@ -10,7 +10,7 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
@RequiredArgsConstructor
public class MessageService<T extends Message> {
public class CustomerMessageService<T extends Message> {
private final MessageDao<T> messageDao;
private final EventService<T> eventService;
private final WebhookMessageBuilder webhookMessageBuilder;

View File

@ -0,0 +1,45 @@
package dev.vality.hooker.service;
import dev.vality.hooker.converter.WebhookMessageBuilder;
import dev.vality.hooker.dao.InvoicingMessageDao;
import dev.vality.hooker.model.InvoicingMessage;
import dev.vality.swag_webhook_events.model.Event;
import dev.vality.webhook.dispatcher.WebhookMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@RequiredArgsConstructor
public class InvoiceMessageService {
private final InvoicingMessageDao messageDao;
private final EventService<InvoicingMessage> eventService;
private final WebhookMessageBuilder webhookMessageBuilder;
private final WebhookKafkaProducerService webhookKafkaProducerService;
public void process(InvoicingMessage message) {
log.info("Start processing of message {}", message);
if (!messageDao.hasWebhooks(message)) {
log.info("End without hook processing of message {}", message);
return;
}
Long id = messageDao.save(message);
String sourceId = message.getSourceId();
if (id != null) {
message.setId(id);
var webhookModels = messageDao.getWebhookModels(id);
if (!webhookModels.isEmpty()) {
log.info("Processing {} webhook(s)", webhookModels.size());
Event event = eventService.getEventByMessage(message);
webhookModels.forEach(w -> {
Long hookId = w.getHookId();
Long parentEventId = messageDao.getParentId(hookId, sourceId, id);
WebhookMessage webhookMessage = webhookMessageBuilder.build(w, event, sourceId, parentEventId);
log.info("Try to send webhook to kafka: {}, parentId {}", webhookMessage, parentEventId);
webhookKafkaProducerService.send(webhookMessage);
log.info("Webhook to kafka was sent: sourceId={}", webhookMessage.getSourceId());
});
}
}
log.info("End processing of message {}", sourceId);
}
}

View File

@ -10,7 +10,6 @@ import dev.vality.swag_webhook_events.model.Payment;
import dev.vality.swag_webhook_events.model.PaymentResourcePayer;
import dev.vality.swag_webhook_events.model.RecurrentPayer;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;

View File

@ -6,7 +6,6 @@ import dev.vality.hooker.listener.CustomerEventKafkaListener;
import dev.vality.hooker.listener.CustomerMachineEventHandler;
import dev.vality.hooker.listener.InvoicingEventKafkaListener;
import dev.vality.hooker.listener.InvoicingMachineEventHandler;
import dev.vality.hooker.service.HandlerManager;
import dev.vality.kafka.common.serialization.ThriftSerializer;
import dev.vality.machinegun.eventsink.MachineEvent;
import dev.vality.machinegun.eventsink.SinkEvent;

View File

@ -6,8 +6,7 @@ import dev.vality.damsel.payment_processing.InvoiceChange;
import dev.vality.hooker.exception.ParseException;
import dev.vality.hooker.handler.Mapper;
import dev.vality.hooker.model.InvoicingMessage;
import dev.vality.hooker.service.HandlerManager;
import dev.vality.hooker.service.MessageService;
import dev.vality.hooker.service.InvoiceMessageService;
import dev.vality.machinegun.eventsink.MachineEvent;
import dev.vality.sink.common.parser.impl.MachineEventParser;
import org.junit.Before;
@ -34,7 +33,7 @@ public class InvoicingMachineEventHandlerTest {
private InvoicingMachineEventHandler machineEventHandler;
private MessageService<InvoicingMessage> invoicingService;
private InvoiceMessageService invoicingService;
@Before
public void init() {

View File

@ -0,0 +1,119 @@
package dev.vality.hooker.service;
import dev.vality.damsel.domain.*;
import dev.vality.damsel.payment_processing.InvoicingSrv;
import dev.vality.hooker.config.PostgresqlSpringBootITest;
import dev.vality.hooker.dao.HookDao;
import dev.vality.hooker.dao.InvoicingMessageDao;
import dev.vality.hooker.dao.impl.InvoicingDaoImpl;
import dev.vality.hooker.model.*;
import dev.vality.hooker.utils.BuildUtils;
import org.apache.thrift.TException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.MockBean;
import java.io.IOException;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
@PostgresqlSpringBootITest
public class InvoicingMessageServiceFullTest {
public static final String SHOP_ID = "shopId";
public static final String INVOICE_ID = "invoice_id";
@MockBean
private InvoicingSrv.Iface invoicingClient;
@MockBean
private WebhookKafkaProducerService webhookKafkaProducerService;
@Autowired
private HookDao hookDao;
@Autowired
private InvoicingDaoImpl messageDao;
@Autowired
private InvoicingMessageDao invoicingMessageDao;
@Autowired
private InvoiceMessageService invoicingService;
private static final String PARTY_ID = "partyId";
@BeforeEach
public void setUp() throws Exception {
mockGetInvoice(InvoicePaymentStatus.cancelled(new InvoicePaymentCancelled()));
}
private void mockGetInvoice(InvoicePaymentStatus invoicePaymentStatus) throws TException, IOException {
when(invoicingClient.get(any(), any()))
.thenReturn(BuildUtils.buildInvoice("partyId", "invoiceId", "1", "1",
InvoiceStatus.paid(new InvoicePaid()), invoicePaymentStatus));
}
@Test
public void testProcess() throws TException, IOException {
InvoicingMessage invoicingMessage = buildMessage(PARTY_ID, INVOICE_ID,
InvoicingMessageEnum.PAYMENT, EventType.INVOICE_PAYMENT_STATUS_CHANGED);
invoicingService.process(invoicingMessage);
mockGetInvoice(InvoicePaymentStatus.failed(new InvoicePaymentFailed()));
invoicingMessage.setPaymentStatus(PaymentStatusEnum.FAILED);
invoicingMessage.setSequenceId(2L);
invoicingService.process(invoicingMessage);
mockGetInvoice(InvoicePaymentStatus.captured(new InvoicePaymentCaptured()));
invoicingMessage.setPaymentStatus(PaymentStatusEnum.CAPTURED);
invoicingMessage.setSequenceId(3L);
invoicingService.process(invoicingMessage);
Boolean hasWebhooks = messageDao.hasWebhooks(invoicingMessage);
assertFalse(hasWebhooks);
Hook hook = BuildUtils.buildHook(PARTY_ID, SHOP_ID, "paid", null, "www.kek.ru",
EventType.INVOICE_PAYMENT_STATUS_CHANGED, "captured", "cancelled");
hookDao.create(hook);
hasWebhooks = messageDao.hasWebhooks(invoicingMessage);
assertTrue(hasWebhooks);
mockGetInvoice(InvoicePaymentStatus.failed(new InvoicePaymentFailed()));
invoicingMessage.setPaymentStatus(PaymentStatusEnum.FAILED);
invoicingMessage.setSequenceId(4L);
invoicingService.process(invoicingMessage);
mockGetInvoice(InvoicePaymentStatus.captured(new InvoicePaymentCaptured()));
invoicingMessage.setPaymentStatus(PaymentStatusEnum.CAPTURED);
invoicingMessage.setSequenceId(5L);
invoicingService.process(invoicingMessage);
verify(webhookKafkaProducerService, times(1)).send(any());
Long parentEventId = messageDao.getParentId(hook.getId(), INVOICE_ID, invoicingMessage.getId());
assertEquals(-1, parentEventId);
}
private InvoicingMessage buildMessage(String partyId, String invoiceId,
InvoicingMessageEnum type, EventType eventType) {
InvoicingMessage invoicingMessage = new InvoicingMessage();
invoicingMessage.setSequenceId(1L);
invoicingMessage.setChangeId(1);
invoicingMessage.setPaymentId("1");
invoicingMessage.setEventTime("2016-03-22T06:12:27Z");
invoicingMessage.setSourceId(invoiceId);
invoicingMessage.setPartyId(partyId);
invoicingMessage.setShopId(SHOP_ID);
invoicingMessage.setEventType(eventType);
invoicingMessage.setType(type);
invoicingMessage.setInvoiceStatus(InvoiceStatusEnum.PAID);
invoicingMessage.setPaymentStatus(PaymentStatusEnum.CANCELLED);
return invoicingMessage;
}
}

View File

@ -22,7 +22,7 @@ import static org.mockito.ArgumentMatchers.any;
public class InvoicingMessageServiceTest {
@Autowired
private MessageService<InvoicingMessage> invoicingService;
private InvoiceMessageService invoicingService;
@Autowired
private HookDao hookDao;

View File

@ -201,7 +201,35 @@ public class BuildUtils {
Set<WebhookAdditionalFilter> webhookAdditionalFilters = new HashSet<>();
for (EventType type : types) {
webhookAdditionalFilters.add(WebhookAdditionalFilter.builder().eventType(type).build());
webhookAdditionalFilters.add(WebhookAdditionalFilter.builder()
.eventType(type)
.build());
}
hook.setFilters(webhookAdditionalFilters);
return hook;
}
public static Hook buildHook(String partyId,
String shopId,
String invoiceStatus,
String invoicePaymentRefundStatus,
String url,
EventType type,
String... invoicePaymentStatuses) {
Hook hook = new Hook();
hook.setPartyId(partyId);
hook.setTopic(Event.TopicEnum.INVOICESTOPIC.getValue());
hook.setUrl(url);
Set<WebhookAdditionalFilter> webhookAdditionalFilters = new HashSet<>();
for (String invoicePaymentStatus : invoicePaymentStatuses) {
webhookAdditionalFilters.add(WebhookAdditionalFilter.builder()
.eventType(type)
.shopId(shopId)
.invoiceStatus(invoiceStatus)
.invoicePaymentStatus(invoicePaymentStatus)
.invoicePaymentRefundStatus(invoicePaymentRefundStatus)
.build());
}
hook.setFilters(webhookAdditionalFilters);
return hook;