Removed event stock for invoicing, enabled kafka (#83)

* Removed event stock for invoicing, enabled kafka

* Event id stuff

* Set final

* Removed constraint

* Fixed sql-script
This commit is contained in:
Inal Arsanukaev 2019-07-08 19:45:29 +03:00 committed by GitHub
parent ffdc3d989d
commit 3542005a17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 110 additions and 190 deletions

View File

@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>hooker</artifactId>
<version>2.0.28-SNAPSHOT</version>
<version>2.0.29-SNAPSHOT</version>
<packaging>jar</packaging>
<name>hooker</name>

View File

@ -2,8 +2,8 @@ package com.rbkmoney.hooker.configuration;
import com.rbkmoney.eventstock.client.EventPublisher;
import com.rbkmoney.eventstock.client.poll.PollingEventPublisherBuilder;
import com.rbkmoney.hooker.handler.Handler;
import com.rbkmoney.hooker.handler.poller.EventStockHandler;
import com.rbkmoney.hooker.handler.poller.CustomerEventStockHandler;
import com.rbkmoney.hooker.handler.poller.impl.customer.AbstractCustomerEventHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
@ -11,7 +11,6 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@Configuration
@ -30,32 +29,21 @@ public class EventStockPollerConfig {
@Value("${bm.pooling.maxQuerySize}")
private int maxQuerySize;
@Value("${bm.pooling.workersCount}")
private int workersCount;
private final List<Handler> pollingEventHandlers;
private final List<AbstractCustomerEventHandler> pollingEventHandlers;
@Bean
public List<EventStockHandler> eventStockHandlers() {
List<EventStockHandler> eventStockHandlers = new ArrayList<>();
for (int i = 0; i < workersCount; ++i) {
eventStockHandlers.add(new EventStockHandler(pollingEventHandlers, workersCount, i));
}
return eventStockHandlers;
public CustomerEventStockHandler eventStockHandler() {
return new CustomerEventStockHandler(pollingEventHandlers);
}
@Bean
public List<EventPublisher> eventPublishers(List<EventStockHandler> eventStockHandlers) throws IOException {
List<EventPublisher> eventPublishers = new ArrayList<>();
for (int i = 0; i < workersCount; ++i) {
eventPublishers.add(new PollingEventPublisherBuilder()
public EventPublisher eventPublisher(CustomerEventStockHandler customerEventStockHandler) throws IOException {
return new PollingEventPublisherBuilder()
.withURI(bmUri.getURI())
.withEventHandler(eventStockHandlers.get(i))
.withEventHandler(customerEventStockHandler)
.withMaxPoolSize(maxPoolSize)
.withPollDelay(pollDelay)
.withMaxQuerySize(maxQuerySize)
.build());
}
return eventPublishers;
.build();
}
}

View File

@ -8,7 +8,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
@Configuration
//@EnableKafka
@EnableKafka
public class KafkaConsumerBeanEnableConfig {
@Bean

View File

@ -5,6 +5,6 @@ import java.util.List;
public interface MessageDao<M> {
void create(M message) throws DaoException;
Long getMaxEventId(int div, int mod);
Long getMaxEventId();
List<M> getBy(Collection<Long> messageIds) throws DaoException;
}

View File

@ -73,7 +73,7 @@ public class CacheableInvoicingMessageDaoImpl extends InvoicingMessageDaoImpl {
}
private void putToCache(InvoicingMessage message){
if (message != null) {
if (message != null && message.getId() != null) {
cacheMng.putMessage(message);
cacheMng.putMessage(message.getInvoice().getId() + (message.isPayment() ? "_" + message.getPayment().getId() : "") + (message.isRefund() ? "_" + message.getPayment().getId() + "_" + message.getRefund().getId() : ""), message);
}

View File

@ -216,10 +216,10 @@ public class CustomerDaoImpl extends NamedParameterJdbcDaoSupport implements Cus
}
@Override
public Long getMaxEventId(int div, int mod) {
final String sql = "select event_id from hook.customer_message where ('x0'||substr(md5(customer_id), 1, 7))::bit(32)::int % :div = :mod order by event_id desc limit 1";
public Long getMaxEventId() {
final String sql = "select max(event_id) from hook.customer_message ";
try {
return getNamedParameterJdbcTemplate().queryForObject(sql, new MapSqlParameterSource("div", div).addValue("mod", mod), Long.class);
return getNamedParameterJdbcTemplate().queryForObject(sql, new MapSqlParameterSource(), Long.class);
} catch (EmptyResultDataAccessException e) {
return null;
}

View File

@ -40,7 +40,7 @@ public class InvoicingMessageDaoImpl extends NamedParameterJdbcDaoSupport implem
InvoicingTaskDao taskDao;
public static final String ID = "id";
public static final String EVENT_ID = "event_id";
public static final String NEW_EVENT_ID = "new_event_id";
public static final String EVENT_TIME = "event_time";
public static final String SEQUENCE_ID = "sequence_id";
public static final String CHANGE_ID = "change_id";
@ -154,7 +154,7 @@ public class InvoicingMessageDaoImpl extends NamedParameterJdbcDaoSupport implem
private static RowMapper<InvoicingMessage> messageRowMapper = (rs, i) -> {
InvoicingMessage message = new InvoicingMessage();
message.setId(rs.getLong(ID));
message.setEventId(rs.getLong(EVENT_ID));
message.setEventId(rs.getLong(NEW_EVENT_ID));
message.setEventTime(rs.getString(EVENT_TIME));
message.setSequenceId(rs.getLong(SEQUENCE_ID));
message.setChangeId(rs.getInt(CHANGE_ID));
@ -310,7 +310,7 @@ public class InvoicingMessageDaoImpl extends NamedParameterJdbcDaoSupport implem
@Transactional
public void create(InvoicingMessage message) throws DaoException {
final String sql = "INSERT INTO hook.message" +
"(event_id, event_time, sequence_id, change_id, type, party_id, event_type, " +
"(event_time, sequence_id, change_id, type, party_id, event_type, " +
"invoice_id, shop_id, invoice_created_at, invoice_status, invoice_reason, invoice_due_date, invoice_amount, " +
"invoice_currency, invoice_content_type, invoice_content_data, invoice_product, invoice_description, " +
"payment_id, payment_created_at, payment_status, payment_failure, payment_failure_reason, payment_amount, " +
@ -319,7 +319,7 @@ public class InvoicingMessageDaoImpl extends NamedParameterJdbcDaoSupport implem
"payment_digital_wallet_provider, payment_digital_wallet_id, payment_crypto_currency, " +
"refund_id, refund_created_at, refund_status, refund_failure, refund_failure_reason, refund_amount, refund_currency, refund_reason) " +
"VALUES " +
"(:event_id, :event_time, :sequence_id, :change_id, :type, :party_id, CAST(:event_type as hook.eventtype), " +
"(:event_time, :sequence_id, :change_id, :type, :party_id, CAST(:event_type as hook.eventtype), " +
":invoice_id, :shop_id, :invoice_created_at, :invoice_status, :invoice_reason, :invoice_due_date, :invoice_amount, " +
":invoice_currency, :invoice_content_type, :invoice_content_data, :invoice_product, :invoice_description, " +
":payment_id, :payment_created_at, :payment_status, :payment_failure, :payment_failure_reason, :payment_amount, " +
@ -327,9 +327,9 @@ public class InvoicingMessageDaoImpl extends NamedParameterJdbcDaoSupport implem
":payment_customer_id, CAST(:payment_payer_type as hook.payment_payer_type), :payment_recurrent_parent_invoice_id, :payment_recurrent_parent_payment_id, CAST(:payment_tool_details_type as hook.payment_tool_details_type), " +
":payment_card_bin, :payment_card_last_digits, :payment_card_number_mask, :payment_card_token_provider, :payment_system, :payment_terminal_provider, :payment_digital_wallet_provider, :payment_digital_wallet_id, :payment_crypto_currency, " +
":refund_id, :refund_created_at, :refund_status, :refund_failure, :refund_failure_reason, :refund_amount, :refund_currency, :refund_reason) " +
"ON CONFLICT (invoice_id, sequence_id, change_id) DO NOTHING " +
"RETURNING id";
MapSqlParameterSource params = new MapSqlParameterSource()
.addValue(EVENT_ID, message.getEventId())
.addValue(EVENT_TIME, message.getEventTime())
.addValue(SEQUENCE_ID, message.getSequenceId())
.addValue(CHANGE_ID, message.getChangeId())
@ -410,24 +410,22 @@ public class InvoicingMessageDaoImpl extends NamedParameterJdbcDaoSupport implem
try {
GeneratedKeyHolder keyHolder = new GeneratedKeyHolder();
getNamedParameterJdbcTemplate().update(sql, params, keyHolder);
message.setId(keyHolder.getKey().longValue());
Number key = keyHolder.getKey();
if (key != null) {
message.setId(key.longValue());
saveCart(message.getId(), message.getInvoice().getCart());
log.info("InvoicingMessage {} saved to db.", message);
queueDao.createWithPolicy(message.getId());
taskDao.create(message.getId());
}
} catch (NestedRuntimeException e) {
throw new DaoException("Couldn't create message with invoice_id "+ message.getInvoice().getId(), e);
}
}
@Override
public Long getMaxEventId(int div, int mod) {
final String sql = "select event_id from hook.message where ('x0'||substr(md5(invoice_id), 1, 7))::bit(32)::int % :div = :mod order by event_id desc limit 1";
try {
return getNamedParameterJdbcTemplate().queryForObject(sql, new MapSqlParameterSource("div", div).addValue("mod", mod), Long.class);
} catch (EmptyResultDataAccessException e) {
return null;
}
public Long getMaxEventId() {
throw new UnsupportedOperationException("Not supported yet");
}
@Override

View File

@ -9,35 +9,21 @@ import com.rbkmoney.geck.serializer.kit.json.JsonHandler;
import com.rbkmoney.geck.serializer.kit.tbase.TBaseProcessor;
import com.rbkmoney.hooker.dao.DaoException;
import com.rbkmoney.hooker.handler.Handler;
import com.rbkmoney.hooker.utils.HashUtils;
import com.rbkmoney.hooker.handler.poller.impl.customer.AbstractCustomerEventHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class EventStockHandler implements EventHandler<StockEvent> {
@RequiredArgsConstructor
public class CustomerEventStockHandler implements EventHandler<StockEvent> {
private static final int INITIAL_VALUE = 3;
private final AtomicInteger count = new AtomicInteger(INITIAL_VALUE);
private final List<Handler> pollingEventHandlers;
private final int divider;
private final int mod;
public EventStockHandler(List<Handler> pollingEventHandlers, int divider, int mod) {
this.pollingEventHandlers = pollingEventHandlers;
this.divider = divider;
this.mod = mod;
}
public int getDivider() {
return divider;
}
public int getMod() {
return mod;
}
private final List<AbstractCustomerEventHandler> pollingEventHandlers;
@Override
public EventAction handle(StockEvent stockEvent, String subsKey) {
@ -45,15 +31,10 @@ public class EventStockHandler implements EventHandler<StockEvent> {
EventPayload payload = processingEvent.getPayload();
List changes;
String sourceId;
if (payload.isSetInvoiceChanges()) {
changes = payload.getInvoiceChanges();
sourceId = processingEvent.getSource().getInvoiceId();
} else if (payload.isSetCustomerChanges()) {
if (payload.isSetCustomerChanges()) {
changes = payload.getCustomerChanges();
sourceId = processingEvent.getSource().getCustomerId();
} else return EventAction.CONTINUE;
if (!HashUtils.checkHashMod(sourceId, divider, mod)) {
} else {
return EventAction.CONTINUE;
}

View File

@ -38,7 +38,6 @@ public class InvoiceCreatedHandler extends AbstractInvoiceEventHandler {
Invoice invoiceOrigin = ic.getInvoiceCreated().getInvoice();
//////
InvoicingMessage message = new InvoicingMessage();
message.setEventId(eventId);
message.setEventTime(eventCreatedAt);
message.setSequenceId(sequenceId);
message.setChangeId(changeId);

View File

@ -23,7 +23,6 @@ public abstract class NeedReadInvoiceEventHandler extends AbstractInvoiceEventHa
}
message.setEventType(getEventType());
message.setType(getMessageType());
message.setEventId(eventId);
message.setEventTime(eventCreatedAt);
message.setSequenceId(sequenceId);
message.setChangeId(changeId);

View File

@ -38,19 +38,6 @@ public class MachineEventHandlerImpl implements MachineEventHandler {
throw ex;
}
}
} else if (payload.isSetCustomerChanges()) {
for (int i = 0; i < payload.getCustomerChanges().size(); ++i) {
CustomerChange customerChange = payload.getCustomerChanges().get(i);
try {
int j = i;
handlerManager.getHandler(customerChange)
.ifPresentOrElse(handler -> handler.handle(customerChange, null, machineEvent.getCreatedAt(), machineEvent.getSourceId(), machineEvent.getEventId(), j),
() -> log.debug("Handler for customerChange {} wasn't found (machineEvent {})", customerChange, machineEvent));
} catch (Exception ex) {
log.error("Failed to handle customer change, customerChange='{}'", customerChange, ex);
throw ex;
}
}
}
ack.acknowledge();
log.debug("Ack for machineEvent {} sent", machineEvent);

View File

@ -2,7 +2,7 @@ package com.rbkmoney.hooker.listener;
import com.rbkmoney.eventstock.client.*;
import com.rbkmoney.eventstock.client.poll.EventFlowFilter;
import com.rbkmoney.hooker.handler.poller.EventStockHandler;
import com.rbkmoney.hooker.handler.poller.CustomerEventStockHandler;
import com.rbkmoney.hooker.service.EventService;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.context.event.ApplicationReadyEvent;
@ -16,23 +16,23 @@ import java.util.List;
public class OnStart implements ApplicationListener<ApplicationReadyEvent> {
private final List<EventPublisher> eventPublishers;
private final List<EventStockHandler> eventStockHandlers;
private final List<CustomerEventStockHandler> customerEventStockHandlers;
private final EventService eventService;
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
for (int i = 0; i < eventPublishers.size(); ++i) {
eventPublishers.get(i).subscribe(buildSubscriberConfig(eventStockHandlers.get(i)));
eventPublishers.get(i).subscribe(buildSubscriberConfig(customerEventStockHandlers.get(i)));
}
}
public SubscriberConfig buildSubscriberConfig(EventStockHandler eventStockHandler) {
return new DefaultSubscriberConfig(eventFilter(eventStockHandler));
public SubscriberConfig buildSubscriberConfig(CustomerEventStockHandler customerEventStockHandler) {
return new DefaultSubscriberConfig(eventFilter(customerEventStockHandler));
}
public EventFilter eventFilter(EventStockHandler eventStockHandler) {
public EventFilter eventFilter(CustomerEventStockHandler customerEventStockHandler) {
EventConstraint.EventIDRange eventIDRange = new EventConstraint.EventIDRange();
Long lastEventId = eventService.getLastEventId(eventStockHandler.getDivider(), eventStockHandler.getMod());
Long lastEventId = eventService.getLastEventId();
if (lastEventId != null) {
eventIDRange.setFromExclusive(lastEventId);
} else {

View File

@ -22,6 +22,11 @@ import java.util.Map;
@Getter
@Setter
public class CustomerMessageJson {
private static final ObjectMapper objectMapper = new ObjectMapper()
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
.configure(SerializationFeature.WRITE_ENUMS_USING_TO_STRING, true);
private static Map<EventType, Event.EventTypeEnum> eventTypeMapping = new HashMap<>();
static {
@ -46,10 +51,7 @@ public class CustomerMessageJson {
messageJson.topic = Event.TopicEnum.CUSTOMERSTOPIC.getValue();
messageJson.customer = message.getCustomer();
messageJson.eventType = eventTypeMapping.get(message.getEventType()).getValue();
return new ObjectMapper()
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
.configure(SerializationFeature.WRITE_ENUMS_USING_TO_STRING, true)
.writeValueAsString(messageJson);
return objectMapper.writeValueAsString(messageJson);
}
@Getter

View File

@ -21,7 +21,13 @@ import java.util.Map;
@Getter
@Setter
public class InvoicingMessageJson {
private static final ObjectMapper objectMapper = new ObjectMapper()
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
.configure(SerializationFeature.WRITE_ENUMS_USING_TO_STRING, true);
private static Map<String, String> invoiceStatusesMapping = new HashMap<>();
static {
invoiceStatusesMapping.put("unpaid", "InvoiceCreated");
invoiceStatusesMapping.put("paid", "InvoicePaid");
@ -46,7 +52,7 @@ public class InvoicingMessageJson {
refundStatusesMapping.put("failed", "RefundFailed");
}
private long eventID;
private Long eventID;
private String occuredAt;
private String topic;
private String eventType;
@ -72,10 +78,7 @@ public class InvoicingMessageJson {
invoicingMessageJson.topic = Event.TopicEnum.INVOICESTOPIC.getValue();
invoicingMessageJson.invoice = message.getInvoice();
return new ObjectMapper()
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
.configure(SerializationFeature.WRITE_ENUMS_USING_TO_STRING, true)
.writeValueAsString(invoicingMessageJson);
return objectMapper.writeValueAsString(invoicingMessageJson);
}
private static class InvoiceMessageJson extends InvoicingMessageJson {

View File

@ -9,5 +9,5 @@ import lombok.Setter;
@Getter
@Setter
public class Message {
private long id;
private Long id;
}

View File

@ -13,7 +13,6 @@ import java.util.HashMap;
*/
public class MetadataSerializer extends JsonSerializer<Content> {
//it's thread-safe object, parni)))
private static final ObjectMapper objectMapper = new ObjectMapper();
@Override

View File

@ -14,16 +14,9 @@ public class EventService {
private final InvoicingMessageDao messageDao;
private final CustomerDao customerDao;
public Long getLastEventId(int div, int mod) {
Long invLastEventId = messageDao.getMaxEventId(div, mod);
Long custLastEventId = customerDao.getMaxEventId(div, mod);
Long max = invLastEventId;
if (invLastEventId == null) {
max = custLastEventId;
} else if (custLastEventId != null) {
max = Math.max(invLastEventId, custLastEventId);
}
log.info("Get last event id = {}", max);
return max;
public Long getLastEventId() {
Long custLastEventId = customerDao.getMaxEventId();
log.info("Get last event id = {}", custLastEventId);
return custLastEventId;
}
}

View File

@ -19,7 +19,7 @@ public class CustomerUtils {
private ObjectStack<String> names = new ObjectStack<>();
private ObjectStack<JsonNodeWrapper> nodes = new ObjectStack<>();
private JsonNode rootNode;
private ObjectMapper mapper = new ObjectMapper();
private static final ObjectMapper mapper = new ObjectMapper();
public JsonNode getResult(Value value) {
try {
@ -189,7 +189,7 @@ public class CustomerUtils {
public static JsonNode getJsonObject(String json) {
if (json != null) {
try {
return new ObjectMapper().readTree(json);
return mapper.readTree(json);
} catch (IOException e) {
log.warn("Unexpected error when converting json {}", json, e);
}

View File

@ -1,19 +0,0 @@
package com.rbkmoney.hooker.utils;
import org.springframework.util.DigestUtils;
public class HashUtils {
public static boolean checkHashMod(String str, int div, int mod) {
return (getIntHash(str) % div) == mod;
}
/**
* @param str
* @return int value of first 7 digits of md5-hash of invoice_id
*/
public static int getIntHash(String str) {
String hexStr = DigestUtils.md5DigestAsHex(str.getBytes());
return Integer.parseInt(hexStr.substring(0, 7), 16);
}
}

View File

@ -22,6 +22,8 @@ import java.util.Optional;
*/
public class PaymentToolUtils {
private static final ObjectMapper mapper = new ObjectMapper();
private static final Map<String, String> digitalWalletMapping = ImmutableMap.of(
PaymentToolDetailsDigitalWallet.DigitalWalletDetailsTypeEnum.DIGITALWALLETDETAILSQIWI.getValue(), DigitalWalletProvider.qiwi.name()
);
@ -149,7 +151,6 @@ public class PaymentToolUtils {
}
public static String getPaymentToolToken(PaymentTool paymentTool) {
ObjectMapper mapper = new ObjectMapper();
ObjectNode rootNode = mapper.createObjectNode();
if (paymentTool.isSetBankCard()) {
BankCard pCard = paymentTool.getBankCard();

View File

@ -40,7 +40,6 @@ bm.pooling:
maxPoolSize: 1
delay: 5000
maxQuerySize: 500
workersCount: 5
# connection timeout
merchant.callback.timeout: 10

View File

@ -0,0 +1,12 @@
CREATE UNIQUE INDEX IF NOT EXISTS message_uniq_idx ON hook.message(invoice_id, sequence_id, change_id);
CREATE SEQUENCE hook.event_id_seq
INCREMENT 1
START 380000000
MINVALUE 380000000
MAXVALUE 9223372036854775807
CACHE 1;
ALTER TABLE hook.message ALTER COLUMN event_id DROP NOT NULL;
ALTER TABLE hook.message ADD COLUMN new_event_id bigint;
ALTER TABLE hook.message ALTER COLUMN new_event_id SET DEFAULT nextval('hook.event_id_seq'::regclass);

View File

@ -82,16 +82,16 @@ public class ComplexDataflowTest extends AbstractIntegrationTest {
@Test
public void testMessageSend() throws InterruptedException {
List<InvoicingMessage> sourceMessages = new ArrayList<>();
InvoicingMessage message = buildMessage(AbstractInvoiceEventHandler.INVOICE,"1", "partyId1", EventType.INVOICE_STATUS_CHANGED, "unpaid");
InvoicingMessage message = buildMessage(AbstractInvoiceEventHandler.INVOICE,"1", "partyId1", EventType.INVOICE_STATUS_CHANGED, "unpaid", null, true, 0L, 0);
messageDao.create(message);
sourceMessages.add(message);
message = buildMessage(AbstractInvoiceEventHandler.PAYMENT,"1", "partyId1", EventType.INVOICE_PAYMENT_STATUS_CHANGED, "captured");
message = buildMessage(AbstractInvoiceEventHandler.PAYMENT,"1", "partyId1", EventType.INVOICE_PAYMENT_STATUS_CHANGED, "captured", null, true, 0L, 1);
messageDao.create(message);
sourceMessages.add(message);
message = buildMessage(AbstractInvoiceEventHandler.PAYMENT, "2", "partyId1", EventType.INVOICE_PAYMENT_STATUS_CHANGED, "processed");
message = buildMessage(AbstractInvoiceEventHandler.PAYMENT, "2", "partyId1", EventType.INVOICE_PAYMENT_STATUS_CHANGED, "processed", null, true, 0L, 0);
messageDao.create(message);
sourceMessages.add(message);
message = buildMessage(AbstractInvoiceEventHandler.PAYMENT, "2", "partyId1", EventType.INVOICE_PAYMENT_STATUS_CHANGED, "failed");
message = buildMessage(AbstractInvoiceEventHandler.PAYMENT, "2", "partyId1", EventType.INVOICE_PAYMENT_STATUS_CHANGED, "failed", null, true, 0L, 1);
messageDao.create(message);
sourceMessages.add(message);

View File

@ -89,10 +89,10 @@ public class DataflowTest extends AbstractIntegrationTest {
@Test
public void testMessageSend() throws InterruptedException {
List<InvoicingMessage> sourceMessages = new ArrayList<>();
InvoicingMessage message = buildMessage(AbstractInvoiceEventHandler.INVOICE, "1", "partyId1", EventType.INVOICE_CREATED, "status", cart(), true);
InvoicingMessage message = buildMessage(AbstractInvoiceEventHandler.INVOICE, "1", "partyId1", EventType.INVOICE_CREATED, "status", cart(), true, 0L, 0);
messageDao.create(message);
sourceMessages.add(message);
message = buildMessage(AbstractInvoiceEventHandler.PAYMENT, "1", "partyId1", EventType.INVOICE_PAYMENT_STARTED, "status");
message = buildMessage(AbstractInvoiceEventHandler.PAYMENT, "1", "partyId1", EventType.INVOICE_PAYMENT_STARTED, "status", cart(), true, 0L, 1);
messageDao.create(message);
sourceMessages.add(message);
message = buildMessage(AbstractInvoiceEventHandler.INVOICE,"3", "partyId1", EventType.INVOICE_CREATED, "status");
@ -101,13 +101,13 @@ public class DataflowTest extends AbstractIntegrationTest {
message = buildMessage(AbstractInvoiceEventHandler.INVOICE, "4", "qwe", EventType.INVOICE_CREATED, "status");
messageDao.create(message);
sourceMessages.add(message);
message = buildMessage(AbstractInvoiceEventHandler.INVOICE, "5", "partyId2", EventType.INVOICE_CREATED, "status", cart(), false);
message = buildMessage(AbstractInvoiceEventHandler.INVOICE, "5", "partyId2", EventType.INVOICE_CREATED, "status", cart(), false, 0L, 0);
messageDao.create(message);
sourceMessages.add(message);
message = buildMessage(AbstractInvoiceEventHandler.PAYMENT, "5", "partyId2", EventType.INVOICE_PAYMENT_STATUS_CHANGED, "status", cart(), false);
message = buildMessage(AbstractInvoiceEventHandler.PAYMENT, "5", "partyId2", EventType.INVOICE_PAYMENT_STATUS_CHANGED, "status", cart(), false, 0L, 1);
messageDao.create(message);
sourceMessages.add(message);
message = buildMessage(AbstractInvoiceEventHandler.REFUND, "5", "partyId2", EventType.INVOICE_PAYMENT_REFUND_STARTED, "status", cart(), false);
message = buildMessage(AbstractInvoiceEventHandler.REFUND, "5", "partyId2", EventType.INVOICE_PAYMENT_REFUND_STARTED, "status", cart(), false, 0L, 2);
messageDao.create(message);
sourceMessages.add(message);

View File

@ -4,7 +4,6 @@ import com.rbkmoney.hooker.AbstractIntegrationTest;
import com.rbkmoney.hooker.handler.poller.impl.customer.AbstractCustomerEventHandler;
import com.rbkmoney.hooker.model.CustomerMessage;
import com.rbkmoney.hooker.model.EventType;
import com.rbkmoney.hooker.utils.HashUtils;
import com.rbkmoney.swag_webhook_events.Customer;
import org.junit.Before;
import org.junit.Test;
@ -30,9 +29,6 @@ public class CustomerMessageDaoImplTest extends AbstractIntegrationTest {
@Autowired
CustomerDao messageDao;
@Value("${bm.pooling.workersCount}")
private int workersCount;
private static boolean messagesCreated = false;
@Before
@ -51,6 +47,6 @@ public class CustomerMessageDaoImplTest extends AbstractIntegrationTest {
@Test
public void getMaxEventId() {
assertEquals(messageDao.getMaxEventId(workersCount, HashUtils.getIntHash("124") % workersCount).longValue(), 1L);
assertEquals(messageDao.getMaxEventId().longValue(), 1L);
}
}

View File

@ -4,7 +4,6 @@ import com.rbkmoney.hooker.AbstractIntegrationTest;
import com.rbkmoney.hooker.handler.poller.impl.invoicing.AbstractInvoiceEventHandler;
import com.rbkmoney.hooker.model.EventType;
import com.rbkmoney.hooker.model.InvoicingMessage;
import com.rbkmoney.hooker.utils.HashUtils;
import com.rbkmoney.swag_webhook_events.CustomerPayer;
import org.junit.Before;
import org.junit.Test;
@ -12,7 +11,6 @@ import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@ -33,17 +31,14 @@ public class InvoicingMessageDaoImplTest extends AbstractIntegrationTest {
@Autowired
InvoicingMessageDao messageDao;
@Value("${bm.pooling.workersCount}")
private int workersCount;
private static boolean messagesCreated = false;
@Before
public void setUp() throws Exception {
if(!messagesCreated){
messageDao.create(buildMessage(AbstractInvoiceEventHandler.INVOICE,"1234", "56678", EventType.INVOICE_CREATED, "status"));
messageDao.create(buildMessage(AbstractInvoiceEventHandler.INVOICE,"1234", "56678", EventType.INVOICE_CREATED, "status", cart(), true));
messageDao.create(buildMessage(AbstractInvoiceEventHandler.PAYMENT,"1234", "56678", EventType.INVOICE_CREATED, "status", cart(), false));
messageDao.create(buildMessage(AbstractInvoiceEventHandler.INVOICE,"1235", "56678", EventType.INVOICE_CREATED, "status", cart(), true));
messageDao.create(buildMessage(AbstractInvoiceEventHandler.PAYMENT,"1236", "56678", EventType.INVOICE_CREATED, "status", cart(), false));
messagesCreated = true;
}
}
@ -67,18 +62,22 @@ public class InvoicingMessageDaoImplTest extends AbstractIntegrationTest {
@Test
public void get() throws Exception {
InvoicingMessage message = messageDao.getInvoice("1234");
InvoicingMessage message = messageDao.getInvoice("1235");
assertTrue(message.getEventId() >= 380000000);
assertEquals(message.getInvoice().getAmount(), 12235);
assertEquals(message.getInvoice().getCart().size(), 2);
assertEquals(1, messageDao.getBy(Arrays.asList(message.getId())).size());
InvoicingMessage payment = messageDao.getPayment("1234", "123");
InvoicingMessage payment = messageDao.getPayment("1236", "123");
assertTrue(payment.getPayment().getPayer() instanceof CustomerPayer);
}
@Test
public void getMaxEventId() {
assertEquals(messageDao.getMaxEventId(workersCount, HashUtils.getIntHash("1234") % workersCount).longValue(), 5555);
public void testDuplication(){
InvoicingMessage message = buildMessage(AbstractInvoiceEventHandler.INVOICE, "1234", "56678", EventType.INVOICE_CREATED, "status");
messageDao.create(message);
assertNull(message.getId());
}
}

View File

@ -31,7 +31,6 @@ import java.util.Properties;
import static java.util.Collections.emptyList;
import static org.mockito.ArgumentMatchers.any;
@Ignore
@Slf4j
@TestPropertySource(properties = "spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration")
@ContextConfiguration(classes = {KafkaAutoConfiguration.class, KafkaMachineEventListener.class, MachineEventHandlerImpl.class})

View File

@ -19,8 +19,11 @@ public class BuildUtils {
}
public static InvoicingMessage buildMessage(String type, String invoiceId, String partyId, EventType eventType, String status, List<InvoiceCartPosition> cart, boolean isPayer) {
return buildMessage(type, invoiceId, partyId, eventType, status, cart, isPayer, 0L, 0);
}
public static InvoicingMessage buildMessage(String type, String invoiceId, String partyId, EventType eventType, String status, List<InvoiceCartPosition> cart, boolean isPayer, Long sequenceId, Integer changeId) {
InvoicingMessage message = new InvoicingMessage();
message.setEventId(5555L);
message.setEventTime("time");
message.setType(type);
message.setPartyId(partyId);
@ -101,6 +104,8 @@ public class BuildUtils {
refund.setStatus("status");
refund.setReason("kek");
}
message.setSequenceId(sequenceId);
message.setChangeId(changeId);
return message;
}

View File

@ -1,21 +0,0 @@
package com.rbkmoney.hooker.utils;
import com.rbkmoney.hooker.AbstractIntegrationTest;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import static org.junit.Assert.*;
public class HashUtilsTest extends AbstractIntegrationTest {
@Autowired
private JdbcTemplate jdbcTemplate;
@Test
public void testGetIntHash() {
Integer javaHash = HashUtils.getIntHash("kek");
Integer postgresHash = jdbcTemplate.queryForObject("select ('x0'||substr(md5('kek'), 1, 7))::bit(32)::int", Integer.class);
assertEquals(javaHash, postgresHash);
}
}