Independent processing of queue (#135)

* Independent processing of queue

* Fixed millis

* Fix after test in dev mirror kafka

* Disable schedulling in test

Co-authored-by: Inal Arsanukaev <inalarsanukaev@MacBook-Pro-Inal.local>
This commit is contained in:
Inal Arsanukaev 2020-11-23 20:20:56 +03:00 committed by GitHub
parent 1697899893
commit ad41e65510
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 300 additions and 373 deletions

@ -1 +1 @@
Subproject commit f42e059d9ec93826ba4ad23232eed8ce67bd5486
Subproject commit ccf618949b95590d572157b248289428abeaa2e5

View File

@ -6,11 +6,40 @@ import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;
import com.rbkmoney.hooker.dao.CustomerDao;
import com.rbkmoney.hooker.dao.InvoicingMessageDao;
import com.rbkmoney.hooker.dao.impl.CustomerQueueDao;
import com.rbkmoney.hooker.dao.impl.CustomerTaskDao;
import com.rbkmoney.hooker.dao.impl.InvoicingQueueDao;
import com.rbkmoney.hooker.dao.impl.InvoicingTaskDao;
import com.rbkmoney.hooker.model.*;
import com.rbkmoney.hooker.retry.RetryPoliciesService;
import com.rbkmoney.hooker.scheduler.MessageScheduler;
import com.rbkmoney.hooker.scheduler.MessageSender;
import com.rbkmoney.hooker.service.*;
import com.rbkmoney.hooker.service.crypt.Signer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.transaction.support.TransactionTemplate;
@Configuration
public class AppConfig {
@Value("${message.scheduler.invoicing.threadPoolSize}")
private int invoicingThreadPoolSize;
@Value("${message.scheduler.customer.threadPoolSize}")
private int customerThreadPoolSize;
@Value("${message.scheduler.delay}")
private int delayMillis;
@Value("${merchant.callback.timeout}")
private int timeout;
@Bean
public ObjectMapper objectMapper() {
return new ObjectMapper()
@ -20,4 +49,50 @@ public class AppConfig {
.setSerializationInclusion(JsonInclude.Include.NON_EMPTY)
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
}
@Bean
public MessageSender<InvoicingMessage, InvoicingQueue> invoicngMessageSender(Signer signer,
InvoicingEventService eventService,
ObjectMapper objectMapper) {
return new MessageSender<>(invoicingThreadPoolSize, timeout, signer, eventService, objectMapper);
}
@Bean
public MessageSender<CustomerMessage, CustomerQueue> customerMessageSender(Signer signer,
CustomerEventService eventService,
ObjectMapper objectMapper) {
return new MessageSender<>(customerThreadPoolSize, timeout, signer, eventService, objectMapper);
}
@Bean
public MessageProcessor<InvoicingMessage, InvoicingQueue> invoicingMessageProcessor(InvoicingTaskDao taskDao,
InvoicingQueueDao queueDao,
InvoicingMessageDao messageDao,
RetryPoliciesService retryPoliciesService,
TransactionTemplate transactionTemplate,
MessageSender<InvoicingMessage, InvoicingQueue> invoicngMessageSender) {
return new MessageProcessor<>(taskDao, queueDao, messageDao, retryPoliciesService, transactionTemplate, invoicngMessageSender);
}
@Bean
public MessageProcessor<CustomerMessage, CustomerQueue> customerMessageProcessor(CustomerTaskDao taskDao,
CustomerQueueDao queueDao,
CustomerDao messageDao,
RetryPoliciesService retryPoliciesService,
TransactionTemplate transactionTemplate,
MessageSender<CustomerMessage, CustomerQueue> customerMessageSender) {
return new MessageProcessor<>(taskDao, queueDao, messageDao, retryPoliciesService, transactionTemplate, customerMessageSender);
}
@Bean
public MessageScheduler<InvoicingMessage, InvoicingQueue> invoicingMessageScheduler(MessageProcessor<InvoicingMessage, InvoicingQueue> invoicingMessageProcessor,
ThreadPoolTaskScheduler taskScheduler) {
return new MessageScheduler<>(invoicingThreadPoolSize, delayMillis, invoicingMessageProcessor, taskScheduler);
}
@Bean
public MessageScheduler<CustomerMessage, CustomerQueue> cuustomerMessageScheduler(MessageProcessor<CustomerMessage, CustomerQueue> customerMessageProcessor,
ThreadPoolTaskScheduler taskScheduler) {
return new MessageScheduler<>(customerThreadPoolSize, delayMillis, customerMessageProcessor, taskScheduler);
}
}

View File

@ -15,5 +15,5 @@ public interface TaskDao {
void removeAll(long queueId) throws DaoException;
Map<Long, List<Task>> getScheduled(int limit) throws DaoException;
Map<Long, List<Task>> getScheduled() throws DaoException;
}

View File

@ -75,7 +75,7 @@ public class CustomerTaskDao extends AbstractTaskDao {
}
@Override
public Map<Long, List<Task>> getScheduled(int limit) throws DaoException {
public Map<Long, List<Task>> getScheduled() throws DaoException {
final String sql = " WITH scheduled AS (" +
"SELECT st.message_id, st.queue_id, cq.customer_id " +
"FROM hook.scheduled_task st " +
@ -84,21 +84,21 @@ public class CustomerTaskDao extends AbstractTaskDao {
"AND st.message_type=srp.message_type " +
"WHERE st.message_type = CAST(:message_type as hook.message_topic) " +
"AND COALESCE(srp.next_fire_time_ms, 0) < :curr_time " +
"ORDER BY st.message_id " +
"ASC LIMIT :limit " +
"FOR UPDATE SKIP LOCKED " +
"ORDER BY st.message_id ASC " +
"LIMIT 1 " +
"FOR UPDATE OF cq SKIP LOCKED " +
"), locked_customer_queue AS (" +
" SELECT cq.id FROM hook.customer_queue cq " +
" WHERE cq.customer_id IN (SELECT DISTINCT schd.customer_id FROM scheduled schd) " +
" FOR UPDATE SKIP LOCKED " +
") SELECT message_id, queue_id FROM scheduled s " +
" JOIN locked_customer_queue cq ON s.queue_id = cq.id " +
" ORDER BY s.message_id";
" SELECT ciq.id FROM hook.customer_queue ciq " +
" WHERE ciq.customer_id IN (SELECT DISTINCT schd.customer_id FROM scheduled schd) " +
" FOR UPDATE OF ciq SKIP LOCKED " +
") SELECT message_id, queue_id FROM hook.scheduled_task s " +
" JOIN locked_customer_queue lq ON s.queue_id = lq.id " +
" ORDER BY s.message_id" +
" FOR UPDATE OF s SKIP LOCKED";
try {
List<Task> tasks = jdbcTemplate.query(sql,
new MapSqlParameterSource("message_type", getMessageTopic())
.addValue("curr_time", System.currentTimeMillis())
.addValue("limit", limit),
.addValue("curr_time", System.currentTimeMillis()),
taskRowMapper);
return splitByQueue(tasks);
} catch (NestedRuntimeException e) {

View File

@ -92,7 +92,7 @@ public class InvoicingTaskDao extends AbstractTaskDao {
}
@Override
public Map<Long, List<Task>> getScheduled(int limit) throws DaoException {
public Map<Long, List<Task>> getScheduled() throws DaoException {
final String sql = " WITH scheduled AS (" +
"SELECT st.message_id, st.queue_id, iq.invoice_id " +
"FROM hook.scheduled_task st " +
@ -101,21 +101,21 @@ public class InvoicingTaskDao extends AbstractTaskDao {
"AND st.message_type=srp.message_type " +
"WHERE st.message_type = CAST(:message_type as hook.message_topic) " +
"AND COALESCE(srp.next_fire_time_ms, 0) < :curr_time " +
"ORDER BY st.message_id " +
"ASC LIMIT :limit " +
"FOR UPDATE SKIP LOCKED " +
"ORDER BY st.message_id ASC " +
"LIMIT 1 " +
"FOR UPDATE OF iq SKIP LOCKED " +
"), locked_invoicing_queue AS (" +
" SELECT liq.id FROM hook.invoicing_queue liq " +
" WHERE liq.invoice_id IN (SELECT DISTINCT schd.invoice_id FROM scheduled schd) " +
" FOR UPDATE SKIP LOCKED " +
") SELECT message_id, queue_id FROM scheduled s " +
" FOR UPDATE OF liq SKIP LOCKED " +
") SELECT message_id, queue_id FROM hook.scheduled_task s " +
" JOIN locked_invoicing_queue lq ON s.queue_id = lq.id " +
" ORDER BY s.message_id";
" ORDER BY s.message_id " +
" FOR UPDATE OF s SKIP LOCKED";
try {
List<Task> tasks = jdbcTemplate.query(sql,
new MapSqlParameterSource("message_type", getMessageTopic())
.addValue("curr_time", System.currentTimeMillis())
.addValue("limit", limit),
.addValue("curr_time", System.currentTimeMillis()),
taskRowMapper);
return splitByQueue(tasks);
} catch (NestedRuntimeException e) {

View File

@ -4,9 +4,6 @@ import com.rbkmoney.hooker.retry.RetryPolicyRecord;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* Created by inalarsanukaev on 14.11.17.
*/
@Data
@NoArgsConstructor
public class Queue {

View File

@ -0,0 +1,13 @@
package com.rbkmoney.hooker.model;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;
@Data
public class QueueStatus {
private Queue queue;
private boolean isSuccess;
private List<Long> messagesDone = new ArrayList<>();
}

View File

@ -5,9 +5,6 @@ import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
/**
* Created by jeckep on 17.04.17.
*/
@AllArgsConstructor
@Getter
@Setter

View File

@ -1,155 +1,26 @@
package com.rbkmoney.hooker.scheduler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rbkmoney.hooker.dao.MessageDao;
import com.rbkmoney.hooker.dao.QueueDao;
import com.rbkmoney.hooker.dao.TaskDao;
import com.rbkmoney.hooker.exception.DaoException;
import com.rbkmoney.hooker.model.Message;
import com.rbkmoney.hooker.model.Queue;
import com.rbkmoney.hooker.model.Task;
import com.rbkmoney.hooker.retry.RetryPoliciesService;
import com.rbkmoney.hooker.retry.RetryPolicy;
import com.rbkmoney.hooker.retry.RetryPolicyRecord;
import com.rbkmoney.hooker.service.EventService;
import com.rbkmoney.hooker.service.PostSender;
import com.rbkmoney.hooker.service.crypt.Signer;
import com.rbkmoney.hooker.service.MessageProcessor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import javax.annotation.PreDestroy;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import java.util.stream.IntStream;
@Slf4j
public abstract class MessageScheduler<M extends Message, Q extends Queue> {
@Value("${message.scheduler.threadPoolSize}")
private int threadPoolSize;
@Value("${message.scheduler.limit}")
private int scheduledLimit;
@Value("${merchant.callback.timeout}")
private int httpTimeout;
private TaskDao taskDao;
private QueueDao<Q> queueDao;
private MessageDao<M> messageDao;
private EventService eventService;
@Autowired
private ObjectMapper objectMapper;
@Autowired
private RetryPoliciesService retryPoliciesService;
@Autowired
private Signer signer;
@Autowired
private TransactionTemplate transactionTemplate;
@RequiredArgsConstructor
public class MessageScheduler<M extends Message, Q extends Queue> {
private final int threadPoolSize;
private final int delayMillis;
private final MessageProcessor<M, Q> messageProcessor;
private final ThreadPoolTaskScheduler executorService;
private ExecutorService executorService;
public MessageScheduler(TaskDao taskDao, QueueDao<Q> queueDao, MessageDao<M> messageDao, EventService eventService, int numberOfWorkers) {
this.taskDao = taskDao;
this.queueDao = queueDao;
this.messageDao = messageDao;
this.eventService = eventService;
this.executorService = Executors.newFixedThreadPool(numberOfWorkers);
}
@Scheduled(fixedRateString = "${message.scheduler.delay}")
public void loop() {
transactionTemplate.execute(k -> {
processLoop();
return null;
});
}
private void processLoop() {
Map<Long, List<Task>> scheduledTasks = taskDao.getScheduled(scheduledLimit);
log.debug("scheduledTasks {}", scheduledTasks);
if (scheduledTasks.entrySet().isEmpty()) {
return;
}
Set<Long> queueIds = scheduledTasks.keySet();
Map<Long, Q> queuesMap = queueDao.getWithPolicies(queueIds).stream().collect(Collectors.toMap(Queue::getId, q -> q));
Set<Long> messageIds = scheduledTasks.values().stream().flatMap(Collection::stream).map(Task::getMessageId).collect(Collectors.toSet());
Map<Long, M> messagesMap = messageDao.getBy(messageIds).stream().collect(Collectors.toMap(Message::getId, m -> m));
List<MessageSender<?>> messageSenders = new ArrayList<>(queueIds.size());
for (Long queueId : queueIds) {
List<M> messagesForQueue = scheduledTasks.get(queueId).stream().map(t -> messagesMap.get(t.getMessageId())).collect(Collectors.toList());
MessageSender messageSender = getMessageSender(new MessageSender.QueueStatus(queuesMap.get(queueId)),
messagesForQueue, signer, new PostSender(threadPoolSize, httpTimeout), eventService, objectMapper);
messageSenders.add(messageSender);
}
try {
List<Future<MessageSender.QueueStatus>> futureList = executorService.invokeAll(messageSenders);
for (Future<MessageSender.QueueStatus> status : futureList) {
if (!status.isCancelled()) {
try {
MessageSender.QueueStatus queueStatus = status.get();
try {
Queue queue = queueStatus.getQueue();
queueStatus.getMessagesDone().forEach(id -> taskDao.remove(queue.getId(), id));
if (queueStatus.isSuccess()) {
done(queue);
} else {
fail(queue);
}
} catch (DaoException e) {
log.error("DaoException error when remove sent messages. It's not a big deal, but some messages can be re-sent: {}",
status.get().getMessagesDone(), e);
}
} catch (ExecutionException e) {
log.error("Unexpected error when get queue", e);
}
}
}
} catch (InterruptedException e) {
log.error("Thread was interrupted", e);
Thread.currentThread().interrupt();
}
}
protected abstract MessageSender getMessageSender(MessageSender.QueueStatus queueStatus, List<M> messagesForQueue,
Signer signer, PostSender postSender, EventService eventService, ObjectMapper objectMapper);
private void done(Queue queue) {
if (queue.getRetryPolicyRecord().isFailed()) {
RetryPolicyRecord record = queue.getRetryPolicyRecord();
record.reset();
retryPoliciesService.update(record);
}
}
private void fail(Queue queue) {
log.warn("Queue {} failed.", queue.getId());
RetryPolicy retryPolicy = retryPoliciesService.getRetryPolicyByType(queue.getHook().getRetryPolicyType());
RetryPolicyRecord retryPolicyRecord = queue.getRetryPolicyRecord();
retryPolicy.updateFailed(retryPolicyRecord);
retryPoliciesService.update(retryPolicyRecord);
if (retryPolicy.shouldDisable(retryPolicyRecord)) {
queueDao.disable(queue.getId());
taskDao.removeAll(queue.getId());
log.warn("Queue {} was disabled according to retry policy.", queue.getId());
}
}
@PreDestroy
public void preDestroy() {
executorService.shutdownNow();
try {
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
log.warn("Failed to stop scheduller in time.");
} else {
log.info("Scheduller stopped.");
}
} catch (InterruptedException e) {
log.warn("Waiting for scheduller shutdown is interrupted.");
Thread.currentThread().interrupt();
}
@PostConstruct
public void init() {
IntStream.range(0, threadPoolSize).forEach(i ->
executorService.scheduleWithFixedDelay(messageProcessor, delayMillis));
}
}

View File

@ -3,6 +3,8 @@ package com.rbkmoney.hooker.scheduler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rbkmoney.hooker.model.Message;
import com.rbkmoney.hooker.model.Queue;
import com.rbkmoney.hooker.model.QueueStatus;
import com.rbkmoney.hooker.model.Task;
import com.rbkmoney.hooker.service.EventService;
import com.rbkmoney.hooker.service.PostSender;
import com.rbkmoney.hooker.service.crypt.Signer;
@ -14,73 +16,53 @@ import org.apache.http.HttpStatus;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
@Slf4j
@RequiredArgsConstructor
public abstract class MessageSender<M extends Message> implements Callable<MessageSender.QueueStatus> {
private final MessageSender.QueueStatus queueStatus;
private final List<M> messages;
public class MessageSender<M extends Message, Q extends Queue> {
private final int connectionPoolSize;
private final int timeout;
private final Signer signer;
private final PostSender postSender;
private final EventService<M> eventService;
private final ObjectMapper objectMapper;
@Override
public MessageSender.QueueStatus call() {
M currentMessage = null;
try {
for (M message : messages) {
currentMessage = message;
Event event = eventService.getEventByMessage(message);
final String messageJson = objectMapper.writeValueAsString(event);
final String signature = signer.sign(messageJson, queueStatus.getQueue().getHook().getPrivKey());
int statusCode = postSender.doPost(queueStatus.getQueue().getHook().getUrl(), message.getId(), messageJson, signature);
if (statusCode != HttpStatus.SC_OK) {
String wrongCodeMessage = String.format("Wrong status code: %d from merchant, we'll try to resend it. Message with id: %d %s", statusCode, message.getId(), message);
log.info(wrongCodeMessage);
throw new PostRequestException(wrongCodeMessage);
public List<QueueStatus> send(Map<Long, List<Task>> scheduledTasks, Map<Long, Q> queuesMap, Map<Long, M> messagesMap) {
PostSender postSender = new PostSender(connectionPoolSize, timeout);
List<QueueStatus> queueStatuses = new ArrayList<>();
for (Map.Entry<Long, List<Task>> entry : scheduledTasks.entrySet()) {
Long queueId = entry.getKey();
List<Task> tasks = entry.getValue();
Q queue = queuesMap.get(queueId);
QueueStatus queueStatus = new QueueStatus();
queueStatus.setQueue(queue);
M currentMessage = null;
try {
for (Task task : tasks) {
long messageId = task.getMessageId();
M message = messagesMap.get(messageId);
currentMessage = message;
Event event = eventService.getEventByMessage(message);
final String messageJson = objectMapper.writeValueAsString(event);
final String signature = signer.sign(messageJson, queue.getHook().getPrivKey());
int statusCode = postSender.doPost(queue.getHook().getUrl(), message.getId(), messageJson, signature);
if (statusCode != HttpStatus.SC_OK) {
String wrongCodeMessage = String.format("Wrong status code: %d from merchant, we'll try to resend it. Message with id: %d %s", statusCode, message.getId(), message);
log.info(wrongCodeMessage);
throw new PostRequestException(wrongCodeMessage);
}
queueStatus.getMessagesDone().add(message.getId());
}
queueStatus.messagesDone.add(message.getId());
queueStatus.setSuccess(true);
} catch (Exception e) {
if (currentMessage != null)
log.warn("Couldn't send message with id {} {} to hook {}. We'll try to resend it", currentMessage.getId(), currentMessage, queue.getHook(), e);
queueStatus.setSuccess(false);
}
queueStatus.setSuccess(true);
} catch (Exception e) {
if (currentMessage != null)
log.warn("Couldn't send message with id {} {} to hook {}. We'll try to resend it", currentMessage.getId(), currentMessage, queueStatus.getQueue().getHook(), e);
queueStatus.setSuccess(false);
queueStatuses.add(queueStatus);
}
return queueStatus;
return queueStatuses;
}
public static class QueueStatus {
private Queue queue;
private boolean isSuccess;
private List<Long> messagesDone = new ArrayList<>();
public QueueStatus(Queue queue) {
this.queue = queue;
}
public Queue getQueue() {
return queue;
}
public void setQueue(Queue queue) {
this.queue = queue;
}
public boolean isSuccess() {
return isSuccess;
}
public void setSuccess(boolean success) {
isSuccess = success;
}
public List<Long> getMessagesDone() {
return messagesDone;
}
}
}
}

View File

@ -1,33 +0,0 @@
package com.rbkmoney.hooker.scheduler.customer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rbkmoney.hooker.dao.CustomerDao;
import com.rbkmoney.hooker.dao.impl.CustomerQueueDao;
import com.rbkmoney.hooker.dao.impl.CustomerTaskDao;
import com.rbkmoney.hooker.model.CustomerMessage;
import com.rbkmoney.hooker.model.CustomerQueue;
import com.rbkmoney.hooker.scheduler.MessageScheduler;
import com.rbkmoney.hooker.scheduler.MessageSender;
import com.rbkmoney.hooker.service.CustomerEventService;
import com.rbkmoney.hooker.service.EventService;
import com.rbkmoney.hooker.service.PostSender;
import com.rbkmoney.hooker.service.crypt.Signer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class CustomerMessageScheduler extends MessageScheduler<CustomerMessage, CustomerQueue> {
public CustomerMessageScheduler(CustomerTaskDao taskDao, CustomerQueueDao queueDao, CustomerDao customerDao,
CustomerEventService eventService, @Value("${message.scheduler.threadPoolSize}") int numberOfWorkers) {
super(taskDao, queueDao, customerDao, eventService, numberOfWorkers);
}
@Override
protected MessageSender getMessageSender(MessageSender.QueueStatus queueStatus, List<CustomerMessage> messagesForQueue,
Signer signer, PostSender postSender, EventService eventService, ObjectMapper objectMapper) {
return new CustomerMessageSender(queueStatus, messagesForQueue, signer, postSender, eventService, objectMapper);
}
}

View File

@ -1,18 +0,0 @@
package com.rbkmoney.hooker.scheduler.customer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rbkmoney.hooker.model.CustomerMessage;
import com.rbkmoney.hooker.scheduler.MessageSender;
import com.rbkmoney.hooker.service.EventService;
import com.rbkmoney.hooker.service.PostSender;
import com.rbkmoney.hooker.service.crypt.Signer;
import java.util.List;
public class CustomerMessageSender extends MessageSender<CustomerMessage> {
public CustomerMessageSender(MessageSender.QueueStatus queueStatus, List<CustomerMessage> messages, Signer signer,
PostSender postSender, EventService eventService, ObjectMapper objectMapper) {
super(queueStatus, messages, signer, postSender, eventService, objectMapper);
}
}

View File

@ -1,33 +0,0 @@
package com.rbkmoney.hooker.scheduler.invoicing;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rbkmoney.hooker.dao.InvoicingMessageDao;
import com.rbkmoney.hooker.dao.impl.InvoicingQueueDao;
import com.rbkmoney.hooker.dao.impl.InvoicingTaskDao;
import com.rbkmoney.hooker.model.InvoicingMessage;
import com.rbkmoney.hooker.model.InvoicingQueue;
import com.rbkmoney.hooker.scheduler.MessageScheduler;
import com.rbkmoney.hooker.scheduler.MessageSender;
import com.rbkmoney.hooker.service.EventService;
import com.rbkmoney.hooker.service.InvoicingEventService;
import com.rbkmoney.hooker.service.PostSender;
import com.rbkmoney.hooker.service.crypt.Signer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class InvoicingMessageScheduler extends MessageScheduler<InvoicingMessage, InvoicingQueue> {
public InvoicingMessageScheduler(InvoicingTaskDao taskDao, InvoicingQueueDao queueDao, InvoicingMessageDao messageDao,
InvoicingEventService eventService, @Value("${message.scheduler.threadPoolSize}") int numberOfWorkers) {
super(taskDao, queueDao, messageDao, eventService, numberOfWorkers);
}
@Override
protected MessageSender getMessageSender(MessageSender.QueueStatus queueStatus, List<InvoicingMessage> messagesForQueue,
Signer signer, PostSender postSender, EventService eventService, ObjectMapper objectMapper) {
return new InvoicingMessageSender(queueStatus, messagesForQueue, signer, postSender, eventService, objectMapper);
}
}

View File

@ -1,18 +0,0 @@
package com.rbkmoney.hooker.scheduler.invoicing;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rbkmoney.hooker.model.InvoicingMessage;
import com.rbkmoney.hooker.scheduler.MessageSender;
import com.rbkmoney.hooker.service.EventService;
import com.rbkmoney.hooker.service.PostSender;
import com.rbkmoney.hooker.service.crypt.Signer;
import java.util.List;
public class InvoicingMessageSender extends MessageSender<InvoicingMessage> {
public InvoicingMessageSender(MessageSender.QueueStatus queueStatus, List<InvoicingMessage> messages, Signer signer,
PostSender postSender, EventService eventService, ObjectMapper objectMapper) {
super(queueStatus, messages, signer, postSender, eventService, objectMapper);
}
}

View File

@ -0,0 +1,89 @@
package com.rbkmoney.hooker.service;
import com.rbkmoney.hooker.dao.MessageDao;
import com.rbkmoney.hooker.dao.QueueDao;
import com.rbkmoney.hooker.dao.TaskDao;
import com.rbkmoney.hooker.exception.DaoException;
import com.rbkmoney.hooker.model.*;
import com.rbkmoney.hooker.retry.RetryPoliciesService;
import com.rbkmoney.hooker.retry.RetryPolicy;
import com.rbkmoney.hooker.retry.RetryPolicyRecord;
import com.rbkmoney.hooker.scheduler.MessageSender;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@Slf4j
@RequiredArgsConstructor
public class MessageProcessor<M extends Message, Q extends Queue> implements Runnable {
private final TaskDao taskDao;
private final QueueDao<Q> queueDao;
private final MessageDao<M> messageDao;
private final RetryPoliciesService retryPoliciesService;
private final TransactionTemplate transactionTemplate;
private final MessageSender<M, Q> messageSender;
@Override
public void run() {
transactionTemplate.execute(k -> {
process();
return null;
});
}
private void process() {
Map<Long, List<Task>> scheduledTasks = taskDao.getScheduled();
log.debug("scheduledTasks {}", scheduledTasks);
if (scheduledTasks.entrySet().isEmpty()) {
return;
}
Set<Long> queueIds = scheduledTasks.keySet();
Map<Long, Q> queuesMap = queueDao.getWithPolicies(queueIds).stream().collect(Collectors.toMap(Queue::getId, q -> q));
Set<Long> messageIds = scheduledTasks.values().stream().flatMap(Collection::stream).map(Task::getMessageId).collect(Collectors.toSet());
Map<Long, M> messagesMap = messageDao.getBy(messageIds).stream().collect(Collectors.toMap(Message::getId, m -> m));
List<QueueStatus> queueStatuses = messageSender.send(scheduledTasks, queuesMap, messagesMap);
queueStatuses.forEach(queueStatus -> {
try {
Queue queue = queueStatus.getQueue();
queueStatus.getMessagesDone().forEach(id -> taskDao.remove(queue.getId(), id));
if (queueStatus.isSuccess()) {
done(queue);
} else {
fail(queue);
}
} catch (DaoException e) {
log.error("DaoException error when remove sent messages. It's not a big deal, but some messages may be re-sent: {}",
queueStatus.getMessagesDone(), e);
}
});
}
private void done(Queue queue) {
if (queue.getRetryPolicyRecord().isFailed()) {
RetryPolicyRecord record = queue.getRetryPolicyRecord();
record.reset();
retryPoliciesService.update(record);
}
}
private void fail(Queue queue) {
log.warn("Queue {} failed.", queue.getId());
RetryPolicy retryPolicy = retryPoliciesService.getRetryPolicyByType(queue.getHook().getRetryPolicyType());
RetryPolicyRecord retryPolicyRecord = queue.getRetryPolicyRecord();
retryPolicy.updateFailed(retryPolicyRecord);
retryPoliciesService.update(retryPolicyRecord);
if (retryPolicy.shouldDisable(retryPolicyRecord)) {
queueDao.disable(queue.getId());
taskDao.removeAll(queue.getId());
log.warn("Queue {} was disabled according to retry policy.", queue.getId());
}
}
}

View File

@ -16,8 +16,7 @@ public class PostSender {
public static final String SIGNATURE_HEADER = "Content-Signature";
public static final long RESPONSE_MAX_LENGTH = 4096L;
public PostSender(@Value("${message.scheduler.threadPoolSize}") int connectionPoolSize,
@Value("${merchant.callback.timeout}") int timeout) {
public PostSender(int connectionPoolSize, int timeout) {
OkHttpClient.Builder httpBuilder = new OkHttpClient.Builder();
if (log.isDebugEnabled()) {

View File

@ -10,7 +10,7 @@ spring:
task:
scheduling:
pool:
size: 3
size: 20
kafka:
bootstrap-servers: kenny-kafka1.bst1.rbkmoney.net:9092
@ -71,7 +71,7 @@ spring.datasource:
username: @db.user@
password: @db.password@
hikari:
maximum-pool-size: 10
maximum-pool-size: 20
idle-timeout: 30000
data-source-properties:
reWriteBatchedInserts: true
@ -83,8 +83,11 @@ db:
message:
scheduler:
delay: 1000
threadPoolSize: 10
limit: 50
invoicing:
threadPoolSize: 10
customer:
threadPoolSize: 3
clean:
scheduler:

View File

@ -8,7 +8,6 @@ import com.rbkmoney.hooker.utils.BuildUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@ -20,8 +19,6 @@ import static org.junit.Assert.*;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class HookDeleteDaoTest extends AbstractIntegrationTest {
@Value("${message.scheduler.limit}")
private int limit;
@Autowired
InvoicingTaskDao taskDao;
@ -49,11 +46,11 @@ public class HookDeleteDaoTest extends AbstractIntegrationTest {
Long hookId = hookDao.create(HookDaoImplTest.buildHook("partyId", "fake.url")).getId();
Long hookId2 = hookDao.create(HookDaoImplTest.buildHook("partyId2", "fake2.url")).getId();
batchService.process(Collections.singletonList(BuildUtils.buildMessage(InvoicingMessageEnum.INVOICE.getValue(),"2345", "partyId", EventType.INVOICE_CREATED, InvoiceStatusEnum.FULFILLED, PaymentStatusEnum.CAPTURED)));
assertEquals(queueDao.getWithPolicies(taskDao.getScheduled(limit).keySet()).size(), 1);
assertEquals(queueDao.getWithPolicies(taskDao.getScheduled().keySet()).size(), 1);
hookDao.delete(hookId2);
assertNotEquals(queueDao.getWithPolicies(taskDao.getScheduled(limit).keySet()).size(), 0);
assertNotEquals(queueDao.getWithPolicies(taskDao.getScheduled().keySet()).size(), 0);
hookDao.delete(hookId);
assertTrue(taskDao.getScheduled(limit).keySet().isEmpty());
assertTrue(taskDao.getScheduled().keySet().isEmpty());
assertFalse(hookDao.getHookById(hookId).isEnabled());
assertFalse(hookDao.getHookById(hookId2).isEnabled());
}
@ -64,9 +61,9 @@ public class HookDeleteDaoTest extends AbstractIntegrationTest {
Long messageId = customerDaoImpl.create(buildCustomerMessage(1L, "partyId", EventType.CUSTOMER_CREATED, CustomerMessageEnum.CUSTOMER, "124", "4356"));
customerQueueDao.createWithPolicy(messageId);
customerTaskDao.create(messageId);
assertEquals(customerQueueDao.getWithPolicies(customerTaskDao.getScheduled(limit).keySet()).size(), 1);
assertEquals(customerQueueDao.getWithPolicies(customerTaskDao.getScheduled().keySet()).size(), 1);
hookDao.delete(hookId);
assertTrue(customerTaskDao.getScheduled(limit).keySet().isEmpty());
assertTrue(customerTaskDao.getScheduled().keySet().isEmpty());
assertFalse(hookDao.getHookById(hookId).isEnabled());
}
}

View File

@ -16,6 +16,7 @@ import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.support.TransactionTemplate;
@ -29,10 +30,9 @@ import static org.junit.Assert.*;
* Created by jeckep on 17.04.17.
*/
@TestPropertySource(properties = "message.scheduler.invoicing.threadPoolSize=0")
public class InvoicingTaskDaoTest extends AbstractIntegrationTest {
private int limit = 10;
@Autowired
InvoicingTaskDao taskDao;
@ -57,8 +57,6 @@ public class InvoicingTaskDaoTest extends AbstractIntegrationTest {
@Before
public void setUp() throws Exception {
hookId = hookDao.create(HookDaoImplTest.buildHook("partyId", "fake.url")).getId();
messageDao.saveBatch(Collections.singletonList(BuildUtils.buildMessage(InvoicingMessageEnum.INVOICE.getValue(),"2345", "partyId", EventType.INVOICE_CREATED, InvoiceStatusEnum.PAID, PaymentStatusEnum.CAPTURED)));
messageId = messageDao.getInvoicingMessage(InvoicingMessageKey.builder().invoiceId("2345").type(InvoicingMessageEnum.INVOICE).build()).getId();
}
@After
@ -68,16 +66,20 @@ public class InvoicingTaskDaoTest extends AbstractIntegrationTest {
@Test
public void createDeleteGet() {
messageDao.saveBatch(Collections.singletonList(BuildUtils.buildMessage(InvoicingMessageEnum.INVOICE.getValue(),"2345", "partyId", EventType.INVOICE_CREATED, InvoiceStatusEnum.PAID, PaymentStatusEnum.CAPTURED)));
messageId = messageDao.getInvoicingMessage(InvoicingMessageKey.builder().invoiceId("2345").type(InvoicingMessageEnum.INVOICE).build()).getId();
queueDao.saveBatchWithPolicies(Collections.singletonList(messageId));
taskDao.save(Collections.singletonList(messageId));
Map<Long, List<Task>> scheduled = taskDao.getScheduled(limit);
Map<Long, List<Task>> scheduled = taskDao.getScheduled();
assertEquals(1, scheduled.size());
taskDao.remove(scheduled.keySet().iterator().next(), messageId);
assertEquals(0, taskDao.getScheduled(limit).size());
assertEquals(0, taskDao.getScheduled().size());
}
@Test
public void testSaveWithHookIdAndInvoiceId(){
messageDao.saveBatch(Collections.singletonList(BuildUtils.buildMessage(InvoicingMessageEnum.INVOICE.getValue(),"2345", "partyId", EventType.INVOICE_CREATED, InvoiceStatusEnum.PAID, PaymentStatusEnum.CAPTURED)));
messageId = messageDao.getInvoicingMessage(InvoicingMessageKey.builder().invoiceId("2345").type(InvoicingMessageEnum.INVOICE).build()).getId();
queueDao.saveBatchWithPolicies(Collections.singletonList(messageId));
int count = taskDao.save(hookId, "2345");
assertEquals(1, count);
@ -86,17 +88,27 @@ public class InvoicingTaskDaoTest extends AbstractIntegrationTest {
@Test
public void testSelectForUpdate() {
List<InvoicingMessage> messages = IntStream.range(0, 20).mapToObj(i -> BuildUtils.buildMessage(InvoicingMessageEnum.INVOICE.getValue(), "" + i, "partyId", EventType.INVOICE_CREATED, InvoiceStatusEnum.PAID, PaymentStatusEnum.CAPTURED))
int cnt = 20;
List<InvoicingMessage> messagesOne = IntStream.range(0, cnt).mapToObj(i -> BuildUtils.buildMessage(InvoicingMessageEnum.INVOICE.getValue(), "invoice_id1", "partyId", EventType.INVOICE_CREATED, InvoiceStatusEnum.PAID, PaymentStatusEnum.CAPTURED))
.collect(Collectors.toList());
messageDao.saveBatch(messages);
List<Long> messageIds = messages.stream().map(Message::getId).collect(Collectors.toList());
queueDao.saveBatchWithPolicies(messageIds);
taskDao.save(messageIds);
List<InvoicingMessage> messagesSecond = IntStream.range(0, cnt).mapToObj(i -> BuildUtils.buildMessage(InvoicingMessageEnum.INVOICE.getValue(), "invoice_id2", "partyId", EventType.INVOICE_CREATED, InvoiceStatusEnum.PAID, PaymentStatusEnum.CAPTURED))
.collect(Collectors.toList());
messageDao.saveBatch(messagesOne);
List<Long> messageIdsOne = messagesOne.stream().map(Message::getId).collect(Collectors.toList());
queueDao.saveBatchWithPolicies(messageIdsOne);
taskDao.save(messageIdsOne);
messageDao.saveBatch(messagesSecond);
List<Long> messageIdsSecond = messagesSecond.stream().map(Message::getId).collect(Collectors.toList());
queueDao.saveBatchWithPolicies(messageIdsSecond);
taskDao.save(messageIdsSecond);
Set<Long> scheduledOne = new HashSet<>();
new Thread(() -> transactionTemplate.execute(tr -> {
scheduledOne.addAll(taskDao.getScheduled(limit).values().stream().flatMap(List::stream).map(Task::getMessageId).collect(Collectors.toSet()));
scheduledOne.addAll(taskDao.getScheduled().values().stream().flatMap(List::stream).map(Task::getMessageId).collect(Collectors.toSet()));
System.out.println("scheduledOne: " + scheduledOne);
try {
Thread.sleep(500);
@ -112,11 +124,11 @@ public class InvoicingTaskDaoTest extends AbstractIntegrationTest {
e.printStackTrace();
}
assertEquals(limit, scheduledOne.size());
assertEquals(cnt, scheduledOne.size());
Set<Long> scheduledTwo = new HashSet<>();
new Thread(() -> transactionTemplate.execute(tr -> {
scheduledTwo.addAll(taskDao.getScheduled(limit).values().stream().flatMap(List::stream).map(Task::getMessageId).collect(Collectors.toSet()));
scheduledTwo.addAll(taskDao.getScheduled().values().stream().flatMap(List::stream).map(Task::getMessageId).collect(Collectors.toSet()));
System.out.println("scheduledTwo :" + scheduledTwo);
return 1;
})).start();
@ -127,7 +139,7 @@ public class InvoicingTaskDaoTest extends AbstractIntegrationTest {
e.printStackTrace();
}
assertEquals(limit, scheduledTwo.size());
assertEquals(cnt, scheduledTwo.size());
scheduledOne.retainAll(scheduledTwo);
assertTrue(scheduledOne.isEmpty());
@ -138,9 +150,6 @@ public class InvoicingTaskDaoTest extends AbstractIntegrationTest {
hookDao.create(HookDaoImplTest.buildHook("partyId", "fake2.url"));
int customLimit = 1;
InvoicingMessage message = BuildUtils.buildMessage(InvoicingMessageEnum.INVOICE.getValue(), "1", "partyId", EventType.INVOICE_CREATED, InvoiceStatusEnum.PAID, PaymentStatusEnum.CAPTURED);
messageDao.saveBatch(List.of(message));
@ -149,7 +158,7 @@ public class InvoicingTaskDaoTest extends AbstractIntegrationTest {
Set<String> scheduledOne = new HashSet<>();
new Thread(() -> transactionTemplate.execute(tr -> {
scheduledOne.addAll(taskDao.getScheduled(customLimit).values().stream().flatMap(List::stream).map(t -> t.getMessageId() + " " + t.getQueueId()).collect(Collectors.toSet()));
scheduledOne.addAll(taskDao.getScheduled().values().stream().flatMap(List::stream).map(t -> t.getMessageId() + " " + t.getQueueId()).collect(Collectors.toSet()));
System.out.println("scheduledOne: " + scheduledOne);
try {
Thread.sleep(500);
@ -165,11 +174,11 @@ public class InvoicingTaskDaoTest extends AbstractIntegrationTest {
e.printStackTrace();
}
assertEquals(customLimit, scheduledOne.size());
assertEquals(2, scheduledOne.size());
Set<String> scheduledTwo = new HashSet<>();
new Thread(() -> transactionTemplate.execute(tr -> {
scheduledTwo.addAll(taskDao.getScheduled(customLimit).values().stream().flatMap(List::stream).map(t -> t.getMessageId() + " " + t.getQueueId()).collect(Collectors.toSet()));
scheduledTwo.addAll(taskDao.getScheduled().values().stream().flatMap(List::stream).map(t -> t.getMessageId() + " " + t.getQueueId()).collect(Collectors.toSet()));
System.out.println("scheduledTwo :" + scheduledTwo);
return 1;
})).start();

View File

@ -62,10 +62,10 @@ public class CustomerTaskDaoTest extends AbstractIntegrationTest {
public void createDeleteGet() {
queueDao.createWithPolicy(messageId);
taskDao.create(messageId);
Map<Long, List<Task>> scheduled = taskDao.getScheduled(10);
Map<Long, List<Task>> scheduled = taskDao.getScheduled();
assertEquals(1, scheduled.size());
taskDao.remove(scheduled.keySet().iterator().next(), messageId);
assertEquals(0, taskDao.getScheduled(10).size());
assertEquals(0, taskDao.getScheduled().size());
}
@Test

View File

@ -26,9 +26,6 @@ import static org.junit.Assert.*;
public class BatchProcessingTest extends AbstractIntegrationTest {
@Value("${message.scheduler.limit}")
private int limit;
@Autowired
private HandlerManager handlerManager;
@ -126,7 +123,7 @@ public class BatchProcessingTest extends AbstractIntegrationTest {
assertNotEquals(messageDao.getBy(Collections.singletonList(messageId - 1)).get(0).getPaymentStatus(),
messageDao.getBy(Collections.singletonList(messageId)).get(0).getPaymentStatus());
assertEquals(1, taskDao.getScheduled(limit).size());
assertEquals(1, taskDao.getScheduled().size());
assertEquals(1, invoicingQueueDao.getWithPolicies(Collections.singletonList(1L)).size());
//test duplication