HOOK-57: New resend policy (#38)

* HOOK-57: init commit

* Refactoring

* Added new policy for customers

* Fixed test

* Refactoring (remove duplicated code)

* Added cache supporting for queues

* Renamed class, optimized imports

* Remove unused row mapper

* Fixed cache property

* Revert dumb renaming

* Added new columns into tables ('enabled', 'topic')

* Last refactoring

* Refactoring of executorService

* Removed unused sync block

* Fixed indices
This commit is contained in:
Inal Arsanukaev 2017-11-27 16:14:45 +03:00 committed by GitHub
parent 53bc54b0e0
commit 3c08cc0c47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
64 changed files with 1423 additions and 1112 deletions

View File

@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>hooker</artifactId>
<version>1.2.1-SNAPSHOT</version>
<version>2.0.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>hooker</name>

View File

@ -1,10 +0,0 @@
package com.rbkmoney.hooker.configuration;
/**
* Created by jeckep on 25.04.17.
*/
public class CacheConfiguration {
public static final String MESSAGES_BY_INVOICE = "messagesByInvoice";
public static final String MESSAGES_BY_IDS = "messagesById";
public static final String HOOKS = "hooks";
}

View File

@ -2,7 +2,7 @@ package com.rbkmoney.hooker.configuration;
import com.rbkmoney.hooker.dao.CustomerDao;
import com.rbkmoney.hooker.dao.HookDao;
import com.rbkmoney.hooker.dao.MessageDao;
import com.rbkmoney.hooker.dao.InvoicingMessageDao;
import com.rbkmoney.hooker.dao.SimpleRetryPolicyDao;
import com.rbkmoney.hooker.dao.impl.*;
import org.jooq.Schema;
@ -25,8 +25,8 @@ public class DaoConfiguration {
@Bean
@DependsOn("dbInitializer")
public MessageDao messageDao(DataSource dataSource) {
return new MessageDaoImpl(dataSource);
public InvoicingMessageDao messageDao(DataSource dataSource) {
return new InvoicingMessageDaoImpl(dataSource);
}
@Bean
@ -53,6 +53,18 @@ public class DaoConfiguration {
return new SimpleRetryPolicyDaoImpl(dataSource);
}
@Bean
@DependsOn("dbInitializer")
public InvoicingQueueDao invoicingQueueDao(DataSource dataSource) {
return new InvoicingQueueDao(dataSource);
}
@Bean
@DependsOn("dbInitializer")
public CustomerQueueDao customerQueueDao(DataSource dataSource) {
return new CustomerQueueDao(dataSource);
}
@Bean
public Schema dbSchema() {
return new SchemaImpl("hook");

View File

@ -25,88 +25,73 @@ public abstract class AbstractTaskDao extends NamedParameterJdbcDaoSupport imple
}
public static RowMapper<Task> taskRowMapper = (rs, i) ->
new Task(rs.getLong("hook_id"), rs.getLong("message_id"));
new Task(rs.getLong("message_id"), rs.getLong("queue_id"));
protected abstract String getMessageTopic();
@Override
public void remove(long hookId, long messageId) {
final String sql = "DELETE FROM hook.scheduled_task where hook_id=:hook_id and message_id=:message_id and message_type=CAST(:message_type as hook.message_topic)";
public void remove(long queueId, long messageId) throws DaoException {
final String sql = "DELETE FROM hook.scheduled_task where queue_id=:queue_id and message_id=:message_id and message_type=CAST(:message_type as hook.message_topic)";
try {
getNamedParameterJdbcTemplate().update(sql, new MapSqlParameterSource("hook_id", hookId)
getNamedParameterJdbcTemplate().update(sql, new MapSqlParameterSource("queue_id", queueId)
.addValue("message_id", messageId)
.addValue("message_type", getMessageTopic()));
log.debug("Task with hook_id = " + hookId + " messageId = " + messageId + " removed from hook.scheduled_task");
log.debug("Task with queueId {} messageId {} removed from hook.scheduled_task", queueId, messageId);
} catch (NestedRuntimeException e) {
log.error("Fail to delete task by hook_id and message_id", e);
log.warn("Fail to delete task by queue_id {} and message_id {}", queueId, messageId, e);
throw new DaoException(e);
}
}
@Override
public void removeAll(long hookId) {
final String sql = "DELETE FROM hook.scheduled_task where hook_id=:hook_id and message_type=CAST(:message_type as hook.message_topic)";
public void removeAll(long queueId) throws DaoException {
final String sql = "DELETE FROM hook.scheduled_task where queue_id=:queue_id and message_type=CAST(:message_type as hook.message_topic)";
try {
getNamedParameterJdbcTemplate().update(sql, new MapSqlParameterSource("hook_id", hookId).addValue("message_type", getMessageTopic()));
getNamedParameterJdbcTemplate().update(sql, new MapSqlParameterSource("queue_id", queueId).addValue("message_type", getMessageTopic()));
} catch (NestedRuntimeException e) {
log.error("Fail to delete tasks for hook:" + hookId, e);
log.warn("Fail to delete tasks for hook:" + queueId, e);
throw new DaoException(e);
}
}
@Override
public List<Task> getAll() {
final String sql = "SELECT * FROM hook.scheduled_task WHERE message_type=CAST(:message_type as hook.message_topic)";
try {
List<Task> tasks = getNamedParameterJdbcTemplate().query(sql, new MapSqlParameterSource("message_type", getMessageTopic()), taskRowMapper);
log.debug("Tasks count: " + tasks.size());
return tasks;
} catch (NestedRuntimeException e) {
log.error("Fail to get all tasks from scheduled_task", e);
throw new DaoException(e);
}
}
@Override
// should return ordered BY hook_id, message_id
public Map<Long, List<Task>> getScheduled(Collection<Long> excludeHooksIds) {
// TODO think about limit
public Map<Long, List<Task>> getScheduled(Collection<Long> excludeQueueIds) throws DaoException {
final String sql =
" SELECT DISTINCT * " +
" FROM hook.scheduled_task st WHERE message_type=CAST(:message_type as hook.message_topic)" +
(excludeHooksIds.size() > 0 ? " AND st.hook_id not in (:hook_ids)" : "") +
" ORDER BY hook_id ASC , message_id ASC";
" SELECT st.message_id, st.queue_id FROM hook.scheduled_task st WHERE message_type=CAST(:message_type as hook.message_topic)" +
(excludeQueueIds.size() > 0 ? " AND st.queue_id not in (:queue_ids)" : "") +
" ORDER BY queue_id ASC, message_id ASC LIMIT 10000";
try {
List<Task> tasks = getNamedParameterJdbcTemplate().query(
sql,
new MapSqlParameterSource("enabled", true)
.addValue("hook_ids", excludeHooksIds)
sql, new MapSqlParameterSource()
.addValue("queue_ids", excludeQueueIds)
.addValue("message_type", getMessageTopic())
, taskRowMapper);
Map<Long, List<Task>> longListMap = splitByHooks(tasks);
Map<Long, List<Task>> longListMap = splitByQueue(tasks);
return longListMap;
} catch (NestedRuntimeException e) {
log.error("Fail to get active tasks from scheduled_task", e);
log.warn("Fail to get active tasks from scheduled_task", e);
throw new DaoException(e);
}
}
//should preserve order
private Map<Long, List<Task>> splitByHooks(List<Task> orderedByHookIdMessageIdTasks) {
private Map<Long, List<Task>> splitByQueue(List<Task> orderedByQueueIdMessageIdTasks) {
final Map<Long, List<Task>> map = new HashMap<>();
if (orderedByHookIdMessageIdTasks.size() == 0) {
if (orderedByQueueIdMessageIdTasks.size() == 0) {
return map;
}
int start = 0;
long previousHookId = orderedByHookIdMessageIdTasks.get(0).getHookId();
for (int i = 0; i < orderedByHookIdMessageIdTasks.size(); i++) {
long currentHookId = orderedByHookIdMessageIdTasks.get(i).getHookId();
if (previousHookId != currentHookId) {
map.put(previousHookId, orderedByHookIdMessageIdTasks.subList(start, i));
long previousQueueId = orderedByQueueIdMessageIdTasks.get(0).getQueueId();
for (int i = 0; i < orderedByQueueIdMessageIdTasks.size(); i++) {
long currentQueueId = orderedByQueueIdMessageIdTasks.get(i).getQueueId();
if (previousQueueId != currentQueueId) {
map.put(previousQueueId, orderedByQueueIdMessageIdTasks.subList(start, i));
start = i;
previousHookId = currentHookId;
previousQueueId = currentQueueId;
}
}
map.put(previousHookId, orderedByHookIdMessageIdTasks.subList(start, orderedByHookIdMessageIdTasks.size()));
map.put(previousQueueId, orderedByQueueIdMessageIdTasks.subList(start, orderedByQueueIdMessageIdTasks.size()));
return map;
}

View File

@ -0,0 +1,53 @@
package com.rbkmoney.hooker.dao;
import com.rbkmoney.hooker.model.Message;
import com.rbkmoney.hooker.model.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* Created by inalarsanukaev on 23.11.17.
*/
@Component
public class CacheMng {
private static final String MESSAGES_BY_INVOICE = "messagesByInvoice";
private static final String MESSAGES_BY_IDS = "messagesById";
private static final String QUEUES = "queues";
@Autowired
private CacheManager cacheMng;
public void putMessage(Message message){
cacheMng.getCache(MESSAGES_BY_IDS).put(message.getId(), message);
}
public void putMessage(String id, Message message){
cacheMng.getCache(MESSAGES_BY_INVOICE).put(id, message);
}
public <T extends Message> T getMessage(String id, Class<T> type) {
return cacheMng.getCache(MESSAGES_BY_INVOICE).get(id, type);
}
public <T extends Message> List<T> getMessages(Collection<Long> ids, Class<T> type) {
Cache cache = cacheMng.getCache(MESSAGES_BY_IDS);
return ids.stream().map(id -> cache.get(id, type)).filter(Objects::nonNull).collect(Collectors.toList());
}
public <T extends Queue> List<T> getQueues(Collection<Long> ids, Class<T> type){
Cache cache = cacheMng.getCache(QUEUES);
return ids.stream().map(id -> cache.get(id, type)).filter(Objects::nonNull).collect(Collectors.toList());
}
public void putQueues(Collection<? extends Queue> queues){
Cache cache = cacheMng.getCache(QUEUES);
queues.forEach(q -> cache.put(q.getId(), q));
}
}

View File

@ -2,12 +2,6 @@ package com.rbkmoney.hooker.dao;
import com.rbkmoney.hooker.model.CustomerMessage;
import java.util.Collection;
import java.util.List;
public interface CustomerDao {
public interface CustomerDao extends MessageDao<CustomerMessage>{
CustomerMessage getAny(String customerId, String type) throws DaoException;
CustomerMessage create(CustomerMessage customerMessage) throws DaoException;
Long getMaxEventId();
List<CustomerMessage> getBy(Collection<Long> messageIds);
}

View File

@ -2,17 +2,14 @@ package com.rbkmoney.hooker.dao;
import com.rbkmoney.hooker.model.Hook;
import java.util.Collection;
import java.util.List;
/**
* Created by inal on 28.11.2016.
*/
public interface HookDao {
List<Hook> getPartyHooks(String partyId);
Hook getHookById(long id);
Hook create(Hook hook);
void delete(long id);
void disable(long id);
List<Hook> getWithPolicies(Collection<Long> ids);
List<Hook> getPartyHooks(String partyId) throws DaoException;
Hook getHookById(long id) throws DaoException;
Hook create(Hook hook) throws DaoException;
void delete(long id) throws DaoException;
}

View File

@ -0,0 +1,7 @@
package com.rbkmoney.hooker.dao;
import com.rbkmoney.hooker.model.InvoicingMessage;
public interface InvoicingMessageDao extends MessageDao<InvoicingMessage> {
InvoicingMessage getAny(String invoiceId, String paymentType) throws DaoException;
}

View File

@ -1,13 +1,10 @@
package com.rbkmoney.hooker.dao;
import com.rbkmoney.hooker.model.Message;
import java.util.Collection;
import java.util.List;
public interface MessageDao {
Message getAny(String invoiceId, String type) throws DaoException;
Message create(Message message) throws DaoException;
public interface MessageDao<M> {
void create(M message) throws DaoException;
Long getMaxEventId();
List<Message> getBy(Collection<Long> messageIds);
List<M> getBy(Collection<Long> messageIds) throws DaoException;
}

View File

@ -0,0 +1,15 @@
package com.rbkmoney.hooker.dao;
import com.rbkmoney.hooker.model.Queue;
import java.util.Collection;
import java.util.List;
/**
* Created by inalarsanukaev on 14.11.17.
*/
public interface QueueDao<Q extends Queue> {
void createWithPolicy(long messageId) throws DaoException;
List<Q> getWithPolicies(Collection<Long> ids);
void disable(long id);
}

View File

@ -6,5 +6,5 @@ import com.rbkmoney.hooker.retry.impl.simple.SimpleRetryPolicyRecord;
* Created by jeckep on 17.04.17.
*/
public interface SimpleRetryPolicyDao {
void update(SimpleRetryPolicyRecord record);
void update(SimpleRetryPolicyRecord record) throws DaoException;
}

View File

@ -11,9 +11,8 @@ import java.util.Map;
* Created by jeckep on 13.04.17.
*/
public interface TaskDao {
void create(long messageId);
void remove(long hookId, long messageId);
void removeAll(long hookId);
List<Task> getAll();
Map<Long, List<Task>> getScheduled(Collection<Long> excludeHooksIds);
void create(long messageId) throws DaoException;
void remove(long queueId, long messageId);
void removeAll(long queueId) throws DaoException;
Map<Long, List<Task>> getScheduled(Collection<Long> excludeQueueIds) throws DaoException;
}

View File

@ -8,14 +8,16 @@ import com.rbkmoney.hooker.dao.WebhookAdditionalFilter;
public class AllHookTablesRow {
private long id;
private String partyId;
private String topic;
private String url;
private String pubKey;
private boolean enabled;
private WebhookAdditionalFilter webhookAdditionalFilter;
public AllHookTablesRow(long id, String partyId, String url, String pubKey, boolean enabled, WebhookAdditionalFilter webhookAdditionalFilter) {
public AllHookTablesRow(long id, String partyId, String topic, String url, String pubKey, boolean enabled, WebhookAdditionalFilter webhookAdditionalFilter) {
this.id = id;
this.partyId = partyId;
this.topic = topic;
this.url = url;
this.pubKey = pubKey;
this.enabled = enabled;
@ -46,6 +48,14 @@ public class AllHookTablesRow {
this.partyId = partyId;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getUrl() {
return url;
}
@ -70,4 +80,4 @@ public class AllHookTablesRow {
this.enabled = enabled;
}
}
}

View File

@ -16,6 +16,7 @@ import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcDaoSupport;
import org.springframework.jdbc.support.GeneratedKeyHolder;
import org.springframework.transaction.annotation.Transactional;
import javax.sql.DataSource;
import java.util.ArrayList;
@ -30,11 +31,14 @@ import static com.rbkmoney.hooker.utils.PaymentToolUtils.getPaymentToolDetails;
*/
public class CustomerDaoImpl extends NamedParameterJdbcDaoSupport implements CustomerDao {
Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
CustomerQueueDao queueDao;
@Autowired
CustomerTaskDao taskDao;
Logger log = LoggerFactory.getLogger(this.getClass());
public static final String ID = "id";
public static final String EVENT_ID = "event_id";
public static final String TYPE = "type";
@ -131,7 +135,8 @@ public class CustomerDaoImpl extends NamedParameterJdbcDaoSupport implements Cus
}
@Override
public CustomerMessage create(CustomerMessage message) throws DaoException {
@Transactional
public void create(CustomerMessage message) throws DaoException {
final String sql = "INSERT INTO hook.customer_message " +
"(event_id, occured_at, type, party_id, event_type, " +
"customer_id, customer_shop_id, customer_status, customer_email , customer_phone, customer_metadata, " +
@ -181,8 +186,8 @@ public class CustomerDaoImpl extends NamedParameterJdbcDaoSupport implements Cus
getNamedParameterJdbcTemplate().update(sql, params, keyHolder);
message.setId(keyHolder.getKey().longValue());
log.info("CustomerMessage {} saved to db.", message);
queueDao.createWithPolicy(message.getId());
taskDao.create(message.getId());
return message;
} catch (NestedRuntimeException e) {
throw new DaoException("Couldn't create customerMessage with customerId " + customer.getId(), e);
}
@ -199,11 +204,11 @@ public class CustomerDaoImpl extends NamedParameterJdbcDaoSupport implements Cus
}
@Override
public List<CustomerMessage> getBy(Collection<Long> messageIds) {
public List<CustomerMessage> getBy(Collection<Long> messageIds) throws DaoException {
if (messageIds.isEmpty()) {
return new ArrayList<>();
}
final String sql = "SELECT DISTINCT * FROM hook.customer_message WHERE id in (:ids)";
final String sql = "SELECT * FROM hook.customer_message WHERE id in (:ids)";
try {
List<CustomerMessage> messagesFromDb = getNamedParameterJdbcTemplate().query(sql, new MapSqlParameterSource("ids", messageIds), messageRowMapper);
log.debug("messagesFromDb {}", messagesFromDb);

View File

@ -0,0 +1,103 @@
package com.rbkmoney.hooker.dao.impl;
import com.rbkmoney.hooker.dao.DaoException;
import com.rbkmoney.hooker.dao.QueueDao;
import com.rbkmoney.hooker.model.CustomerQueue;
import com.rbkmoney.hooker.model.Hook;
import com.rbkmoney.hooker.retry.RetryPolicyType;
import com.rbkmoney.swag_webhook_events.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.NestedRuntimeException;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcDaoSupport;
import javax.sql.DataSource;
import java.util.Collection;
import java.util.List;
/**
* Created by inalarsanukaev on 14.11.17.
*/
public class CustomerQueueDao extends NamedParameterJdbcDaoSupport implements QueueDao<CustomerQueue> {
Logger log = LoggerFactory.getLogger(this.getClass());
public CustomerQueueDao(DataSource dataSource) {
setDataSource(dataSource);
}
public static RowMapper<CustomerQueue> queueWithPolicyRowMapper = (rs, i) -> {
CustomerQueue queue = new CustomerQueue();
queue.setId(rs.getLong("id"));
queue.setCustomerId(rs.getString("customer_id"));
Hook hook = new Hook();
hook.setId(rs.getLong("hook_id"));
hook.setPartyId(rs.getString("party_id"));
hook.setTopic(rs.getString("message_type"));
hook.setUrl(rs.getString("url"));
hook.setPubKey(rs.getString("pub_key"));
hook.setPrivKey(rs.getString("priv_key"));
hook.setEnabled(rs.getBoolean("enabled"));
RetryPolicyType retryPolicyType = RetryPolicyType.valueOf(rs.getString("retry_policy"));
hook.setRetryPolicyType(retryPolicyType);
queue.setHook(hook);
queue.setRetryPolicyRecord(retryPolicyType.build(rs));
return queue;
};
@Override
public void createWithPolicy(long messageId) throws DaoException {
final String sql = "with queue as ( " +
" insert into hook.customer_queue(hook_id, customer_id)" +
" select w.id, m.customer_id" +
" from hook.customer_message m" +
" join hook.webhook w on m.party_id = w.party_id and w.enabled and w.topic=CAST(:message_type as hook.message_topic)" +
" where m.id = :id " +
" on conflict(hook_id, customer_id) do nothing returning *) " +
"insert into hook.simple_retry_policy(queue_id, message_type) select id, CAST(:message_type as hook.message_topic) from queue";
try {
int count = getNamedParameterJdbcTemplate().update(sql, new MapSqlParameterSource("id", messageId)
.addValue("message_type", getMessagesTopic()));
log.info("Created {} queues for messageId {}", count, messageId);
} catch (NestedRuntimeException e) {
log.error("Fail to createWithPolicy queue {}", messageId, e);
throw new DaoException(e);
}
}
@Override
public List<CustomerQueue> getWithPolicies(Collection<Long> ids) throws DaoException {
final String sql =
" select q.id, q.hook_id, q.customer_id, wh.party_id, wh.url, k.pub_key, k.priv_key, wh.enabled, wh.retry_policy, srp.fail_count, srp.last_fail_time, srp.message_type " +
" from hook.customer_queue q " +
" join hook.webhook wh on wh.id = q.hook_id and wh.enabled and wh.topic=CAST(:message_type as hook.message_topic)" +
" join hook.party_key k on k.party_id = wh.party_id " +
" left join hook.simple_retry_policy srp on q.id = srp.queue_id and srp.message_type=CAST(:message_type as hook.message_topic)" +
" where q.id in (:ids) and q.enabled";
final MapSqlParameterSource params = new MapSqlParameterSource("ids", ids)
.addValue("message_type", getMessagesTopic());
try {
return getNamedParameterJdbcTemplate().query(sql, params, queueWithPolicyRowMapper);
} catch (NestedRuntimeException e) {
throw new DaoException(e);
}
}
@Override
public void disable(long id) throws DaoException {
final String sql = " UPDATE hook.customer_queue SET enabled = FALSE where id=:id;";
try {
getNamedParameterJdbcTemplate().update(sql, new MapSqlParameterSource("id", id));
} catch (NestedRuntimeException e) {
log.error("Fail to disable queue: {}", id, e);
throw new DaoException(e);
}
}
public String getMessagesTopic() {
return Event.TopicEnum.CUSTOMERSTOPIC.getValue();
}
}

View File

@ -27,22 +27,24 @@ public class CustomerTaskDao extends AbstractTaskDao {
}
@Override
public void create(long messageId) {
public void create(long messageId) throws DaoException {
final String sql =
" insert into hook.scheduled_task(message_id, hook_id, message_type)" +
" select m.id, w.id, '" + getMessageTopic() + "'" +
" insert into hook.scheduled_task(message_id, queue_id, message_type)" +
" select m.id, q.id, w.topic" +
" from hook.customer_message m" +
" join hook.webhook w on m.party_id = w.party_id and w.enabled" +
" join hook.webhook w on m.party_id = w.party_id and w.enabled and w.topic=CAST(:message_type as hook.message_topic)" +
" join hook.webhook_to_events wte on wte.hook_id = w.id" +
" where m.id = :id " +
" join hook.customer_queue q on q.hook_id=w.id and q.enabled and q.customer_id=m.customer_id" +
" where m.id = :message_id " +
" and m.event_type = wte.event_type " +
" and (m.customer_shop_id = wte.invoice_shop_id or wte.invoice_shop_id is null) " +
" ON CONFLICT (message_id, hook_id) DO NOTHING";
" ON CONFLICT (message_id, queue_id, message_type) DO NOTHING";
try {
int updateCount = getNamedParameterJdbcTemplate().update(sql, new MapSqlParameterSource("id", messageId));
log.debug("Created tasks count : " + updateCount);
int updateCount = getNamedParameterJdbcTemplate().update(sql, new MapSqlParameterSource("message_id", messageId)
.addValue("message_type", getMessageTopic()));
log.info("Created tasks count={} for messageId={}", updateCount, messageId);
} catch (NestedRuntimeException e) {
log.error("Fail to create tasks for messages messages.", e);
log.error("Fail to create tasks for messages.", e);
throw new DaoException(e);
}
}

View File

@ -1,19 +1,15 @@
package com.rbkmoney.hooker.dao.impl;
import com.rbkmoney.hooker.configuration.CacheConfiguration;
import com.rbkmoney.hooker.dao.DaoException;
import com.rbkmoney.hooker.dao.HookDao;
import com.rbkmoney.hooker.dao.WebhookAdditionalFilter;
import com.rbkmoney.hooker.model.EventType;
import com.rbkmoney.hooker.model.Hook;
import com.rbkmoney.hooker.retry.RetryPolicyType;
import com.rbkmoney.hooker.service.crypt.KeyPair;
import com.rbkmoney.hooker.service.crypt.Signer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.core.NestedRuntimeException;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
@ -33,7 +29,7 @@ public class HookDaoImpl implements HookDao {
Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
CacheManager cacheManager;
Signer signer;
private final NamedParameterJdbcTemplate jdbcTemplate;
@ -41,22 +37,8 @@ public class HookDaoImpl implements HookDao {
this.jdbcTemplate = jdbcTemplate;
}
public static RowMapper<Hook> hookWithPolicyRowMapper = (rs, i) -> {
Hook hook = new Hook();
hook.setId(rs.getLong("id"));
hook.setPartyId(rs.getString("party_id"));
hook.setUrl(rs.getString("url"));
hook.setPubKey(rs.getString("pub_key"));
hook.setPrivKey(rs.getString("priv_key"));
hook.setEnabled(rs.getBoolean("enabled"));
RetryPolicyType retryPolicyType = RetryPolicyType.valueOf(rs.getString("retry_policy"));
hook.setRetryPolicyType(retryPolicyType);
hook.setRetryPolicyRecord(retryPolicyType.build(rs));
return hook;
};
@Override
public List<Hook> getPartyHooks(String partyId) {
public List<Hook> getPartyHooks(String partyId) throws DaoException {
log.debug("getPartyHooks request. PartyId = {}", partyId);
final String sql =
" select w.*, k.pub_key, wte.* " +
@ -78,7 +60,7 @@ public class HookDaoImpl implements HookDao {
return result;
} catch (NestedRuntimeException e) {
String message = "Couldn't getPartyHooks for partyId " + partyId;
log.error(message, e);
log.warn(message, e);
throw new DaoException(message);
}
}
@ -93,11 +75,7 @@ public class HookDaoImpl implements HookDao {
//grouping by hookId
for (AllHookTablesRow row : allHookTablesRows) {
final long hookId = row.getId();
List<AllHookTablesRow> list = hookIdToRows.get(hookId);
if (list == null) {
list = new ArrayList<>();
hookIdToRows.put(hookId, list);
}
List<AllHookTablesRow> list = hookIdToRows.computeIfAbsent(hookId, k -> new ArrayList<>());
list.add(row);
}
@ -106,6 +84,7 @@ public class HookDaoImpl implements HookDao {
Hook hook = new Hook();
hook.setId(hookId);
hook.setPartyId(rows.get(0).getPartyId());
hook.setTopic(rows.get(0).getTopic());
hook.setUrl(rows.get(0).getUrl());
hook.setPubKey(rows.get(0).getPubKey());
hook.setEnabled(rows.get(0).isEnabled());
@ -117,7 +96,7 @@ public class HookDaoImpl implements HookDao {
}
@Override
public Hook getHookById(long id) {
public Hook getHookById(long id) throws DaoException {
final String sql = "select w.*, k.pub_key, wte.* " +
"from hook.webhook w " +
"join hook.party_key k " +
@ -137,51 +116,25 @@ public class HookDaoImpl implements HookDao {
}
return result.get(0);
} catch (NestedRuntimeException e) {
log.error("Fail to get hook: " + id, e);
log.warn("Fail to get hook {}", id, e);
throw new DaoException(e);
}
}
public List<Hook> getWithPolicies(Collection<Long> ids) {
List<Hook> hooks = getFromCache(ids);
if (hooks.size() == ids.size()) {
return hooks;
}
Set<Long> hookIds = new HashSet<>(ids);
hooks.forEach(h -> hookIds.remove(h.getId()));
final String sql =
" select w.*, k.*, srp.*" +
" from hook.webhook w " +
" join hook.party_key k on k.party_id = w.party_id " +
" left join hook.simple_retry_policy srp on srp.hook_id = w.id" +
" where w.id in (:ids)";
final MapSqlParameterSource params = new MapSqlParameterSource("ids", hookIds);
try {
List<Hook> hooksFromDb = jdbcTemplate.query(sql, params, hookWithPolicyRowMapper);
putToCache(hooksFromDb);
hooks.addAll(hooksFromDb);
return hooks;
} catch (NestedRuntimeException e) {
throw new DaoException(e);
}
}
@Override
@Transactional
public Hook create(Hook hook) {
public Hook create(Hook hook) throws DaoException {
String pubKey = createOrGetPubKey(hook.getPartyId());
hook.setPubKey(pubKey);
hook.setEnabled(true);
final String sql = "INSERT INTO hook.webhook(party_id, url) " +
"VALUES (:party_id, :url) RETURNING ID";
final String sql = "INSERT INTO hook.webhook(party_id, url, topic) " +
"VALUES (:party_id, :url, CAST(:topic as hook.message_topic)) RETURNING ID";
MapSqlParameterSource params = new MapSqlParameterSource()
.addValue("party_id", hook.getPartyId())
.addValue("url", hook.getUrl());
.addValue("url", hook.getUrl())
.addValue("topic", hook.getTopic());
try {
GeneratedKeyHolder keyHolder = new GeneratedKeyHolder();
int updateCount = jdbcTemplate.update(sql, params, keyHolder);
@ -190,25 +143,14 @@ public class HookDaoImpl implements HookDao {
}
hook.setId(keyHolder.getKey().longValue());
saveHookFilters(hook.getId(), hook.getFilters());
addRecordToRetryPolicy(hook.getId());
} catch (NestedRuntimeException e) {
log.error("Fail to create hook: " + hook, e);
log.warn("Fail to createWithPolicy hook {}", hook, e);
throw new DaoException(e);
}
log.info("Webhook with id = {} created.", hook.getId());
return hook;
}
private void addRecordToRetryPolicy(long hookId) {
final String sql = "insert into hook.simple_retry_policy(hook_id) VALUES (:hook_id)";
try {
jdbcTemplate.update(sql, new MapSqlParameterSource("hook_id", hookId));
} catch (NestedRuntimeException e) {
log.error("Fail to create simple_retry_policy for hook: " + hookId, e);
throw new DaoException(e);
}
}
private void saveHookFilters(long hookId, Collection<WebhookAdditionalFilter> webhookAdditionalFilters) {
int size = webhookAdditionalFilters.size();
List<Map<String, Object>> batchValues = new ArrayList<>(size);
@ -237,10 +179,14 @@ public class HookDaoImpl implements HookDao {
@Override
@Transactional
public void delete(long id) {
public void delete(long id) throws DaoException {
final String sql =
" DELETE FROM hook.scheduled_task where hook_id=:id;" +
" DELETE FROM hook.simple_retry_policy where hook_id=:id;" +
" DELETE FROM hook.scheduled_task USING hook.invoicing_queue q WHERE q.hook_id=:id;" +
" DELETE FROM hook.scheduled_task USING hook.customer_queue q WHERE q.hook_id=:id;" +
" DELETE FROM hook.simple_retry_policy USING hook.invoicing_queue q WHERE q.hook_id=:id;" +
" DELETE FROM hook.simple_retry_policy USING hook.customer_queue q WHERE q.hook_id=:id;" +
" DELETE FROM hook.invoicing_queue where hook_id=:id;" +
" DELETE FROM hook.customer_queue where hook_id=:id;" +
" DELETE FROM hook.webhook_to_events where hook_id=:id;" +
" DELETE FROM hook.webhook where id=:id; ";
try {
@ -250,21 +196,7 @@ public class HookDaoImpl implements HookDao {
}
}
@Override
public void disable(long id) {
final String sql = " UPDATE hook.webhook SET enabled = FALSE where id=:id;";
try {
jdbcTemplate.update(sql, new MapSqlParameterSource("id", id));
} catch (NestedRuntimeException e) {
log.error("Fail to disable webhook: {}", id, e);
throw new DaoException(e);
}
}
@Autowired
Signer signer;
private String createOrGetPubKey(String partyId) {
private String createOrGetPubKey(String partyId) throws DaoException {
final String sql = "INSERT INTO hook.party_key(party_id, priv_key, pub_key) " +
"VALUES (:party_id, :priv_key, :pub_key) " +
"ON CONFLICT(party_id) DO UPDATE SET party_id=:party_id RETURNING pub_key";
@ -280,7 +212,7 @@ public class HookDaoImpl implements HookDao {
jdbcTemplate.update(sql, params, keyHolder);
pubKey = (String) keyHolder.getKeys().get("pub_key");
} catch (NestedRuntimeException | NullPointerException | ClassCastException e) {
log.error("Fail to create security keys for party: " + partyId, e);
log.warn("Fail to createOrGetPubKey security keys for party {} ", partyId, e);
throw new DaoException(e);
}
return pubKey;
@ -290,7 +222,7 @@ public class HookDaoImpl implements HookDao {
private static RowMapper<AllHookTablesRow> allHookTablesRowRowMapper =
(rs, i) -> new AllHookTablesRow(rs.getLong("id"),
rs.getString("party_id"),
//EventTypeCode.valueOfKey(rs.getString("event_code")),
rs.getString("topic"),
rs.getString("url"),
rs.getString("pub_key"),
rs.getBoolean("enabled"),
@ -300,25 +232,5 @@ public class HookDaoImpl implements HookDao {
rs.getString("invoice_payment_status")));
private List<Hook> getFromCache(Collection<Long> ids){
Cache cache = cacheManager.getCache(CacheConfiguration.HOOKS);
List<Hook> hooks = new ArrayList<>();
for(long id: ids){
Hook hook = cache.get(id, Hook.class);
if(hook != null){
hooks.add(hook);
}
}
return hooks;
}
private void putToCache(Collection<Hook> hooks){
Cache cache = cacheManager.getCache(CacheConfiguration.HOOKS);
for(Hook hook: hooks){
cache.put(hook.getId(), hook);
}
}
}
//select * from hook.webhook w where exists (select * from hook.webhook_to_events wh where wh.hook_id = w.id AND wh.event_type = 'CUSTOMER_CREATED');

View File

@ -1,8 +1,8 @@
package com.rbkmoney.hooker.dao.impl;
import com.rbkmoney.hooker.configuration.CacheConfiguration;
import com.rbkmoney.hooker.dao.CacheMng;
import com.rbkmoney.hooker.dao.DaoException;
import com.rbkmoney.hooker.dao.MessageDao;
import com.rbkmoney.hooker.dao.InvoicingMessageDao;
import com.rbkmoney.hooker.model.*;
import com.rbkmoney.hooker.model.Invoice;
import com.rbkmoney.hooker.model.Payment;
@ -12,8 +12,6 @@ import com.rbkmoney.swag_webhook_events.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.core.NestedRuntimeException;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.RowMapper;
@ -27,14 +25,17 @@ import java.util.*;
import static com.rbkmoney.hooker.utils.PaymentToolUtils.getPaymentToolDetails;
public class MessageDaoImpl extends NamedParameterJdbcDaoSupport implements MessageDao {
public class InvoicingMessageDaoImpl extends NamedParameterJdbcDaoSupport implements InvoicingMessageDao {
Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
InvoicingTaskDao taskDao;
CacheMng cacheMng;
@Autowired
CacheManager cacheManager;
InvoicingQueueDao queueDao;
@Autowired
InvoicingTaskDao taskDao;
public static final String ID = "id";
public static final String EVENT_ID = "event_id";
@ -110,8 +111,8 @@ public class MessageDaoImpl extends NamedParameterJdbcDaoSupport implements Mess
return invoiceCartPosition;
};
private static RowMapper<Message> messageRowMapper = (rs, i) -> {
Message message = new Message();
private static RowMapper<InvoicingMessage> messageRowMapper = (rs, i) -> {
InvoicingMessage message = new InvoicingMessage();
message.setId(rs.getLong(ID));
message.setEventId(rs.getLong(EVENT_ID));
message.setEventTime(rs.getString(EVENT_TIME));
@ -177,17 +178,17 @@ public class MessageDaoImpl extends NamedParameterJdbcDaoSupport implements Mess
return message;
};
public MessageDaoImpl(DataSource dataSource) {
public InvoicingMessageDaoImpl(DataSource dataSource) {
setDataSource(dataSource);
}
@Override
public Message getAny(String invoiceId, String type) throws DaoException {
Message message = getFromCache(invoiceId, type);
public InvoicingMessage getAny(String invoiceId, String type) throws DaoException {
InvoicingMessage message = cacheMng.getMessage(invoiceId + type, InvoicingMessage.class);
if (message != null) {
return message.copy();
}
Message result = null;
InvoicingMessage result = null;
final String sql = "SELECT * FROM hook.message WHERE invoice_id =:invoice_id AND type =:type ORDER BY id DESC LIMIT 1";
MapSqlParameterSource params = new MapSqlParameterSource(INVOICE_ID, invoiceId).addValue(TYPE, type);
try {
@ -199,9 +200,9 @@ public class MessageDaoImpl extends NamedParameterJdbcDaoSupport implements Mess
result.getInvoice().setCart(cart);
}
} catch (EmptyResultDataAccessException e) {
log.warn("Message with invoiceId {} not exist!", invoiceId);
log.warn("InvoicingMessage with invoiceId {} not exist!", invoiceId);
} catch (NestedRuntimeException e) {
throw new DaoException("MessageDaoImpl.getAny error with invoiceId " + invoiceId, e);
throw new DaoException("InvoicingMessageDaoImpl.getAny error with invoiceId " + invoiceId, e);
}
putToCache(result);
@ -238,7 +239,7 @@ public class MessageDaoImpl extends NamedParameterJdbcDaoSupport implements Mess
@Override
@Transactional
public Message create(Message message) throws DaoException {
public void create(InvoicingMessage message) throws DaoException {
final String sql = "INSERT INTO hook.message" +
"(event_id, event_time, type, party_id, event_type, " +
"invoice_id, shop_id, invoice_created_at, invoice_status, invoice_reason, invoice_due_date, invoice_amount, " +
@ -318,12 +319,10 @@ public class MessageDaoImpl extends NamedParameterJdbcDaoSupport implements Mess
getNamedParameterJdbcTemplate().update(sql, params, keyHolder);
message.setId(keyHolder.getKey().longValue());
saveCart(message.getId(), message.getInvoice().getCart());
log.info("Message {} save to db.", message);
// create tasks
taskDao.create(message.getId());
log.info("InvoicingMessage {} saved to db.", message);
putToCache(message);
return message;
queueDao.createWithPolicy(message.getId());
taskDao.create(message.getId());
} catch (NestedRuntimeException e) {
throw new DaoException("Couldn't create message with invoice_id "+ message.getInvoice().getId(), e);
}
@ -340,53 +339,36 @@ public class MessageDaoImpl extends NamedParameterJdbcDaoSupport implements Mess
}
@Override
public List<Message> getBy(Collection<Long> messageIds) {
List<Message> messages = getFromCache(messageIds);
public List<InvoicingMessage> getBy(Collection<Long> messageIds) throws DaoException {
List<InvoicingMessage> messages = cacheMng.getMessages(messageIds, InvoicingMessage.class);
if (messages.size() == messageIds.size()) {
return messages;
}
Set<Long> ids = new HashSet<>(messageIds);
for (Message message : messages) {
for (InvoicingMessage message : messages) {
ids.remove(message.getId());
}
final String sql = "SELECT DISTINCT * FROM hook.message WHERE id in (:ids)";
final String sql = "SELECT * FROM hook.message WHERE id in (:ids)";
try {
List<Message> messagesFromDb = getNamedParameterJdbcTemplate().query(sql, new MapSqlParameterSource("ids", ids), messageRowMapper);
List<InvoicingMessage> messagesFromDb = getNamedParameterJdbcTemplate().query(sql, new MapSqlParameterSource("ids", ids), messageRowMapper);
log.debug("messagesFromDb {}", messagesFromDb);
for(Message message: messagesFromDb){
for(InvoicingMessage message: messagesFromDb){
putToCache(message);
}
messages.addAll(messagesFromDb);
return messages;
} catch (NestedRuntimeException e) {
throw new DaoException("MessageDaoImpl.getByIds error", e);
throw new DaoException("InvoicingMessageDaoImpl.getByIds error", e);
}
}
private void putToCache(Message message){
if(message != null) {
cacheManager.getCache(CacheConfiguration.MESSAGES_BY_IDS).put(message.getId(), message);
cacheManager.getCache(CacheConfiguration.MESSAGES_BY_INVOICE).put(message.getInvoice().getId() + message.getType(), message);
private void putToCache(InvoicingMessage message){
if (message != null) {
cacheMng.putMessage(message);
cacheMng.putMessage(message.getInvoice().getId() + message.getType(), message);
}
}
private Message getFromCache(String invoiceId, String type) {
Cache cache = cacheManager.getCache(CacheConfiguration.MESSAGES_BY_INVOICE);
return cache.get(invoiceId + type, Message.class);
}
private List<Message> getFromCache(Collection<Long> ids) {
Cache cache = cacheManager.getCache(CacheConfiguration.MESSAGES_BY_IDS);
List<Message> messages = new ArrayList<>();
for (Long id : ids) {
Message e = cache.get(id, Message.class);
if (e != null) {
messages.add(e);
}
}
return messages;
}
}

View File

@ -0,0 +1,120 @@
package com.rbkmoney.hooker.dao.impl;
import com.rbkmoney.hooker.dao.CacheMng;
import com.rbkmoney.hooker.dao.DaoException;
import com.rbkmoney.hooker.dao.QueueDao;
import com.rbkmoney.hooker.model.Hook;
import com.rbkmoney.hooker.model.InvoicingQueue;
import com.rbkmoney.hooker.model.Queue;
import com.rbkmoney.hooker.retry.RetryPolicyType;
import com.rbkmoney.swag_webhook_events.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.NestedRuntimeException;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcDaoSupport;
import javax.sql.DataSource;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Created by inalarsanukaev on 14.11.17.
*/
public class InvoicingQueueDao extends NamedParameterJdbcDaoSupport implements QueueDao<InvoicingQueue> {
Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
CacheMng cacheMng;
public InvoicingQueueDao(DataSource dataSource) {
setDataSource(dataSource);
}
public static RowMapper<InvoicingQueue> queueWithPolicyRowMapper = (rs, i) -> {
InvoicingQueue queue = new InvoicingQueue();
queue.setId(rs.getLong("id"));
queue.setInvoiceId(rs.getString("invoice_id"));
Hook hook = new Hook();
hook.setId(rs.getLong("hook_id"));
hook.setPartyId(rs.getString("party_id"));
hook.setTopic(rs.getString("message_type"));
hook.setUrl(rs.getString("url"));
hook.setPubKey(rs.getString("pub_key"));
hook.setPrivKey(rs.getString("priv_key"));
hook.setEnabled(rs.getBoolean("enabled"));
RetryPolicyType retryPolicyType = RetryPolicyType.valueOf(rs.getString("retry_policy"));
hook.setRetryPolicyType(retryPolicyType);
queue.setHook(hook);
queue.setRetryPolicyRecord(retryPolicyType.build(rs));
return queue;
};
@Override
public void createWithPolicy(long messageId) throws DaoException {
final String sql = "with queue as ( " +
" insert into hook.invoicing_queue(hook_id, invoice_id)" +
" select w.id , m.invoice_id" +
" from hook.message m" +
" join hook.webhook w on m.party_id = w.party_id and w.enabled and w.topic=CAST(:message_type as hook.message_topic)" +
" where m.id = :id " +
" on conflict(hook_id, invoice_id) do nothing returning *) " +
"insert into hook.simple_retry_policy(queue_id, message_type) select id, CAST(:message_type as hook.message_topic) from queue";
try {
int count = getNamedParameterJdbcTemplate().update(sql, new MapSqlParameterSource("id", messageId)
.addValue("message_type", getMessagesTopic()));
log.info("Created {} queues for messageId {}", count, messageId);
} catch (NestedRuntimeException e) {
log.error("Fail to createWithPolicy queue {}", messageId, e);
throw new DaoException(e);
}
}
@Override
public List<InvoicingQueue> getWithPolicies(Collection<Long> ids) {
List<InvoicingQueue> queues = cacheMng.getQueues(ids, InvoicingQueue.class);
if (queues.size() == ids.size()) {
return queues;
}
Set<Long> queueIds = new HashSet<>(ids);
queues.forEach(h -> queueIds.remove(h.getId()));
final String sql =
" select q.id, q.hook_id, q.invoice_id, wh.party_id, wh.url, k.pub_key, k.priv_key, wh.enabled, wh.retry_policy, srp.fail_count, srp.last_fail_time, srp.message_type " +
" from hook.invoicing_queue q " +
" join hook.webhook wh on wh.id = q.hook_id and wh.enabled and wh.topic=CAST(:message_type as hook.message_topic)" +
" join hook.party_key k on k.party_id = wh.party_id " +
" left join hook.simple_retry_policy srp on q.id = srp.queue_id and srp.message_type=CAST(:message_type as hook.message_topic)" +
" where q.id in (:ids) and q.enabled";
final MapSqlParameterSource params = new MapSqlParameterSource("ids", queueIds)
.addValue("message_type", getMessagesTopic());
try {
List<InvoicingQueue> queuesFromDb = getNamedParameterJdbcTemplate().query(sql, params, queueWithPolicyRowMapper);
cacheMng.putQueues(queuesFromDb);
queues.addAll(queuesFromDb);
return queues;
} catch (NestedRuntimeException e) {
throw new DaoException(e);
}
}
@Override
public void disable(long id) throws DaoException {
final String sql = " UPDATE hook.invoicing_queue SET enabled = FALSE where id=:id;";
try {
getNamedParameterJdbcTemplate().update(sql, new MapSqlParameterSource("id", id));
} catch (NestedRuntimeException e) {
log.error("Fail to disable queue: {}", id, e);
throw new DaoException(e);
}
}
public String getMessagesTopic() {
return Event.TopicEnum.INVOICESTOPIC.getValue();
}
}

View File

@ -26,25 +26,28 @@ public class InvoicingTaskDao extends AbstractTaskDao {
return Event.TopicEnum.INVOICESTOPIC.getValue();
}
//TODO limit invoices from hook
@Override
public void create(long messageId) {
public void create(long messageId) throws DaoException {
final String sql =
" insert into hook.scheduled_task(message_id, hook_id, message_type)" +
" select m.id, w.id, '" + getMessageTopic() + "'" +
" insert into hook.scheduled_task(message_id, queue_id, message_type)" +
" select m.id, q.id, w.topic" +
" from hook.message m" +
" join hook.webhook w on m.party_id = w.party_id and w.enabled" +
" join hook.webhook w on m.party_id = w.party_id and w.enabled and w.topic=CAST(:message_type as hook.message_topic)" +
" join hook.webhook_to_events wte on wte.hook_id = w.id" +
" where m.id = :id " +
" join hook.invoicing_queue q on q.hook_id=w.id and q.enabled and q.invoice_id=m.invoice_id" +
" where m.id = :message_id " +
" and m.event_type = wte.event_type " +
" and (m.shop_id = wte.invoice_shop_id or wte.invoice_shop_id is null) " +
" and (m.invoice_status = wte.invoice_status or wte.invoice_status is null) " +
" and (m.payment_status = wte.invoice_payment_status or wte.invoice_payment_status is null)" +
" ON CONFLICT (message_id, hook_id) DO NOTHING";
" ON CONFLICT (message_id, queue_id, message_type) DO NOTHING";
try {
int updateCount = getNamedParameterJdbcTemplate().update(sql, new MapSqlParameterSource("id", messageId));
log.debug("Created tasks count : " + updateCount);
int updateCount = getNamedParameterJdbcTemplate().update(sql, new MapSqlParameterSource("message_id", messageId)
.addValue("message_type", getMessageTopic()));
log.info("Created tasks count={} for messageId={}", updateCount, messageId);
} catch (NestedRuntimeException e) {
log.error("Fail to create tasks for messages messages.", e);
log.error("Fail to create tasks for messages.", e);
throw new DaoException(e);
}
}

View File

@ -23,17 +23,18 @@ public class SimpleRetryPolicyDaoImpl extends NamedParameterJdbcDaoSupport imple
}
@Override
public void update(SimpleRetryPolicyRecord record) {
public void update(SimpleRetryPolicyRecord record) throws DaoException {
final String sql = "update hook.simple_retry_policy " +
" set last_fail_time = :last_fail_time, fail_count = :fail_count" +
" where hook_id = :hook_id";
" where queue_id = :queue_id and message_type=CAST(:message_type as hook.message_topic)";
try {
getNamedParameterJdbcTemplate().update(sql, new MapSqlParameterSource("hook_id", record.getHookId())
getNamedParameterJdbcTemplate().update(sql, new MapSqlParameterSource("queue_id", record.getQueueId())
.addValue("message_type", record.getMessageType())
.addValue("last_fail_time", record.getLastFailTime())
.addValue("fail_count", record.getFailCount()));
log.info("Record in table hook_id = "+record.getHookId()+" 'simple_retry_policy' updated.");
log.info("Record in table 'simple_retry_policy' with id {} updated.", record.getQueueId());
} catch (NestedRuntimeException e) {
log.error("Fail to update simple_retry_policy for hook: " + record.getHookId(), e);
log.warn("Fail to update simple_retry_policy for record {} ", record.getQueueId(), e);
throw new DaoException(e);
}
}

View File

@ -11,10 +11,11 @@ import com.rbkmoney.geck.filter.PathConditionFilter;
import com.rbkmoney.geck.filter.condition.IsNullCondition;
import com.rbkmoney.geck.filter.rule.PathConditionRule;
import com.rbkmoney.hooker.dao.DaoException;
import com.rbkmoney.hooker.dao.MessageDao;
import com.rbkmoney.hooker.dao.InvoicingMessageDao;
import com.rbkmoney.hooker.model.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
@ -22,7 +23,7 @@ import java.util.ArrayList;
public class InvoiceCreatedHandler extends AbstractInvoiceEventHandler {
@Autowired
MessageDao messageDao;
InvoicingMessageDao messageDao;
private Filter filter;
@ -33,10 +34,11 @@ public class InvoiceCreatedHandler extends AbstractInvoiceEventHandler {
}
@Override
@Transactional
protected void saveEvent(InvoiceChange ic, Event event) throws DaoException {
Invoice invoiceOrigin = ic.getInvoiceCreated().getInvoice();
//////
Message message = new Message();
InvoicingMessage message = new InvoicingMessage();
message.setEventId(event.getId());
message.setEventTime(event.getCreatedAt());
message.setType(INVOICE);

View File

@ -9,7 +9,7 @@ import com.rbkmoney.geck.filter.PathConditionFilter;
import com.rbkmoney.geck.filter.condition.IsNullCondition;
import com.rbkmoney.geck.filter.rule.PathConditionRule;
import com.rbkmoney.hooker.model.EventType;
import com.rbkmoney.hooker.model.Message;
import com.rbkmoney.hooker.model.InvoicingMessage;
import com.rbkmoney.hooker.model.Payment;
import org.springframework.stereotype.Component;
@ -41,7 +41,7 @@ public class InvoicePaymentRefundStartedHandler extends NeedReadInvoiceEventHand
}
@Override
protected void modifyMessage(InvoiceChange ic, Event event, Message message) {
protected void modifyMessage(InvoiceChange ic, Event event, InvoicingMessage message) {
InvoicePaymentRefundCreated refundCreated = ic.getInvoicePaymentChange().getPayload().getInvoicePaymentRefundChange().getPayload().getInvoicePaymentRefundCreated();
Payment payment = message.getPayment();
payment.setCreatedAt(refundCreated.getRefund().getCreatedAt());

View File

@ -10,7 +10,7 @@ import com.rbkmoney.geck.filter.PathConditionFilter;
import com.rbkmoney.geck.filter.condition.IsNullCondition;
import com.rbkmoney.geck.filter.rule.PathConditionRule;
import com.rbkmoney.hooker.model.EventType;
import com.rbkmoney.hooker.model.Message;
import com.rbkmoney.hooker.model.InvoicingMessage;
import com.rbkmoney.hooker.model.Payment;
import com.rbkmoney.hooker.model.PaymentContactInfo;
import com.rbkmoney.hooker.utils.PaymentToolUtils;
@ -43,7 +43,7 @@ public class InvoicePaymentStartedHandler extends NeedReadInvoiceEventHandler {
}
@Override
protected void modifyMessage(InvoiceChange ic, Event event, Message message) {
protected void modifyMessage(InvoiceChange ic, Event event, InvoicingMessage message) {
InvoicePayment paymentOrigin = ic.getInvoicePaymentChange().getPayload().getInvoicePaymentStarted().getPayment();
Payment payment = new Payment();
message.setPayment(payment);
@ -94,7 +94,7 @@ public class InvoicePaymentStartedHandler extends NeedReadInvoiceEventHandler {
}
@Override
protected Message getMessage(String invoiceId) {
protected InvoicingMessage getMessage(String invoiceId) {
return messageDao.getAny(invoiceId, INVOICE);
}
}

View File

@ -10,7 +10,7 @@ import com.rbkmoney.geck.filter.PathConditionFilter;
import com.rbkmoney.geck.filter.condition.IsNullCondition;
import com.rbkmoney.geck.filter.rule.PathConditionRule;
import com.rbkmoney.hooker.model.EventType;
import com.rbkmoney.hooker.model.Message;
import com.rbkmoney.hooker.model.InvoicingMessage;
import com.rbkmoney.hooker.model.Payment;
import com.rbkmoney.hooker.model.PaymentStatusError;
import org.springframework.stereotype.Component;
@ -41,7 +41,7 @@ public class InvoicePaymentStatusChangedHandler extends NeedReadInvoiceEventHand
}
@Override
protected void modifyMessage(InvoiceChange ic, Event event, Message message) {
protected void modifyMessage(InvoiceChange ic, Event event, InvoicingMessage message) {
InvoicePaymentStatus paymentOriginStatus = ic.getInvoicePaymentChange().getPayload().getInvoicePaymentStatusChanged().getStatus();
Payment payment = message.getPayment();
payment.setStatus(paymentOriginStatus.getSetField().getFieldName());

View File

@ -7,9 +7,9 @@ import com.rbkmoney.geck.filter.Filter;
import com.rbkmoney.geck.filter.PathConditionFilter;
import com.rbkmoney.geck.filter.condition.IsNullCondition;
import com.rbkmoney.geck.filter.rule.PathConditionRule;
import com.rbkmoney.hooker.dao.MessageDao;
import com.rbkmoney.hooker.dao.InvoicingMessageDao;
import com.rbkmoney.hooker.model.EventType;
import com.rbkmoney.hooker.model.Message;
import com.rbkmoney.hooker.model.InvoicingMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -20,7 +20,7 @@ public class InvoiceStatusChangedHandler extends NeedReadInvoiceEventHandler {
private Filter filter;
@Autowired
MessageDao messageDao;
InvoicingMessageDao messageDao;
public InvoiceStatusChangedHandler() {
filter = new PathConditionFilter(new PathConditionRule(eventType.getThriftFilterPathCoditionRule(), new IsNullCondition().not()));
@ -42,7 +42,7 @@ public class InvoiceStatusChangedHandler extends NeedReadInvoiceEventHandler {
}
@Override
protected void modifyMessage(InvoiceChange ic, Event event, Message message) {
protected void modifyMessage(InvoiceChange ic, Event event, InvoicingMessage message) {
InvoiceStatus statusOrigin = ic.getInvoiceStatusChanged().getStatus();
message.getInvoice().setStatus(statusOrigin.getSetField().getFieldName());
if (statusOrigin.isSetCancelled()) {

View File

@ -3,9 +3,9 @@ package com.rbkmoney.hooker.handler.poller.impl.invoicing;
import com.rbkmoney.damsel.payment_processing.Event;
import com.rbkmoney.damsel.payment_processing.InvoiceChange;
import com.rbkmoney.hooker.dao.DaoException;
import com.rbkmoney.hooker.dao.MessageDao;
import com.rbkmoney.hooker.dao.InvoicingMessageDao;
import com.rbkmoney.hooker.model.EventType;
import com.rbkmoney.hooker.model.Message;
import com.rbkmoney.hooker.model.InvoicingMessage;
import org.springframework.beans.factory.annotation.Autowired;
/**
@ -13,15 +13,15 @@ import org.springframework.beans.factory.annotation.Autowired;
*/
public abstract class NeedReadInvoiceEventHandler extends AbstractInvoiceEventHandler{
@Autowired
MessageDao messageDao;
InvoicingMessageDao messageDao;
@Override
protected void saveEvent(InvoiceChange ic, Event event) throws DaoException {
final String invoiceId = event.getSource().getInvoiceId();
//getAny any saved message for related invoice
Message message = getMessage(invoiceId);
InvoicingMessage message = getMessage(invoiceId);
if (message == null) {
throw new DaoException("Message for invoice with id " + invoiceId + " not exist");
throw new DaoException("InvoicingMessage for invoice with id " + invoiceId + " not exist");
}
message.setEventType(getEventType());
message.setType(getMessageType());
@ -33,7 +33,7 @@ public abstract class NeedReadInvoiceEventHandler extends AbstractInvoiceEventHa
//TODO getAny message id and write to logs
}
protected Message getMessage(String invoiceId) {
protected InvoicingMessage getMessage(String invoiceId) {
return messageDao.getAny(invoiceId, getMessageType());
}
@ -41,5 +41,5 @@ public abstract class NeedReadInvoiceEventHandler extends AbstractInvoiceEventHa
protected abstract EventType getEventType();
protected abstract void modifyMessage(InvoiceChange ic, Event event, Message message);
protected abstract void modifyMessage(InvoiceChange ic, Event event, InvoicingMessage message);
}

View File

@ -7,8 +7,7 @@ import com.rbkmoney.swag_webhook_events.CustomerBinding;
/**
* Created by inalarsanukaev on 13.10.17.
*/
public class CustomerMessage {
private long id;
public class CustomerMessage extends Message {
private long eventId;
private String type;
private String occuredAt;
@ -21,14 +20,6 @@ public class CustomerMessage {
return AbstractCustomerEventHandler.BINDING.equals(type);
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public long getEventId() {
return eventId;
}
@ -88,7 +79,7 @@ public class CustomerMessage {
@Override
public String toString() {
return "CustomerMessage{" +
"id=" + id +
"id=" + getId() +
", eventId=" + eventId +
", type='" + type +
", occuredAt='" + occuredAt +

View File

@ -78,7 +78,7 @@ public class CustomerMessageJson {
CustomerMessageJson messageJson = message.isBinding() ? new CustomerBindingMessageJson(message.getCustomerBinding()) : new CustomerMessageJson();
messageJson.eventID = message.getEventId();
messageJson.occuredAt = message.getOccuredAt();
messageJson.topic = Event.TopicEnum.INVOICESTOPIC.getValue();
messageJson.topic = Event.TopicEnum.CUSTOMERSTOPIC.getValue();
messageJson.customer = message.getCustomer();
messageJson.eventType = eventTypeMapping.get(message.getEventType()).getValue();
return new ObjectMapper()

View File

@ -0,0 +1,16 @@
package com.rbkmoney.hooker.model;
/**
* Created by inalarsanukaev on 14.11.17.
*/
public class CustomerQueue extends Queue {
private String customerId;
public String getCustomerId() {
return customerId;
}
public void setCustomerId(String customerId) {
this.customerId = customerId;
}
}

View File

@ -1,7 +1,6 @@
package com.rbkmoney.hooker.model;
import com.rbkmoney.hooker.dao.WebhookAdditionalFilter;
import com.rbkmoney.hooker.retry.RetryPolicyRecord;
import com.rbkmoney.hooker.retry.RetryPolicyType;
import java.util.Set;
@ -13,24 +12,24 @@ import java.util.Set;
public class Hook {
private long id;
private String partyId;
private String topic;
private Set<WebhookAdditionalFilter> filters;
private String url;
private String pubKey;
private String privKey;
private boolean enabled;
private RetryPolicyType retryPolicyType;
private RetryPolicyRecord retryPolicyRecord;
public Hook(long id, String partyId, Set<WebhookAdditionalFilter> filters, String url, String pubKey, String privKey, boolean enabled, RetryPolicyType retryPolicyType, RetryPolicyRecord retryPolicyRecord) {
public Hook(long id, String partyId, String topic, Set<WebhookAdditionalFilter> filters, String url, String pubKey, String privKey, boolean enabled, RetryPolicyType retryPolicyType) {
this.id = id;
this.partyId = partyId;
this.topic = topic;
this.filters = filters;
this.url = url;
this.pubKey = pubKey;
this.privKey = privKey;
this.enabled = enabled;
this.retryPolicyType = retryPolicyType;
this.retryPolicyRecord = retryPolicyRecord;
}
public Hook() {
@ -52,6 +51,14 @@ public class Hook {
this.partyId = partyId;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public Set<WebhookAdditionalFilter> getFilters() {
return filters;
}
@ -100,18 +107,11 @@ public class Hook {
this.retryPolicyType = retryPolicyType;
}
public RetryPolicyRecord getRetryPolicyRecord() {
return retryPolicyRecord;
}
public void setRetryPolicyRecord(RetryPolicyRecord retryPolicyRecord) {
this.retryPolicyRecord = retryPolicyRecord;
}
@Override
public String toString() {
return "Hook{" +
"id=" + id +
", topic=" + topic +
", partyId='" + partyId + '\'' +
", url='" + url + '\'' +
'}';

View File

@ -0,0 +1,131 @@
package com.rbkmoney.hooker.model;
import com.rbkmoney.hooker.handler.poller.impl.invoicing.AbstractInvoiceEventHandler;
/**
* Created by inalarsanukaev on 07.04.17.
*/
public class InvoicingMessage extends Message {
private long eventId;
private String eventTime;
private String type;
private String partyId;
private EventType eventType;
private Invoice invoice;
private Payment payment;
public InvoicingMessage(InvoicingMessage other) {
setId(other.getId());
this.eventId = other.eventId;
this.eventTime = other.eventTime;
this.type = other.type;
this.partyId = other.partyId;
this.eventType = other.eventType;
if (other.invoice != null) {
this.invoice = new Invoice(other.invoice);
}
if (other.payment != null) {
this.payment = new Payment(other.payment);
}
}
public InvoicingMessage() {
}
public long getEventId() {
return eventId;
}
public void setEventId(long eventId) {
this.eventId = eventId;
}
public String getEventTime() {
return eventTime;
}
public void setEventTime(String eventTime) {
this.eventTime = eventTime;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getPartyId() {
return partyId;
}
public void setPartyId(String partyId) {
this.partyId = partyId;
}
public EventType getEventType() {
return eventType;
}
public void setEventType(EventType eventType) {
this.eventType = eventType;
}
public Invoice getInvoice() {
return invoice;
}
public void setInvoice(Invoice invoice) {
this.invoice = invoice;
}
public Payment getPayment() {
return payment;
}
public void setPayment(Payment payment) {
this.payment = payment;
}
public boolean isInvoice() {
return AbstractInvoiceEventHandler.INVOICE.equals(getType());
}
public boolean isPayment() {
return AbstractInvoiceEventHandler.PAYMENT.equals(getType());
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
InvoicingMessage message = (InvoicingMessage) o;
return getId() == message.getId();
}
@Override
public String toString() {
return "InvoicingMessage{" +
"id=" + getId() +
", eventId=" + eventId +
", eventTime='" + eventTime + '\'' +
", invoiceId=" + invoice.getId() +
", invoiceStatus=" + invoice.getStatus() +
(isPayment() ? ", paymentId=" + payment.getId() : "") +
(isPayment() ? ", paymentStatus=" + payment.getStatus() : "") +
'}';
}
@Override
public int hashCode() {
return (int) (getId() ^ (getId() >>> 32));
}
public InvoicingMessage copy(){
return new InvoicingMessage(this);
}
}

View File

@ -15,7 +15,7 @@ import java.util.Map;
* Created by inalarsanukaev on 07.04.17.
*/
@JsonPropertyOrder({"eventID", "occuredAt", "topic", "eventType", "invoice"})
public class MessageJson {
public class InvoicingMessageJson {
private static Map<String, String> invoiceStatusesMapping = new HashMap<>();
static {
invoiceStatusesMapping.put("unpaid", "InvoiceCreated");
@ -40,7 +40,7 @@ public class MessageJson {
private String eventType;
private Invoice invoice;
public MessageJson() {
public InvoicingMessageJson() {
}
public long getEventID() {
@ -83,25 +83,25 @@ public class MessageJson {
this.invoice = invoice;
}
public static String buildMessageJson(Message message) throws JsonProcessingException {
public static String buildMessageJson(InvoicingMessage message) throws JsonProcessingException {
boolean isInvoice = AbstractInvoiceEventHandler.INVOICE.equals(message.getType());
MessageJson messageJson = isInvoice ? new InvoiceMessageJson() : new PaymentMessageJson(message.getPayment());
messageJson.eventID = message.getEventId();
messageJson.occuredAt = message.getEventTime();
messageJson.topic = Event.TopicEnum.INVOICESTOPIC.getValue();
messageJson.invoice = message.getInvoice();
InvoicingMessageJson invoicingMessageJson = isInvoice ? new InvoiceMessageJson() : new PaymentMessageJson(message.getPayment());
invoicingMessageJson.eventID = message.getEventId();
invoicingMessageJson.occuredAt = message.getEventTime();
invoicingMessageJson.topic = Event.TopicEnum.INVOICESTOPIC.getValue();
invoicingMessageJson.invoice = message.getInvoice();
messageJson.eventType = isInvoice ? invoiceStatusesMapping.get(message.getInvoice().getStatus()) : paymentStatusesMapping.get(message.getPayment().getStatus()) ;
invoicingMessageJson.eventType = isInvoice ? invoiceStatusesMapping.get(message.getInvoice().getStatus()) : paymentStatusesMapping.get(message.getPayment().getStatus()) ;
return new ObjectMapper()
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
.configure(SerializationFeature.WRITE_ENUMS_USING_TO_STRING, true)
.writeValueAsString(messageJson);
.writeValueAsString(invoicingMessageJson);
}
static class InvoiceMessageJson extends MessageJson{
static class InvoiceMessageJson extends InvoicingMessageJson {
}
static class PaymentMessageJson extends MessageJson {
static class PaymentMessageJson extends InvoicingMessageJson {
Payment payment;
public PaymentMessageJson(Payment payment) {

View File

@ -0,0 +1,17 @@
package com.rbkmoney.hooker.model;
/**
* Created by inalarsanukaev on 14.11.17.
*/
public class InvoicingQueue extends Queue {
private String invoiceId;
public String getInvoiceId() {
return invoiceId;
}
public void setInvoiceId(String invoiceId) {
this.invoiceId = invoiceId;
}
}

View File

@ -1,49 +1,10 @@
package com.rbkmoney.hooker.model;
import com.rbkmoney.hooker.handler.poller.impl.invoicing.AbstractInvoiceEventHandler;
/**
* Created by inalarsanukaev on 07.04.17.
* Created by inalarsanukaev on 20.11.17.
*/
public class Message {
private long id;
private long eventId;
private String eventTime;
private String type;
private String partyId;
private EventType eventType;
private Invoice invoice;
private Payment payment;
public Message(Message other) {
this.id = other.id;
this.eventId = other.eventId;
this.eventTime = other.eventTime;
this.type = other.type;
this.partyId = other.partyId;
this.eventType = other.eventType;
if (other.invoice != null) {
this.invoice = new Invoice(other.invoice);
}
if (other.payment != null) {
this.payment = new Payment(other.payment);
}
}
public Message(long id, long eventId, String eventTime, String type, String partyId, EventType eventType, Invoice invoice, Payment payment) {
this.id = id;
this.eventId = eventId;
this.eventTime = eventTime;
this.type = type;
this.partyId = partyId;
this.eventType = eventType;
this.invoice = invoice;
this.payment = payment;
}
public Message() {
}
public long getId() {
return id;
@ -52,100 +13,4 @@ public class Message {
public void setId(long id) {
this.id = id;
}
public long getEventId() {
return eventId;
}
public void setEventId(long eventId) {
this.eventId = eventId;
}
public String getEventTime() {
return eventTime;
}
public void setEventTime(String eventTime) {
this.eventTime = eventTime;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getPartyId() {
return partyId;
}
public void setPartyId(String partyId) {
this.partyId = partyId;
}
public EventType getEventType() {
return eventType;
}
public void setEventType(EventType eventType) {
this.eventType = eventType;
}
public Invoice getInvoice() {
return invoice;
}
public void setInvoice(Invoice invoice) {
this.invoice = invoice;
}
public Payment getPayment() {
return payment;
}
public void setPayment(Payment payment) {
this.payment = payment;
}
public boolean isInvoice() {
return AbstractInvoiceEventHandler.INVOICE.equals(getType());
}
public boolean isPayment() {
return AbstractInvoiceEventHandler.PAYMENT.equals(getType());
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Message message = (Message) o;
return id == message.id;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", eventId=" + eventId +
", eventTime='" + eventTime + '\'' +
", invoiceId=" + invoice.getId() +
", invoiceStatus=" + invoice.getStatus() +
(isPayment() ? ", paymentId=" + payment.getId() : "") +
(isPayment() ? ", paymentStatus=" + payment.getStatus() : "") +
'}';
}
@Override
public int hashCode() {
return (int) (id ^ (id >>> 32));
}
public Message copy(){
return new Message(this);
}
}

View File

@ -0,0 +1,45 @@
package com.rbkmoney.hooker.model;
import com.rbkmoney.hooker.retry.RetryPolicyRecord;
/**
* Created by inalarsanukaev on 14.11.17.
*/
public class Queue {
private long id;
private Hook hook;
private RetryPolicyRecord retryPolicyRecord;
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public Hook getHook() {
return hook;
}
public void setHook(Hook hook) {
this.hook = hook;
}
public RetryPolicyRecord getRetryPolicyRecord() {
return retryPolicyRecord;
}
public void setRetryPolicyRecord(RetryPolicyRecord retryPolicyRecord) {
this.retryPolicyRecord = retryPolicyRecord;
}
@Override
public String toString() {
return "Queue{" +
"id=" + id +
", hook=" + hook +
", retryPolicyRecord=" + retryPolicyRecord +
'}';
}
}

View File

@ -4,20 +4,21 @@ package com.rbkmoney.hooker.model;
* Created by jeckep on 17.04.17.
*/
public class Task {
long hookId;
long messageId;
long queueId;
public Task(long hookId, long messageId) {
this.hookId = hookId;
public Task(long messageId, long queueId) {
this.messageId = messageId;
this.queueId = queueId;
}
public long getHookId() {
return hookId;
public long getQueueId() {
return queueId;
}
public void setHookId(long hookId) {
this.hookId = hookId;
public void setQueueId(long queueId) {
this.queueId = queueId;
}
public long getMessageId() {
@ -27,4 +28,12 @@ public class Task {
public void setMessageId(long messageId) {
this.messageId = messageId;
}
@Override
public String toString() {
return "Task{" +
"messageId=" + messageId +
", queueId=" + queueId +
'}';
}
}

View File

@ -1,7 +1,7 @@
package com.rbkmoney.hooker.retry;
import com.rbkmoney.hooker.dao.SimpleRetryPolicyDao;
import com.rbkmoney.hooker.model.Hook;
import com.rbkmoney.hooker.model.Queue;
import com.rbkmoney.hooker.retry.impl.simple.SimpleRetryPolicy;
import com.rbkmoney.hooker.retry.impl.simple.SimpleRetryPolicyRecord;
import org.springframework.beans.factory.annotation.Autowired;
@ -24,24 +24,24 @@ public class RetryPoliciesService {
@Autowired
SimpleRetryPolicyDao simpleRetryPolicyDao;
public RetryPolicy getRetryPolicyByType(RetryPolicyType type){
if(RetryPolicyType.SIMPLE.equals(type)){
public RetryPolicy getRetryPolicyByType(RetryPolicyType type) {
if (RetryPolicyType.SIMPLE.equals(type)) {
return simpleRetryPolicy;
} else {
throw new UnsupportedOperationException("Retry policy for type: " + type.toString() + " not found");
}
}
public List<Hook> filter(Collection<Hook> hooks){
return hooks.stream().
filter(h -> getRetryPolicyByType(h.getRetryPolicyType()).isActive(h.getRetryPolicyRecord()))
public List<Queue> filter(Collection<? extends Queue> queues) {
return queues.stream().
filter(q -> getRetryPolicyByType(q.getHook().getRetryPolicyType()).isActive(q.getRetryPolicyRecord()))
.collect(Collectors.toList());
}
public void update(RetryPolicyRecord record) {
if(RetryPolicyType.SIMPLE.equals(record.getType())){
simpleRetryPolicyDao.update((SimpleRetryPolicyRecord)record);
}else {
if (RetryPolicyType.SIMPLE.equals(record.getType())) {
simpleRetryPolicyDao.update((SimpleRetryPolicyRecord) record);
} else {
throw new UnsupportedOperationException("Retry policy DAO for type: " + record.getType().toString() + " not found");
}
}

View File

@ -4,7 +4,7 @@ package com.rbkmoney.hooker.retry;
* Created by jeckep on 17.04.17.
*/
public interface RetryPolicy<T> {
void onFail(T record);
boolean isFail(T record);
//returns false if we should wait timeout to send message to this hook
boolean isActive(T record);
RetryPolicyType getType();

View File

@ -21,7 +21,8 @@ public enum RetryPolicyType {
@Override
public RetryPolicyRecord build(ResultSet rs) throws SQLException{
SimpleRetryPolicyRecord record = new SimpleRetryPolicyRecord();
record.setHookId(rs.getLong("hook_id"));
record.setQueueId(rs.getLong("id"));
record.setMessageType(rs.getString("message_type"));
record.setFailCount(rs.getInt("fail_count"));
record.setLastFailTime(rs.getLong("last_fail_time"));
return record;

View File

@ -1,34 +1,21 @@
package com.rbkmoney.hooker.retry.impl.simple;
import com.rbkmoney.hooker.dao.HookDao;
import com.rbkmoney.hooker.dao.SimpleRetryPolicyDao;
import com.rbkmoney.hooker.dao.TaskDao;
import com.rbkmoney.hooker.retry.RetryPolicy;
import com.rbkmoney.hooker.retry.RetryPolicyType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* Created by jeckep on 17.04.17.
*/
@Component
public class SimpleRetryPolicy implements RetryPolicy<SimpleRetryPolicyRecord> {
private static Logger log = LoggerFactory.getLogger(SimpleRetryPolicy.class);
@Autowired
SimpleRetryPolicyDao simpleRetryPolicyDao;
@Autowired
HookDao hookDao;
@Autowired
List<TaskDao> taskDaoList;
private long[] delays = {30, 300, 900, 3600,
3600, 3600, 3600, 3600, 3600, 3600, 3600, 3600, 3600, 3600,
3600, 3600, 3600, 3600, 3600, 3600, 3600, 3600, 3600, 3600,
@ -41,16 +28,12 @@ public class SimpleRetryPolicy implements RetryPolicy<SimpleRetryPolicyRecord> {
}
@Override
public void onFail(SimpleRetryPolicyRecord rp) {
public boolean isFail(SimpleRetryPolicyRecord rp) {
rp.setFailCount(rp.getFailCount() + 1);
rp.setLastFailTime(System.currentTimeMillis());
simpleRetryPolicyDao.update(rp);
if (rp.getFailCount() > delays.length) {
hookDao.disable(rp.getHookId());
taskDaoList.forEach(t -> t.removeAll(rp.getHookId()));
log.warn("Hook: " + rp.getHookId() + " was disabled according to retry policy.");
}
return rp.getFailCount() > delays.length;
}
@Override

View File

@ -9,12 +9,14 @@ import com.rbkmoney.hooker.retry.RetryPolicyType;
public class SimpleRetryPolicyRecord extends RetryPolicyRecord {
public static RetryPolicyType type = RetryPolicyType.SIMPLE;
long hookId;
long queueId;
String messageType;
int failCount;
long lastFailTime;
public SimpleRetryPolicyRecord(long hookId, int failCount, long lastFailTime) {
this.hookId = hookId;
public SimpleRetryPolicyRecord(long queueId, String messageType, int failCount, long lastFailTime) {
this.queueId = queueId;
this.messageType = messageType;
this.failCount = failCount;
this.lastFailTime = lastFailTime;
}
@ -26,12 +28,20 @@ public class SimpleRetryPolicyRecord extends RetryPolicyRecord {
SimpleRetryPolicyRecord.type = type;
}
public long getHookId() {
return hookId;
public long getQueueId() {
return queueId;
}
public void setHookId(long hookId) {
this.hookId = hookId;
public void setQueueId(long queueId) {
this.queueId = queueId;
}
public String getMessageType() {
return messageType;
}
public void setMessageType(String messageType) {
this.messageType = messageType;
}
public int getFailCount() {

View File

@ -0,0 +1,169 @@
package com.rbkmoney.hooker.scheduler;
import com.rbkmoney.hooker.dao.MessageDao;
import com.rbkmoney.hooker.dao.QueueDao;
import com.rbkmoney.hooker.dao.TaskDao;
import com.rbkmoney.hooker.model.Message;
import com.rbkmoney.hooker.model.Queue;
import com.rbkmoney.hooker.model.Task;
import com.rbkmoney.hooker.retry.RetryPoliciesService;
import com.rbkmoney.hooker.retry.RetryPolicyRecord;
import com.rbkmoney.hooker.service.PostSender;
import com.rbkmoney.hooker.service.crypt.Signer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import javax.annotation.PreDestroy;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
* Created by jeckep on 17.04.17.
*/
public abstract class MessageScheduler<M extends Message, Q extends Queue> {
Logger log = LoggerFactory.getLogger(this.getClass());
private TaskDao taskDao;
private QueueDao<Q> queueDao;
private MessageDao<M> messageDao;
@Autowired
private RetryPoliciesService retryPoliciesService;
@Autowired
private Signer signer;
@Autowired
private PostSender postSender;
private final Set<Long> processedQueues = Collections.synchronizedSet(new HashSet<>());
private ExecutorService executorService;
public MessageScheduler(TaskDao taskDao, QueueDao<Q> queueDao, MessageDao<M> messageDao, int numberOfWorkers) {
this.taskDao = taskDao;
this.queueDao = queueDao;
this.messageDao = messageDao;
this.executorService = Executors.newFixedThreadPool(numberOfWorkers);
}
@Scheduled(fixedRateString = "${message.scheduler.delay}")
public void loop() throws InterruptedException {
final List<Long> currentlyProcessedQueues = new ArrayList<>(processedQueues);
final Map<Long, List<Task>> scheduledTasks = getScheduledTasks(currentlyProcessedQueues);
if (scheduledTasks.entrySet().isEmpty()) {
return;
}
final Map<Long, Queue> healthyQueues = loadQueues(scheduledTasks.keySet())
.stream().collect(Collectors.toMap(Queue::getId, v -> v));
processedQueues.addAll(healthyQueues.keySet());
final Set<Long> messageIdsToSend = getMessageIdsFilteredByQueues(scheduledTasks, healthyQueues.keySet());
final Map<Long, M> messagesMap = loadMessages(messageIdsToSend);
log.info("Schedulled tasks count = {}, after filter = {}", scheduledTasks.size(), messageIdsToSend.size());
List<MessageSender<?>> messageSenderList = new ArrayList<>(healthyQueues.keySet().size());
for (long queueId : healthyQueues.keySet()) {
List<Task> tasks = scheduledTasks.get(queueId);
List<M> messagesForQueue = new ArrayList<>();
for (Task task : tasks) {
M e = messagesMap.get(task.getMessageId());
if (e != null) {
messagesForQueue.add(e);
} else {
log.error("InvoicingMessage with id {} couldn't be null", task.getMessageId());
}
}
MessageSender messageSender = getMessageSender(new MessageSender.QueueStatus(healthyQueues.get(queueId)), messagesForQueue, taskDao, signer, postSender);
messageSenderList.add(messageSender);
}
List<Future<MessageSender.QueueStatus>> futureList = executorService.invokeAll(messageSenderList);
for (Future<MessageSender.QueueStatus> status : futureList) {
if (!status.isCancelled()) {
try {
if (status.get().isSuccess()) {
done(status.get().getQueue());
} else {
fail(status.get().getQueue());
}
} catch (ExecutionException e) {
log.error("Unexpected error when get queue");
}
}
}
}
protected abstract MessageSender getMessageSender(MessageSender.QueueStatus queueStatus, List<M> messagesForQueue, TaskDao taskDao, Signer signer, PostSender postSender);
//worker should invoke this method when it is done with scheduled messages for hookId
private void done(Queue queue) {
processedQueues.remove(queue.getId());
//reset fail count for hook
if (queue.getRetryPolicyRecord().isFailed()) {
RetryPolicyRecord record = queue.getRetryPolicyRecord();
record.reset();
retryPoliciesService.update(record);
}
}
//worker should invoke this method when it is fail to send message to hookId
private void fail(Queue queue) {
processedQueues.remove(queue.getId());
log.warn("Queue {} failed.", queue.getId());
if (retryPoliciesService.getRetryPolicyByType(queue.getHook().getRetryPolicyType())
.isFail(queue.getRetryPolicyRecord())) {
queueDao.disable(queue.getId());
taskDao.removeAll(queue.getId());
log.warn("Queue {} was disabled according to retry policy.", queue.getId());
}
}
private Map<Long, List<Task>> getScheduledTasks(Collection<Long> excludeQueueIds) {
return taskDao.getScheduled(excludeQueueIds);
}
private List<Queue> loadQueues(Collection<Long> queueIds) {
List<? extends Queue> queuesWaitingMessages = queueDao.getWithPolicies(queueIds);
return retryPoliciesService.filter(queuesWaitingMessages);
}
private Set<Long> getMessageIdsFilteredByQueues(Map<Long, List<Task>> scheduledTasks, Collection<Long> queueIds) {
final Set<Long> messageIds = new HashSet<>();
for (long queueId : queueIds) {
for (Task t : scheduledTasks.get(queueId)) {
messageIds.add(t.getMessageId());
}
}
return messageIds;
}
private Map<Long, M> loadMessages(Collection<Long> messageIds) {
List<M> messages = messageDao.getBy(messageIds);
Map<Long, M> map = new HashMap<>();
for(M message: messages){
map.put(message.getId(), message);
}
return map;
}
@PreDestroy
public void preDestroy(){
executorService.shutdownNow();
try {
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
log.warn("Failed to stop scheduller in time.");
} else {
log.info("Poller stopped.");
}
} catch (InterruptedException e) {
log.warn("Waiting for scheduller shutdown is interrupted.");
}
}
}

View File

@ -0,0 +1,87 @@
package com.rbkmoney.hooker.scheduler;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.rbkmoney.hooker.dao.TaskDao;
import com.rbkmoney.hooker.model.Message;
import com.rbkmoney.hooker.model.Queue;
import com.rbkmoney.hooker.service.PostSender;
import com.rbkmoney.hooker.service.crypt.Signer;
import com.rbkmoney.hooker.service.err.PostRequestException;
import org.apache.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.Callable;
/**
* Created by jeckep on 18.04.17.
*/
public abstract class MessageSender<M extends Message> implements Callable<MessageSender.QueueStatus> {
public static Logger log = LoggerFactory.getLogger(MessageSender.class);
private MessageSender.QueueStatus queueStatus;
private List<M> messages;
private TaskDao taskDao;
private Signer signer;
private PostSender postSender;
public MessageSender(MessageSender.QueueStatus queueStatus, List<M> messages, TaskDao taskDao, Signer signer, PostSender postSender) {
this.queueStatus = queueStatus;
this.messages = messages;
this.taskDao = taskDao;
this.signer = signer;
this.postSender = postSender;
}
@Override
public MessageSender.QueueStatus call() throws Exception {
try {
for (M message : messages) {
final String messageJson = getMessageJson(message);
final String signature = signer.sign(messageJson, queueStatus.getQueue().getHook().getPrivKey());
int statusCode = postSender.doPost(queueStatus.getQueue().getHook().getUrl(), messageJson, signature);
if (statusCode != HttpStatus.SC_OK) {
log.warn("Wrong status code {} from merchant, we'll try to resend it. Message {}", statusCode, message);
throw new PostRequestException("Internal server error for message id = " + message.getId());
}
log.info("{} is sent to {}", message, queueStatus.getQueue().getHook());
taskDao.remove(queueStatus.getQueue().getId(), message.getId()); //required after message is sent
}
queueStatus.setSuccess(true);
} catch (Exception e) {
log.warn("Couldn't send message to hook {}. We'll try to resend it", queueStatus.getQueue().getHook(), e);
queueStatus.setSuccess(false);
}
return queueStatus;
}
protected abstract String getMessageJson(M message) throws JsonProcessingException;
public static class QueueStatus {
private Queue queue;
private boolean isSuccess;
public QueueStatus(Queue queue) {
this.queue = queue;
}
public Queue getQueue() {
return queue;
}
public void setQueue(Queue queue) {
this.queue = queue;
}
public boolean isSuccess() {
return isSuccess;
}
public void setSuccess(boolean success) {
isSuccess = success;
}
}
}

View File

@ -1,151 +1,39 @@
package com.rbkmoney.hooker.scheduler.customer;
import com.rbkmoney.hooker.dao.CustomerDao;
import com.rbkmoney.hooker.dao.HookDao;
import com.rbkmoney.hooker.dao.TaskDao;
import com.rbkmoney.hooker.dao.impl.CustomerQueueDao;
import com.rbkmoney.hooker.dao.impl.CustomerTaskDao;
import com.rbkmoney.hooker.model.CustomerMessage;
import com.rbkmoney.hooker.model.Hook;
import com.rbkmoney.hooker.model.Task;
import com.rbkmoney.hooker.retry.RetryPoliciesService;
import com.rbkmoney.hooker.retry.RetryPolicyRecord;
import com.rbkmoney.hooker.model.CustomerQueue;
import com.rbkmoney.hooker.model.Queue;
import com.rbkmoney.hooker.scheduler.MessageScheduler;
import com.rbkmoney.hooker.scheduler.MessageSender;
import com.rbkmoney.hooker.service.PostSender;
import com.rbkmoney.hooker.service.crypt.Signer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.List;
/**
* Created by jeckep on 17.04.17.
*/
@Service
public class CustomerMessageScheduler {
Logger log = LoggerFactory.getLogger(this.getClass());
public class CustomerMessageScheduler extends MessageScheduler<CustomerMessage, CustomerQueue> {
@Autowired
private CustomerTaskDao taskDao;
@Autowired
private HookDao hookDao;
@Autowired
private CustomerDao customerDao;
@Autowired
private RetryPoliciesService retryPoliciesService;
@Autowired
Signer signer;
@Autowired
PostSender postSender;
private final Set<Long> processedHooks = Collections.synchronizedSet(new HashSet<>());
private ExecutorService executorService;
public CustomerMessageScheduler(@Value("${message.sender.number}") int numberOfWorkers) {
this.executorService = Executors.newFixedThreadPool(numberOfWorkers);
public CustomerMessageScheduler(
@Autowired CustomerTaskDao taskDao,
@Autowired CustomerQueueDao queueDao,
@Autowired CustomerDao customerDao,
@Value("${message.sender.number}") int numberOfWorkers) {
super(taskDao, queueDao, customerDao, numberOfWorkers);
}
@Scheduled(fixedRateString = "${message.scheduler.delay}")
public void loop() throws InterruptedException {
final List<Long> currentlyProcessedHooks;
synchronized (processedHooks) {
currentlyProcessedHooks = new ArrayList<>(processedHooks);
}
final Map<Long, List<Task>> scheduledTasks = getScheduledTasks(currentlyProcessedHooks);
final Map<Long, Hook> healthyHooks = loadHooks(scheduledTasks.keySet()).stream().collect(Collectors.toMap(v -> v.getId(), v -> v));
//ready task means - not delayed by failed hook
int numberOfTasks = numberOfReadyTasks(scheduledTasks, healthyHooks.keySet());
if(numberOfTasks > 0){
log.info("Number of not done ready tasks(message->hook): {}", numberOfTasks);
}
processedHooks.addAll(healthyHooks.keySet());
final Set<Long> messageIdsToSend = getMessageIdsFilteredByHooks(scheduledTasks, healthyHooks.keySet());
final Map<Long, CustomerMessage> messagesMap = loadMessages(messageIdsToSend);
for (long hookId : healthyHooks.keySet()) {
List<Task> tasks = scheduledTasks.get(hookId);
List<CustomerMessage> messagesForHook = new ArrayList<>();
for (Task task : tasks) {
CustomerMessage e = messagesMap.get(task.getMessageId());
if (e != null) {
messagesForHook.add(e);
} else {
log.error("Message with id {} couldn't be null", task.getMessageId());
}
}
CustomerMessageSender messageSender = new CustomerMessageSender(healthyHooks.get(hookId), messagesForHook, taskDao, this, signer, postSender);
executorService.submit(messageSender);
}
}
//worker should invoke this method when it is done with scheduled messages for hookId
public void done(Hook hook) {
processedHooks.remove(hook.getId());
//reset fail count for hook
if (hook.getRetryPolicyRecord().isFailed()) {
RetryPolicyRecord record = hook.getRetryPolicyRecord();
record.reset();
retryPoliciesService.update(record);
}
}
//worker should invoke this method when it is fail to send message to hookId
public void fail(Hook hook) {
processedHooks.remove(hook.getId());
log.warn("Hook: " + hook.getId() + " failed.");
retryPoliciesService.getRetryPolicyByType(hook.getRetryPolicyType())
.onFail(hook.getRetryPolicyRecord());
}
private Map<Long, List<Task>> getScheduledTasks(Collection<Long> excludeHooksIds) {
return taskDao.getScheduled(excludeHooksIds);
}
private List<Hook> loadHooks(Collection<Long> hookIds) {
List<Hook> hooksWaitingMessages = hookDao.getWithPolicies(hookIds);
return retryPoliciesService.filter(hooksWaitingMessages);
}
private Set<Long> getMessageIdsFilteredByHooks(Map<Long, List<Task>> scheduledTasks, Collection<Long> liveHookIds) {
final Set<Long> messageIds = new HashSet<>();
for (long hookId : liveHookIds) {
for (Task t : scheduledTasks.get(hookId)) {
messageIds.add(t.getMessageId());
}
}
return messageIds;
}
private int numberOfReadyTasks(Map<Long, List<Task>> tasks, Collection<Long> liveHookIds){
int count = 0;
for(long hookId: liveHookIds){
count += tasks.get(hookId).size();
}
return count;
}
private Map<Long, CustomerMessage> loadMessages(Collection<Long> messageIds) {
List<CustomerMessage> messages = customerDao.getBy(messageIds);
Map<Long, CustomerMessage> map = new HashMap<>();
for(CustomerMessage message: messages){
map.put(message.getId(), message);
}
return map;
@Override
protected MessageSender getMessageSender(MessageSender.QueueStatus queueStatus, List<CustomerMessage> messagesForQueue, TaskDao taskDao, Signer signer, PostSender postSender) {
return new CustomerMessageSender(queueStatus, messagesForQueue, taskDao, signer, postSender);
}
}

View File

@ -1,59 +1,28 @@
package com.rbkmoney.hooker.scheduler.customer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.rbkmoney.hooker.dao.TaskDao;
import com.rbkmoney.hooker.dao.impl.CustomerTaskDao;
import com.rbkmoney.hooker.model.CustomerMessage;
import com.rbkmoney.hooker.model.CustomerMessageJson;
import com.rbkmoney.hooker.model.Hook;
import com.rbkmoney.hooker.model.Queue;
import com.rbkmoney.hooker.scheduler.MessageScheduler;
import com.rbkmoney.hooker.scheduler.MessageSender;
import com.rbkmoney.hooker.service.PostSender;
import com.rbkmoney.hooker.service.crypt.Signer;
import com.rbkmoney.hooker.service.err.PostRequestException;
import org.apache.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* Created by jeckep on 18.04.17.
*/
public class CustomerMessageSender implements Runnable {
public static Logger log = LoggerFactory.getLogger(CustomerMessageSender.class);
public class CustomerMessageSender extends MessageSender<CustomerMessage> {
private Hook hook;
private List<CustomerMessage> messages;
private TaskDao taskDao;
private CustomerMessageScheduler workerTaskScheduler;
private Signer signer;
private PostSender postSender;
public CustomerMessageSender(Hook hook, List<CustomerMessage> messages, CustomerTaskDao taskDao, CustomerMessageScheduler workerTaskScheduler, Signer signer, PostSender postSender) {
this.hook = hook;
this.messages = messages;
this.taskDao = taskDao;
this.workerTaskScheduler = workerTaskScheduler;
this.signer = signer;
this.postSender = postSender;
public CustomerMessageSender(MessageSender.QueueStatus queueStatus, List<CustomerMessage> messages, TaskDao taskDao, Signer signer, PostSender postSender) {
super(queueStatus, messages, taskDao, signer, postSender);
}
@Override
public void run() {
try {
for (CustomerMessage message : messages) {
final String messageJson = CustomerMessageJson.buildMessageJson(message);
final String signature = signer.sign(messageJson, hook.getPrivKey());
int statusCode = postSender.doPost(hook.getUrl(), messageJson, signature);
if (statusCode != HttpStatus.SC_OK) {
log.warn("Wrong status code " + statusCode + " from merchant. Message id = " + message.getId());
throw new PostRequestException("Internal server error for message id = " + message.getId());
}
log.info("{} is sent to {}", message, hook);
taskDao.remove(hook.getId(), message.getId()); //required after message is sent
}
workerTaskScheduler.done(hook); // required after all messages processed
} catch (Exception e) {
log.warn("Couldn't send message to hook: " + hook.toString(), e);
workerTaskScheduler.fail(hook); // required if fail to send message
}
protected String getMessageJson(CustomerMessage message) throws JsonProcessingException {
return CustomerMessageJson.buildMessageJson(message);
}
}

View File

@ -0,0 +1,39 @@
package com.rbkmoney.hooker.scheduler.invoicing;
import com.rbkmoney.hooker.dao.InvoicingMessageDao;
import com.rbkmoney.hooker.dao.TaskDao;
import com.rbkmoney.hooker.dao.impl.InvoicingQueueDao;
import com.rbkmoney.hooker.dao.impl.InvoicingTaskDao;
import com.rbkmoney.hooker.model.InvoicingMessage;
import com.rbkmoney.hooker.model.InvoicingQueue;
import com.rbkmoney.hooker.model.Queue;
import com.rbkmoney.hooker.scheduler.MessageScheduler;
import com.rbkmoney.hooker.scheduler.MessageSender;
import com.rbkmoney.hooker.service.PostSender;
import com.rbkmoney.hooker.service.crypt.Signer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* Created by jeckep on 17.04.17.
*/
@Service
public class InvoicingMessageScheduler extends MessageScheduler<InvoicingMessage, InvoicingQueue> {
public InvoicingMessageScheduler(
@Autowired InvoicingTaskDao taskDao,
@Autowired InvoicingQueueDao queueDao,
@Autowired InvoicingMessageDao customerDao,
@Value("${message.sender.number}") int numberOfWorkers) {
super(taskDao, queueDao, customerDao, numberOfWorkers);
}
@Override
protected MessageSender getMessageSender(MessageSender.QueueStatus queueStatus, List<InvoicingMessage> messagesForQueue, TaskDao taskDao, Signer signer, PostSender postSender) {
return new InvoicingMessageSender(queueStatus, messagesForQueue, taskDao, signer, postSender);
}
}

View File

@ -0,0 +1,28 @@
package com.rbkmoney.hooker.scheduler.invoicing;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.rbkmoney.hooker.dao.TaskDao;
import com.rbkmoney.hooker.model.InvoicingMessage;
import com.rbkmoney.hooker.model.InvoicingMessageJson;
import com.rbkmoney.hooker.model.Queue;
import com.rbkmoney.hooker.scheduler.MessageScheduler;
import com.rbkmoney.hooker.scheduler.MessageSender;
import com.rbkmoney.hooker.service.PostSender;
import com.rbkmoney.hooker.service.crypt.Signer;
import java.util.List;
/**
* Created by jeckep on 18.04.17.
*/
public class InvoicingMessageSender extends MessageSender<InvoicingMessage> {
public InvoicingMessageSender(MessageSender.QueueStatus queueStatus, List<InvoicingMessage> messages, TaskDao taskDao, Signer signer, PostSender postSender) {
super(queueStatus, messages, taskDao, signer, postSender);
}
@Override
protected String getMessageJson(InvoicingMessage message) throws JsonProcessingException {
return InvoicingMessageJson.buildMessageJson(message);
}
}

View File

@ -1,151 +0,0 @@
package com.rbkmoney.hooker.scheduler.invoicing;
import com.rbkmoney.hooker.dao.HookDao;
import com.rbkmoney.hooker.dao.MessageDao;
import com.rbkmoney.hooker.dao.impl.InvoicingTaskDao;
import com.rbkmoney.hooker.model.Hook;
import com.rbkmoney.hooker.model.Message;
import com.rbkmoney.hooker.model.Task;
import com.rbkmoney.hooker.retry.RetryPoliciesService;
import com.rbkmoney.hooker.retry.RetryPolicyRecord;
import com.rbkmoney.hooker.service.PostSender;
import com.rbkmoney.hooker.service.crypt.Signer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
/**
* Created by jeckep on 17.04.17.
*/
@Service
public class MessageScheduler {
Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
private InvoicingTaskDao taskDao;
@Autowired
private HookDao hookDao;
@Autowired
private MessageDao messageDao;
@Autowired
private RetryPoliciesService retryPoliciesService;
@Autowired
Signer signer;
@Autowired
PostSender postSender;
private final Set<Long> processedHooks = Collections.synchronizedSet(new HashSet<>());
private ExecutorService executorService;
public MessageScheduler(@Value("${message.sender.number}") int numberOfWorkers) {
this.executorService = Executors.newFixedThreadPool(numberOfWorkers);
}
@Scheduled(fixedRateString = "${message.scheduler.delay}")
public void loop() throws InterruptedException {
final List<Long> currentlyProcessedHooks;
synchronized (processedHooks) {
currentlyProcessedHooks = new ArrayList<>(processedHooks);
}
final Map<Long, List<Task>> scheduledTasks = getScheduledTasks(currentlyProcessedHooks);
final Map<Long, Hook> healthyHooks = loadHooks(scheduledTasks.keySet()).stream().collect(Collectors.toMap(v -> v.getId(), v -> v));
//ready task means - not delayed by failed hook
int numberOfTasks = numberOfReadyTasks(scheduledTasks, healthyHooks.keySet());
if(numberOfTasks > 0){
log.info("Number of not done ready tasks(message->hook): {}", numberOfTasks);
}
processedHooks.addAll(healthyHooks.keySet());
final Set<Long> messageIdsToSend = getMessageIdsFilteredByHooks(scheduledTasks, healthyHooks.keySet());
final Map<Long, Message> messagesMap = loadMessages(messageIdsToSend);
for (long hookId : healthyHooks.keySet()) {
List<Task> tasks = scheduledTasks.get(hookId);
List<Message> messagesForHook = new ArrayList<>();
for (Task task : tasks) {
Message e = messagesMap.get(task.getMessageId());
if (e != null) {
messagesForHook.add(e);
} else {
log.error("Message with id {} couldn't be null", task.getMessageId());
}
}
MessageSender messageSender = new MessageSender(healthyHooks.get(hookId), messagesForHook, taskDao, this, signer, postSender);
executorService.submit(messageSender);
}
}
//worker should invoke this method when it is done with scheduled messages for hookId
public void done(Hook hook) {
processedHooks.remove(hook.getId());
//reset fail count for hook
if (hook.getRetryPolicyRecord().isFailed()) {
RetryPolicyRecord record = hook.getRetryPolicyRecord();
record.reset();
retryPoliciesService.update(record);
}
}
//worker should invoke this method when it is fail to send message to hookId
public void fail(Hook hook) {
processedHooks.remove(hook.getId());
log.warn("Hook: " + hook.getId() + " failed.");
retryPoliciesService.getRetryPolicyByType(hook.getRetryPolicyType())
.onFail(hook.getRetryPolicyRecord());
}
private Map<Long, List<Task>> getScheduledTasks(Collection<Long> excludeHooksIds) {
return taskDao.getScheduled(excludeHooksIds);
}
private List<Hook> loadHooks(Collection<Long> hookIds) {
List<Hook> hooksWaitingMessages = hookDao.getWithPolicies(hookIds);
return retryPoliciesService.filter(hooksWaitingMessages);
}
private Set<Long> getMessageIdsFilteredByHooks(Map<Long, List<Task>> scheduledTasks, Collection<Long> liveHookIds) {
final Set<Long> messageIds = new HashSet<>();
for (long hookId : liveHookIds) {
for (Task t : scheduledTasks.get(hookId)) {
messageIds.add(t.getMessageId());
}
}
return messageIds;
}
private int numberOfReadyTasks(Map<Long, List<Task>> tasks, Collection<Long> liveHookIds){
int count = 0;
for(long hookId: liveHookIds){
count += tasks.get(hookId).size();
}
return count;
}
private Map<Long, Message> loadMessages(Collection<Long> messageIds) {
List<Message> messages = messageDao.getBy(messageIds);
Map<Long, Message> map = new HashMap<>();
for(Message message: messages){
map.put(message.getId(), message);
}
return map;
}
}

View File

@ -1,62 +0,0 @@
package com.rbkmoney.hooker.scheduler.invoicing;
import com.rbkmoney.hooker.dao.TaskDao;
import com.rbkmoney.hooker.dao.impl.InvoicingTaskDao;
import com.rbkmoney.hooker.model.Hook;
import com.rbkmoney.hooker.model.Message;
import com.rbkmoney.hooker.model.MessageJson;
import com.rbkmoney.hooker.service.PostSender;
import com.rbkmoney.hooker.service.crypt.Signer;
import com.rbkmoney.hooker.service.err.PostRequestException;
import org.apache.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
/**
* Created by jeckep on 18.04.17.
*/
public class MessageSender implements Runnable {
public static Logger log = LoggerFactory.getLogger(MessageSender.class);
private Hook hook;
private List<Message> messages;
private TaskDao taskDao;
private MessageScheduler workerTaskScheduler;
private Signer signer;
private PostSender postSender;
public MessageSender(Hook hook, List<Message> messages, InvoicingTaskDao taskDao, MessageScheduler workerTaskScheduler, Signer signer, PostSender postSender) {
this.hook = hook;
this.messages = messages;
this.taskDao = taskDao;
this.workerTaskScheduler = workerTaskScheduler;
this.signer = signer;
this.postSender = postSender;
}
@Override
public void run() {
try {
for (Message message : messages) {
final String messageJson = MessageJson.buildMessageJson(message);
final String signature = signer.sign(messageJson, hook.getPrivKey());
int statusCode = postSender.doPost(hook.getUrl(), messageJson, signature);
if (statusCode != HttpStatus.SC_OK) {
log.warn("Wrong status code {} from merchant, but we don't try to resend it. MessageId {}, invoiceId {}", statusCode, message.getId(), message.getInvoice().getId());
//TODO RESTORE IT
// throw new PostRequestException("Internal server error for message id = " + message.getId());
} else {
log.info("{} is sent to {}", message, hook);
}
taskDao.remove(hook.getId(), message.getId()); //required after message is sent
}
workerTaskScheduler.done(hook); // required after all messages processed
} catch (Exception e) {
log.warn("Couldn't send message to hook {}. We'll try to resend it", hook.toString(), e);
workerTaskScheduler.fail(hook); // required if fail to send message
}
}
}

View File

@ -1,7 +1,7 @@
package com.rbkmoney.hooker.service;
import com.rbkmoney.hooker.dao.CustomerDao;
import com.rbkmoney.hooker.dao.MessageDao;
import com.rbkmoney.hooker.dao.InvoicingMessageDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -13,7 +13,7 @@ public class EventService {
Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
MessageDao messageDao;
InvoicingMessageDao messageDao;
@Autowired
CustomerDao customerDao;

View File

@ -29,12 +29,12 @@ public class PostSender {
RequestBody body = RequestBody.create(JSON, paramsAsString);
final Request request = new Request.Builder()
.url(url)
.addHeader(SIGNATURE_HEADER, "alg=RS256; digest="+signature)
.addHeader(SIGNATURE_HEADER, "alg=RS256; digest=" + signature)
.post(body)
.build();
Response response = httpClient.newCall(request).execute();
log.info("Response from hook: code {}; body: {}", +response.code(), response.body().string());
log.info("Response from hook: code {}; body: {}", response.code(), response.body().string());
return response.code();
}
}

View File

@ -3,6 +3,7 @@ package com.rbkmoney.hooker.utils;
import com.rbkmoney.damsel.webhooker.*;
import com.rbkmoney.hooker.dao.WebhookAdditionalFilter;
import com.rbkmoney.hooker.model.EventType;
import com.rbkmoney.swag_webhook_events.Event;
import org.apache.thrift.meta_data.StructMetaData;
import java.util.Arrays;
@ -170,4 +171,14 @@ public class EventFilterUtils {
}
return eventTypeCodeSet;
}
public static String getTopic(EventFilter eventFilter) {
if (eventFilter.isSetInvoice()) {
return Event.TopicEnum.INVOICESTOPIC.getValue();
}
if (eventFilter.isSetCustomer()) {
return Event.TopicEnum.CUSTOMERSTOPIC.getValue();
}
throw new UnsupportedOperationException("Unknown topic; must be one of these: "+Arrays.toString(Event.TopicEnum.values()));
}
}

View File

@ -25,17 +25,19 @@ public class HookConverter {
return new Hook(
webhook.getId(),
webhook.getPartyId(),
EventFilterUtils.getTopic(webhook.getEventFilter()),
EventFilterUtils.getWebhookAdditionalFilter(webhook.getEventFilter()),
webhook.getUrl(),
webhook.getPubKey(),
null,
webhook.isEnabled(),
null, null);
null);
}
public static Hook convert(WebhookParams webhookParams){
Hook hook = new Hook();
hook.setPartyId(webhookParams.getPartyId());
hook.setTopic(EventFilterUtils.getTopic(webhookParams.getEventFilter()));
hook.setUrl(webhookParams.getUrl());
hook.setFilters(EventFilterUtils.getWebhookAdditionalFilter(webhookParams.getEventFilter()));

View File

@ -27,7 +27,7 @@ flyway.schemas=hook
message.scheduler.delay=1000
message.sender.number=10
spring.cache.cache-names=messagesByInvoice,messagesById,hooks
spring.cache.cache-names=messagesByInvoice,messagesById,queues
spring.cache.caffeine.spec=maximumSize=1000,expireAfterWrite=60s

View File

@ -0,0 +1,52 @@
-- clean table
delete from hook.simple_retry_policy;
delete from hook.scheduled_task;
-- drop constraint
ALTER TABLE hook.simple_retry_policy DROP CONSTRAINT simple_retry_policy_pkey;
-- drop column
ALTER TABLE hook.simple_retry_policy DROP COLUMN hook_id;
-- create invoicing_queue table
CREATE TABLE hook.invoicing_queue
(
id bigserial NOT NULL,
hook_id bigint NOT NULL,
invoice_id CHARACTER VARYING NOT NULL,
enabled boolean NOT NULL DEFAULT true,
CONSTRAINT invoicing_queue_pkey PRIMARY KEY (id),
CONSTRAINT invoicing_queue_pkey2 UNIQUE (hook_id, invoice_id)
);
-- create customer_queue table
CREATE TABLE hook.customer_queue
(
id bigserial NOT NULL,
hook_id bigint NOT NULL,
customer_id CHARACTER VARYING NOT NULL,
enabled boolean NOT NULL DEFAULT true,
CONSTRAINT customer_queue_pkey PRIMARY KEY (id),
CONSTRAINT customer_queue_pkey2 UNIQUE (hook_id, customer_id)
);
-- add queue_id column
ALTER TABLE hook.simple_retry_policy ADD COLUMN queue_id bigint NOT NULL;
ALTER TABLE hook.simple_retry_policy ADD COLUMN message_type hook.message_topic;
ALTER TABLE hook.simple_retry_policy ADD CONSTRAINT simple_retry_policy_pkey PRIMARY KEY (queue_id, message_type);
-- replace hook_id to queue_id
ALTER TABLE hook.scheduled_task DROP CONSTRAINT scheduled_task_pkey;
ALTER TABLE hook.scheduled_task DROP CONSTRAINT scheduled_task_fkey2;
ALTER TABLE hook.scheduled_task DROP COLUMN hook_id;
ALTER TABLE hook.scheduled_task ADD COLUMN queue_id bigint NOT NULL;
ALTER TABLE hook.scheduled_task ADD CONSTRAINT scheduled_task_pkey PRIMARY KEY (message_id, queue_id, message_type);
-- create indices
CREATE INDEX IF NOT EXISTS message_invoice_id_idx ON hook.message USING btree(invoice_id);
CREATE INDEX IF NOT EXISTS customer_message_customer_id_idx ON hook.customer_message USING btree(customer_id);
-- add column topic to webhook
ALTER TABLE hook.webhook ADD COLUMN topic hook.message_topic;
UPDATE hook.webhook w SET topic='InvoicesTopic' where exists (select * from hook.webhook_to_events wh where wh.hook_id = w.id AND wh.event_type in ('INVOICE_CREATED', 'INVOICE_STATUS_CHANGED', 'INVOICE_PAYMENT_STARTED', 'INVOICE_PAYMENT_STATUS_CHANGED'));
UPDATE hook.webhook w SET topic='CustomersTopic' where exists (select * from hook.webhook_to_events wh where wh.hook_id = w.id AND wh.event_type in ('CUSTOMER_CREATED', 'CUSTOMER_DELETED', 'CUSTOMER_READY', 'CUSTOMER_BINDING_STARTED', 'CUSTOMER_BINDING_SUCCEEDED', 'CUSTOMER_BINDING_FAILED'));

View File

@ -1,13 +1,14 @@
package com.rbkmoney.hooker;
import com.rbkmoney.hooker.dao.HookDao;
import com.rbkmoney.hooker.dao.MessageDao;
import com.rbkmoney.hooker.dao.SimpleRetryPolicyDao;
import com.rbkmoney.hooker.dao.InvoicingMessageDao;
import com.rbkmoney.hooker.dao.WebhookAdditionalFilter;
import com.rbkmoney.hooker.handler.poller.impl.invoicing.AbstractInvoiceEventHandler;
import com.rbkmoney.hooker.model.EventType;
import com.rbkmoney.hooker.model.Hook;
import com.rbkmoney.hooker.model.Message;
import com.rbkmoney.hooker.model.InvoicingMessage;
import com.rbkmoney.hooker.utils.EventFilterUtils;
import com.rbkmoney.swag_webhook_events.Event;
import okhttp3.mockwebserver.Dispatcher;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
@ -29,14 +30,14 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import static com.rbkmoney.hooker.utils.BuildUtils.message;
import static com.rbkmoney.hooker.utils.BuildUtils.buildMessage;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Created by jeckep on 20.04.17.
*/
@TestPropertySource(properties = {"message.scheduler.delay=100"})
@TestPropertySource(properties = {"message.scheduler.delay=500"})
public class ComplexDataflowTest extends AbstractIntegrationTest {
private static Logger log = LoggerFactory.getLogger(ComplexDataflowTest.class);
@ -44,13 +45,10 @@ public class ComplexDataflowTest extends AbstractIntegrationTest {
HookDao hookDao;
@Autowired
MessageDao messageDao;
InvoicingMessageDao messageDao;
@Autowired
SimpleRetryPolicyDao simpleRetryPolicyDao;
BlockingQueue<DataflowTest.MockMessage> hook1Queue = new LinkedBlockingDeque<>(10);
BlockingQueue<DataflowTest.MockMessage> hook2Queue = new LinkedBlockingDeque<>(10);
BlockingQueue<DataflowTest.MockMessage> inv1Queue = new LinkedBlockingDeque<>(10);
BlockingQueue<DataflowTest.MockMessage> inv2Queue = new LinkedBlockingDeque<>(10);
final List<Hook> hooks = new ArrayList<>();
final String HOOK_1 = "/hook1";
@ -62,7 +60,7 @@ public class ComplexDataflowTest extends AbstractIntegrationTest {
@Before
public void setUp() throws Exception {
//start mock web server
//create hooks
//createWithPolicy hooks
if (baseServerUrl == null) {
baseServerUrl = webserver(dispatcher());
log.info("Mock server url: " + baseServerUrl);
@ -85,27 +83,36 @@ public class ComplexDataflowTest extends AbstractIntegrationTest {
@Test
public void testMessageSend() throws InterruptedException {
List<Message> sourceMessages = new ArrayList<>();
sourceMessages.add(messageDao.create(message(AbstractInvoiceEventHandler.INVOICE,"1", "partyId1", EventType.INVOICE_STATUS_CHANGED, "unpaid")));
sourceMessages.add(messageDao.create(message(AbstractInvoiceEventHandler.PAYMENT,"1", "partyId1", EventType.INVOICE_PAYMENT_STATUS_CHANGED, "captured")));
sourceMessages.add(messageDao.create(message(AbstractInvoiceEventHandler.PAYMENT,"1", "partyId1", EventType.INVOICE_PAYMENT_STATUS_CHANGED, "processed")));
sourceMessages.add(messageDao.create(message(AbstractInvoiceEventHandler.PAYMENT,"1", "partyId1", EventType.INVOICE_PAYMENT_STATUS_CHANGED, "failed")));
List<InvoicingMessage> sourceMessages = new ArrayList<>();
InvoicingMessage message = buildMessage(AbstractInvoiceEventHandler.INVOICE,"1", "partyId1", EventType.INVOICE_STATUS_CHANGED, "unpaid");
messageDao.create(message);
sourceMessages.add(message);
message = buildMessage(AbstractInvoiceEventHandler.PAYMENT,"1", "partyId1", EventType.INVOICE_PAYMENT_STATUS_CHANGED, "captured");
messageDao.create(message);
sourceMessages.add(message);
message = buildMessage(AbstractInvoiceEventHandler.PAYMENT, "2", "partyId1", EventType.INVOICE_PAYMENT_STATUS_CHANGED, "processed");
messageDao.create(message);
sourceMessages.add(message);
message = buildMessage(AbstractInvoiceEventHandler.PAYMENT, "2", "partyId1", EventType.INVOICE_PAYMENT_STATUS_CHANGED, "failed");
messageDao.create(message);
sourceMessages.add(message);
List<DataflowTest.MockMessage> hooks = new ArrayList<>();
hooks.add(hook1Queue.poll(1, TimeUnit.SECONDS));
hooks.add(hook1Queue.poll(1, TimeUnit.SECONDS));
hooks.add(hook2Queue.poll(1, TimeUnit.SECONDS));
hooks.add(inv1Queue.poll(1, TimeUnit.SECONDS));
hooks.add(inv1Queue.poll(1, TimeUnit.SECONDS));
hooks.add(inv2Queue.poll(1, TimeUnit.SECONDS));
Assert.assertNotNull(hooks.get(0));
Assert.assertNotNull(hooks.get(1));
Assert.assertNotNull(hooks.get(2));
assertEquals(sourceMessages.get(0).getInvoice().getStatus(), hooks.get(0).getInvoice().getStatus());
assertEquals(sourceMessages.get(1).getPayment().getStatus(), hooks.get(1).getPayment().getStatus());
assertEquals(sourceMessages.get(3).getPayment().getStatus(), hooks.get(2).getPayment().getStatus());
assertTrue(hook1Queue.isEmpty());
assertTrue(hook2Queue.isEmpty());
assertTrue(inv1Queue.isEmpty());
assertTrue(inv2Queue.isEmpty());
Thread.currentThread().sleep(1000);
@ -116,6 +123,7 @@ public class ComplexDataflowTest extends AbstractIntegrationTest {
hook.setPartyId(partyId);
hook.setUrl(url);
hook.setFilters(webhookAdditionalFilters);
hook.setTopic(Event.TopicEnum.INVOICESTOPIC.getValue());
return hook;
}
@ -124,18 +132,19 @@ public class ComplexDataflowTest extends AbstractIntegrationTest {
@Override
public MockResponse dispatch(RecordedRequest request) throws InterruptedException {
if (request.getPath().startsWith(HOOK_1)) {
hook1Queue.put(DataflowTest.extractPaymentResourcePayer(request));
Thread.sleep(100);
return new MockResponse().setBody(HOOK_1).setResponseCode(200);
}
if (request.getPath().startsWith(HOOK_2)) {
hook2Queue.put(DataflowTest.extractPaymentResourcePayer(request));
Thread.sleep(100);
return new MockResponse().setBody(HOOK_2).setResponseCode(200);
}
return new MockResponse().setResponseCode(500);
DataflowTest.MockMessage mockMessage = DataflowTest.extractPaymentResourcePayer(request);
String id = mockMessage.getInvoice().getId();
if (id.equals("1")) {
inv1Queue.put(mockMessage);
} else if (id.equals("2")) {
inv2Queue.put(mockMessage);
} else {
Thread.sleep(100);
return new MockResponse().setResponseCode(500);
}
Thread.sleep(100);
return new MockResponse().setBody(HOOK_1).setResponseCode(200);
}
};
return dispatcher;

View File

@ -4,7 +4,6 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rbkmoney.hooker.dao.CustomerDao;
import com.rbkmoney.hooker.dao.HookDao;
import com.rbkmoney.hooker.dao.SimpleRetryPolicyDao;
import com.rbkmoney.hooker.dao.WebhookAdditionalFilter;
import com.rbkmoney.hooker.handler.poller.impl.customer.AbstractCustomerEventHandler;
import com.rbkmoney.hooker.model.CustomerMessage;
@ -12,6 +11,7 @@ import com.rbkmoney.hooker.model.EventType;
import com.rbkmoney.hooker.model.Hook;
import com.rbkmoney.hooker.utils.BuildUtils;
import com.rbkmoney.swag_webhook_events.Customer;
import com.rbkmoney.swag_webhook_events.Event;
import com.rbkmoney.swag_webhook_events.PaymentToolDetailsBankCard;
import okhttp3.mockwebserver.Dispatcher;
import okhttp3.mockwebserver.MockResponse;
@ -41,7 +41,7 @@ import static org.junit.Assert.assertTrue;
/**
* Created by jeckep on 20.04.17.
*/
@TestPropertySource(properties = {"message.scheduler.delay=100"})
@TestPropertySource(properties = {"message.scheduler.delay=500"})
public class CustomerDataflowTest extends AbstractIntegrationTest {
private static Logger log = LoggerFactory.getLogger(CustomerDataflowTest.class);
@ -51,19 +51,14 @@ public class CustomerDataflowTest extends AbstractIntegrationTest {
@Autowired
CustomerDao customerDao;
@Autowired
SimpleRetryPolicyDao simpleRetryPolicyDao;
BlockingQueue<MockMessage> hook1Queue = new LinkedBlockingDeque<>(10);
BlockingQueue<MockMessage> hook2Queue = new LinkedBlockingDeque<>(10);
BlockingQueue<MockMessage> hook3Queue = new LinkedBlockingDeque<>(10);
BlockingQueue<MockMessage> hookBrokenQueue = new LinkedBlockingDeque<>(10);
BlockingQueue<MockMessage> cust1Queue = new LinkedBlockingDeque<>(10);
BlockingQueue<MockMessage> cust2Queue = new LinkedBlockingDeque<>(10);
BlockingQueue<MockMessage> cust3Queue = new LinkedBlockingDeque<>(10);
final List<Hook> hooks = new ArrayList<>();
final String HOOK_1 = "/hook1";
final String HOOK_2 = "/hook2";
final String HOOK_3 = "/hook3";
final String BROKEN_HOOK = "/brokenhook";
String baseServerUrl;
@ -71,7 +66,7 @@ public class CustomerDataflowTest extends AbstractIntegrationTest {
@Before
public void setUp() throws Exception {
//start mock web server
//create hooks
//createWithPolicy hooks
if (baseServerUrl == null) {
baseServerUrl = webserver(dispatcher());
log.info("Mock server url: " + baseServerUrl);
@ -86,39 +81,55 @@ public class CustomerDataflowTest extends AbstractIntegrationTest {
@Test
public void testMessageSend() throws InterruptedException {
List<CustomerMessage> sourceMessages = new ArrayList<>();
sourceMessages.add(customerDao.create(BuildUtils.buildCustomerMessage(1L, "partyId1", EventType.CUSTOMER_CREATED, AbstractCustomerEventHandler.CUSTOMER, "1234", "2342", Customer.StatusEnum.READY)));
sourceMessages.add(customerDao.create(BuildUtils.buildCustomerMessage(2L, "partyId1", EventType.CUSTOMER_READY, AbstractCustomerEventHandler.CUSTOMER, "1234", "2342", Customer.StatusEnum.READY)));
sourceMessages.add(customerDao.create(BuildUtils.buildCustomerMessage(3L, "partyId2", EventType.CUSTOMER_CREATED, AbstractCustomerEventHandler.CUSTOMER, "666", "2342", Customer.StatusEnum.READY)));
sourceMessages.add(customerDao.create(BuildUtils.buildCustomerMessage(4L, "partyId2", EventType.CUSTOMER_READY, AbstractCustomerEventHandler.CUSTOMER, "6666", "2342", Customer.StatusEnum.READY)));
sourceMessages.add(customerDao.create(BuildUtils.buildCustomerMessage(5L, "partyId2", EventType.CUSTOMER_BINDING_STARTED, AbstractCustomerEventHandler.BINDING, "6666", "2342", Customer.StatusEnum.READY)));
sourceMessages.add(customerDao.create(BuildUtils.buildCustomerMessage(6L, "partyId2", EventType.CUSTOMER_BINDING_SUCCEEDED, AbstractCustomerEventHandler.BINDING, "4444", "2342", Customer.StatusEnum.READY)));
CustomerMessage message = BuildUtils.buildCustomerMessage(1L, "partyId1", EventType.CUSTOMER_CREATED, AbstractCustomerEventHandler.CUSTOMER, "1", "2342", Customer.StatusEnum.READY);
customerDao.create(message);
sourceMessages.add(message);
message = BuildUtils.buildCustomerMessage(2L, "partyId1", EventType.CUSTOMER_READY, AbstractCustomerEventHandler.CUSTOMER, "1", "2342", Customer.StatusEnum.READY);
customerDao.create(message);
sourceMessages.add(message);
message = BuildUtils.buildCustomerMessage(3L, "partyId2", EventType.CUSTOMER_CREATED, AbstractCustomerEventHandler.CUSTOMER, "2", "2342", Customer.StatusEnum.READY);
customerDao.create(message);
sourceMessages.add(message);
message = BuildUtils.buildCustomerMessage(4L, "partyId2", EventType.CUSTOMER_READY, AbstractCustomerEventHandler.CUSTOMER, "2", "2342", Customer.StatusEnum.READY);
customerDao.create(message);
sourceMessages.add(message);
message = BuildUtils.buildCustomerMessage(5L, "partyId2", EventType.CUSTOMER_BINDING_STARTED, AbstractCustomerEventHandler.BINDING, "2", "2342", Customer.StatusEnum.READY);
customerDao.create(message);
sourceMessages.add(message);
message = BuildUtils.buildCustomerMessage(6L, "partyId2", EventType.CUSTOMER_BINDING_SUCCEEDED, AbstractCustomerEventHandler.BINDING, "3", "2342", Customer.StatusEnum.READY);
customerDao.create(message);
sourceMessages.add(message);
List<MockMessage> hook1 = new ArrayList<>();
List<MockMessage> hook2 = new ArrayList<>();
List<MockMessage> hook3 = new ArrayList<>();
List<MockMessage> cust1 = new ArrayList<>();
List<MockMessage> cust2 = new ArrayList<>();
List<MockMessage> cust3 = new ArrayList<>();
for (int i = 0; i < 1; i++) {
hook1.add(hook1Queue.poll(1, TimeUnit.SECONDS));
for (int i = 0; i < 3; i++) {
cust1.add(cust1Queue.poll(1, TimeUnit.SECONDS));
}
Assert.assertNotNull(hook1.get(0));
assertEquals(sourceMessages.get(0).getEventId(), hook1.get(0).getEventID());
for (int i = 0; i < 2; i++) {
hook2.add(hook2Queue.poll(1, TimeUnit.SECONDS));
}
Assert.assertEquals(hook2.size(), 2);
assertEquals(sourceMessages.get(0).getEventId(), hook2.get(0).getEventID());
assertEquals(sourceMessages.get(1).getEventId(), hook2.get(1).getEventID());
for (int i = 0; i < 4; i++) {
hook3.add(hook3Queue.poll(1, TimeUnit.SECONDS));
for (int i = 0; i < 3; i++) {
Assert.assertNotNull(cust1.get(i));
}
assertEquals(hook3.size(), 4);
assertTrue(hook1Queue.isEmpty());
assertTrue(hook2Queue.isEmpty());
assertTrue(hook3Queue.isEmpty());
for (int i = 0; i < 3; i++) {
cust2.add(cust2Queue.poll(1, TimeUnit.SECONDS));
}
for (int i = 0; i < 3; i++) {
Assert.assertNotNull(cust2.get(i));
}
assertEquals(sourceMessages.get(2).getEventId(), cust2.get(0).getEventID());
assertEquals(sourceMessages.get(3).getEventId(), cust2.get(1).getEventID());
assertEquals(sourceMessages.get(4).getEventId(), cust2.get(2).getEventID());
cust3.add(cust3Queue.poll(1, TimeUnit.SECONDS));
Assert.assertNotNull(cust3.get(0));
assertEquals(sourceMessages.get(5).getEventId(), cust3.get(0).getEventID());
assertTrue(cust1Queue.isEmpty());
assertTrue(cust2Queue.isEmpty());
assertTrue(cust3Queue.isEmpty());
Thread.currentThread().sleep(1000);
@ -127,6 +138,7 @@ public class CustomerDataflowTest extends AbstractIntegrationTest {
private static Hook hook(String partyId, String url, EventType... types) {
Hook hook = new Hook();
hook.setPartyId(partyId);
hook.setTopic(Event.TopicEnum.CUSTOMERSTOPIC.getValue());
hook.setUrl(url);
Set<WebhookAdditionalFilter> webhookAdditionalFilters = new HashSet<>();
@ -143,28 +155,25 @@ public class CustomerDataflowTest extends AbstractIntegrationTest {
@Override
public MockResponse dispatch(RecordedRequest request) throws InterruptedException {
if (request.getPath().startsWith(HOOK_1)) {
hook1Queue.put(extract(request));
Thread.sleep(100);
return new MockResponse().setBody(HOOK_1).setResponseCode(200);
}
if (request.getPath().startsWith(HOOK_2)) {
hook2Queue.put(extract(request));
Thread.sleep(100);
return new MockResponse().setBody(HOOK_2).setResponseCode(200);
}
if (request.getPath().startsWith(HOOK_3)) {
hook3Queue.put(extract(request));
Thread.sleep(100);
return new MockResponse().setBody(HOOK_3).setResponseCode(200);
}
if (request.getPath().startsWith(BROKEN_HOOK)) {
hookBrokenQueue.put(extract(request));
Thread.sleep(100);
return new MockResponse().setBody(BROKEN_HOOK).setResponseCode(500);
MockMessage mockMessage = extract(request);
String customerId = mockMessage.getCustomer().getId();
switch (customerId) {
case "1":
cust1Queue.put(mockMessage);
break;
case "2":
cust2Queue.put(mockMessage);
break;
case "3":
cust3Queue.put(mockMessage);
break;
default:
Thread.sleep(100);
return new MockResponse().setResponseCode(500);
}
return new MockResponse().setResponseCode(500);
Thread.sleep(100);
return new MockResponse().setBody(HOOK_1).setResponseCode(200);
}
};
return dispatcher;

View File

@ -3,13 +3,12 @@ package com.rbkmoney.hooker;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rbkmoney.hooker.dao.HookDao;
import com.rbkmoney.hooker.dao.MessageDao;
import com.rbkmoney.hooker.dao.SimpleRetryPolicyDao;
import com.rbkmoney.hooker.dao.InvoicingMessageDao;
import com.rbkmoney.hooker.dao.WebhookAdditionalFilter;
import com.rbkmoney.hooker.handler.poller.impl.invoicing.AbstractInvoiceEventHandler;
import com.rbkmoney.hooker.model.*;
import com.rbkmoney.hooker.retry.impl.simple.SimpleRetryPolicyRecord;
import com.rbkmoney.swag_webhook_events.CustomerPayer;
import com.rbkmoney.swag_webhook_events.Event;
import com.rbkmoney.swag_webhook_events.PaymentToolDetailsBankCard;
import okhttp3.mockwebserver.Dispatcher;
import okhttp3.mockwebserver.MockResponse;
@ -30,14 +29,15 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import static com.rbkmoney.hooker.utils.BuildUtils.buildMessage;
import static com.rbkmoney.hooker.utils.BuildUtils.cart;
import static com.rbkmoney.hooker.utils.BuildUtils.message;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Created by jeckep on 20.04.17.
*/
@TestPropertySource(properties = {"message.scheduler.delay=100"})
@TestPropertySource(properties = {"message.scheduler.delay=500"})
public class DataflowTest extends AbstractIntegrationTest {
private static Logger log = LoggerFactory.getLogger(DataflowTest.class);
@ -45,21 +45,17 @@ public class DataflowTest extends AbstractIntegrationTest {
HookDao hookDao;
@Autowired
MessageDao messageDao;
InvoicingMessageDao messageDao;
@Autowired
SimpleRetryPolicyDao simpleRetryPolicyDao;
BlockingQueue<MockMessage> hook1Queue = new LinkedBlockingDeque<>(10);
BlockingQueue<MockMessage> hook2Queue = new LinkedBlockingDeque<>(10);
BlockingQueue<MockMessage> hook3Queue = new LinkedBlockingDeque<>(10);
BlockingQueue<MockMessage> hookBrokenQueue = new LinkedBlockingDeque<>(10);
BlockingQueue<MockMessage> inv1Queue = new LinkedBlockingDeque<>(10);
BlockingQueue<MockMessage> inv3Queue = new LinkedBlockingDeque<>(10);
BlockingQueue<MockMessage> inv4Queue = new LinkedBlockingDeque<>(10);
BlockingQueue<MockMessage> inv5Queue = new LinkedBlockingDeque<>(10);
final List<Hook> hooks = new ArrayList<>();
final String HOOK_1 = "/hook1";
final String HOOK_2 = "/hook2";
final String HOOK_3 = "/hook3";
final String BROKEN_HOOK = "/brokenhook";
String baseServerUrl;
@ -67,7 +63,7 @@ public class DataflowTest extends AbstractIntegrationTest {
@Before
public void setUp() throws Exception {
//start mock web server
//create hooks
//createWithPolicy hooks
if (baseServerUrl == null) {
baseServerUrl = webserver(dispatcher());
log.info("Mock server url: " + baseServerUrl);
@ -82,9 +78,10 @@ public class DataflowTest extends AbstractIntegrationTest {
public void testCache(){
final String invoceId = "asgsdhghdhtfugny78989";
final String partyId = new Random().nextInt() + "";
Message message1 = messageDao.create(message(AbstractInvoiceEventHandler.INVOICE, invoceId, partyId, EventType.INVOICE_CREATED, "status"));
Message message2 = messageDao.getAny(invoceId, AbstractInvoiceEventHandler.INVOICE);
Message message3 = messageDao.getAny(invoceId, AbstractInvoiceEventHandler.INVOICE);
InvoicingMessage message1 = buildMessage(AbstractInvoiceEventHandler.INVOICE, invoceId, partyId, EventType.INVOICE_CREATED, "status");
messageDao.create(message1);
InvoicingMessage message2 = messageDao.getAny(invoceId, AbstractInvoiceEventHandler.INVOICE);
InvoicingMessage message3 = messageDao.getAny(invoceId, AbstractInvoiceEventHandler.INVOICE);
assertTrue(message1 != message2);
assertTrue(message2 != message3);
assertTrue(message1 != message3);
@ -92,68 +89,65 @@ public class DataflowTest extends AbstractIntegrationTest {
@Test
public void testMessageSend() throws InterruptedException {
List<Message> sourceMessages = new ArrayList<>();
sourceMessages.add(messageDao.create(message(AbstractInvoiceEventHandler.INVOICE,"1", "partyId1", EventType.INVOICE_CREATED, "status", cart(), true)));
sourceMessages.add(messageDao.create(message(AbstractInvoiceEventHandler.PAYMENT,"2", "partyId1", EventType.INVOICE_PAYMENT_STARTED, "status")));
sourceMessages.add(messageDao.create(message(AbstractInvoiceEventHandler.INVOICE,"3", "partyId1", EventType.INVOICE_CREATED, "status")));
List<InvoicingMessage> sourceMessages = new ArrayList<>();
InvoicingMessage message = buildMessage(AbstractInvoiceEventHandler.INVOICE, "1", "partyId1", EventType.INVOICE_CREATED, "status", cart(), true);
messageDao.create(message);
sourceMessages.add(message);
message = buildMessage(AbstractInvoiceEventHandler.PAYMENT, "1", "partyId1", EventType.INVOICE_PAYMENT_STARTED, "status");
messageDao.create(message);
sourceMessages.add(message);
message = buildMessage(AbstractInvoiceEventHandler.INVOICE,"3", "partyId1", EventType.INVOICE_CREATED, "status");
messageDao.create(message);
sourceMessages.add(message);
message = buildMessage(AbstractInvoiceEventHandler.INVOICE, "4", "qwe", EventType.INVOICE_CREATED, "status");
messageDao.create(message);
sourceMessages.add(message);
message = buildMessage(AbstractInvoiceEventHandler.INVOICE, "5", "partyId2", EventType.INVOICE_CREATED, "status", cart(), false);
messageDao.create(message);
sourceMessages.add(message);
message = buildMessage(AbstractInvoiceEventHandler.PAYMENT, "5", "partyId2", EventType.INVOICE_PAYMENT_STATUS_CHANGED, "status", cart(), false);
messageDao.create(message);
sourceMessages.add(message);
sourceMessages.add(messageDao.create(message(AbstractInvoiceEventHandler.INVOICE,"4", "qwe", EventType.INVOICE_CREATED, "status")));
sourceMessages.add(messageDao.create(message(AbstractInvoiceEventHandler.INVOICE,"5", "qwe", EventType.INVOICE_CREATED, "status")));
sourceMessages.add(messageDao.create(message(AbstractInvoiceEventHandler.PAYMENT,"6", "partyId2", EventType.INVOICE_PAYMENT_STATUS_CHANGED, "status", cart(), false)));
List<MockMessage> hook1 = new ArrayList<>();
List<MockMessage> hook2 = new ArrayList<>();
List<MockMessage> hook3 = new ArrayList<>();
for (int i = 0; i < 2; i++) {
hook1.add(hook1Queue.poll(1, TimeUnit.SECONDS));
}
Assert.assertNotNull(hook1.get(0));
Assert.assertNotNull(hook1.get(1));
assertEquals(sourceMessages.get(0).getInvoice().getId(), hook1.get(0).getInvoice().getId());
assertEquals(sourceMessages.get(2).getInvoice().getId(), hook1.get(1).getInvoice().getId());
for (int i = 0; i < 3; i++) {
hook2.add(hook2Queue.poll(1, TimeUnit.SECONDS));
}
for (int i = 0; i < 3; i++) {
assertEquals(sourceMessages.get(i).getInvoice().getId(), hook2.get(i).getInvoice().getId());
}
for (int i = 0; i < 1; i++) {
hook3.add(hook3Queue.poll(1, TimeUnit.SECONDS));
}
assertTrue(hook3.get(0).getPayment().getPayer() instanceof CustomerPayer);
assertTrue(hook1Queue.isEmpty());
assertTrue(hook2Queue.isEmpty());
assertTrue(hook3Queue.isEmpty());
List<MockMessage> inv1 = new ArrayList<>();
List<MockMessage> inv3 = new ArrayList<>();
List<MockMessage> inv4 = new ArrayList<>();
List<MockMessage> inv5 = new ArrayList<>();
Thread.currentThread().sleep(1000);
}
for (int i = 0; i < 3; i++) {
inv1.add(inv1Queue.poll(1, TimeUnit.SECONDS));
}
Assert.assertNotNull(inv1.get(0));
Assert.assertNotNull(inv1.get(1));
Assert.assertNotNull(inv1.get(2));
@Test
public void testDisableHookPolicy() throws InterruptedException {
final String invoceId = "asgsdhghdhtfugny648";
final String partyId = new Random().nextInt() + "";
Hook hook = hookDao.create(hook(partyId, "http://" + baseServerUrl + BROKEN_HOOK, EventType.INVOICE_CREATED));
simpleRetryPolicyDao.update(new SimpleRetryPolicyRecord(hook.getId(), 4, 0));
for (int i = 0; i < 2; i++) {
inv3.add(inv3Queue.poll(1, TimeUnit.SECONDS));
}
Assert.assertNotNull(inv3.get(0));
Assert.assertNotNull(inv3.get(1));
Message message = messageDao.create(message(AbstractInvoiceEventHandler.INVOICE, invoceId, partyId, EventType.INVOICE_CREATED, "status"));
assertEquals(message.getInvoice().getId(), hookBrokenQueue.poll(1, TimeUnit.SECONDS).getInvoice().getId());
inv4.add(inv4Queue.poll(1, TimeUnit.SECONDS));
Assert.assertNull(inv4.get(0));
Thread.sleep(1000);
inv5.add(inv5Queue.poll(1, TimeUnit.SECONDS));
Assert.assertNotNull(inv5.get(0));
assertEquals(sourceMessages.get(5).getInvoice().getId(), inv5.get(0).getInvoice().getId());
assertTrue(inv5.get(0).getPayment().getPayer() instanceof CustomerPayer);
assertTrue(inv1Queue.isEmpty());
assertTrue(inv3Queue.isEmpty());
assertTrue(inv4Queue.isEmpty());
assertTrue(inv5Queue.isEmpty());
hook = hookDao.getHookById(hook.getId());
assertTrue(hook.isEnabled());
}
private static Hook hook(String partyId, String url, EventType... types) {
Hook hook = new Hook();
hook.setPartyId(partyId);
hook.setTopic(Event.TopicEnum.INVOICESTOPIC.getValue());
hook.setUrl(url);
Set<WebhookAdditionalFilter> webhookAdditionalFilters = new HashSet<>();
@ -170,28 +164,34 @@ public class DataflowTest extends AbstractIntegrationTest {
@Override
public MockResponse dispatch(RecordedRequest request) throws InterruptedException {
if (request.getPath().startsWith(HOOK_1)) {
hook1Queue.put(extractPaymentResourcePayer(request));
Thread.sleep(100);
return new MockResponse().setBody(HOOK_1).setResponseCode(200);
}
if (request.getPath().startsWith(HOOK_2)) {
hook2Queue.put(extractPaymentResourcePayer(request));
Thread.sleep(100);
return new MockResponse().setBody(HOOK_2).setResponseCode(200);
}
MockMessage mockMessage;
if (request.getPath().startsWith(HOOK_3)) {
hook3Queue.put(extractCustomerPayer(request));
Thread.sleep(100);
return new MockResponse().setBody(HOOK_3).setResponseCode(200);
}
if (request.getPath().startsWith(BROKEN_HOOK)) {
hookBrokenQueue.put(extractPaymentResourcePayer(request));
Thread.sleep(100);
return new MockResponse().setBody(BROKEN_HOOK).setResponseCode(500);
mockMessage = extractCustomerPayer(request);
} else {
mockMessage = extractPaymentResourcePayer(request);
}
return new MockResponse().setResponseCode(500);
String invoiceId = mockMessage.getInvoice().getId();
switch (invoiceId) {
case "1":
inv1Queue.put(mockMessage);
break;
case "3":
inv3Queue.put(mockMessage);
break;
case "4":
inv4Queue.put(mockMessage);
break;
case "5":
inv5Queue.put(mockMessage);
break;
default:
Thread.sleep(100);
return new MockResponse().setBody("FAIL").setResponseCode(500);
}
Thread.sleep(100);
return new MockResponse().setBody("OK").setResponseCode(200);
}
};
return dispatcher;

View File

@ -7,6 +7,7 @@ import com.rbkmoney.hooker.model.EventType;
import com.rbkmoney.hooker.model.Hook;
import com.rbkmoney.hooker.utils.EventFilterUtils;
import com.rbkmoney.hooker.utils.HookConverter;
import com.rbkmoney.swag_webhook_events.Event;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -102,16 +103,11 @@ public class HookDaoImplTest extends AbstractIntegrationTest {
}
}
@Test
public void getByIds(){
List<Hook> hooks = hookDao.getWithPolicies(ids);
assertEquals(3, hooks.size());
}
public static Hook buildHook(String partyId, String url){
Hook hook = new Hook();
hook.setPartyId(partyId);
hook.setUrl(url);
hook.setTopic(Event.TopicEnum.INVOICESTOPIC.getValue());
Set<WebhookAdditionalFilter> webhookAdditionalFilters = new HashSet<>();
webhookAdditionalFilters.add(new WebhookAdditionalFilter(EventType.INVOICE_PAYMENT_STATUS_CHANGED, "34", null, "cancelled"));

View File

@ -3,7 +3,7 @@ package com.rbkmoney.hooker.dao;
import com.rbkmoney.hooker.AbstractIntegrationTest;
import com.rbkmoney.hooker.handler.poller.impl.invoicing.AbstractInvoiceEventHandler;
import com.rbkmoney.hooker.model.EventType;
import com.rbkmoney.hooker.model.Message;
import com.rbkmoney.hooker.model.InvoicingMessage;
import com.rbkmoney.swag_webhook_events.CustomerPayer;
import org.junit.Before;
import org.junit.Test;
@ -16,8 +16,8 @@ import org.springframework.test.context.junit4.SpringRunner;
import java.util.Arrays;
import static com.rbkmoney.hooker.utils.BuildUtils.buildMessage;
import static com.rbkmoney.hooker.utils.BuildUtils.cart;
import static com.rbkmoney.hooker.utils.BuildUtils.message;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -26,20 +26,20 @@ import static org.junit.Assert.assertTrue;
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class MessageDaoImplTest extends AbstractIntegrationTest {
private static Logger log = LoggerFactory.getLogger(MessageDaoImplTest.class);
public class InvoicingMessageDaoImplTest extends AbstractIntegrationTest {
private static Logger log = LoggerFactory.getLogger(InvoicingMessageDaoImplTest.class);
@Autowired
MessageDao messageDao;
InvoicingMessageDao messageDao;
private static boolean messagesCreated = false;
@Before
public void setUp() throws Exception {
if(!messagesCreated){
messageDao.create(message(AbstractInvoiceEventHandler.INVOICE,"1234", "56678", EventType.INVOICE_CREATED, "status"));
messageDao.create(message(AbstractInvoiceEventHandler.INVOICE,"1234", "56678", EventType.INVOICE_CREATED, "status", cart(), true));
messageDao.create(message(AbstractInvoiceEventHandler.PAYMENT,"1234", "56678", EventType.INVOICE_CREATED, "status", cart(), false));
messageDao.create(buildMessage(AbstractInvoiceEventHandler.INVOICE,"1234", "56678", EventType.INVOICE_CREATED, "status"));
messageDao.create(buildMessage(AbstractInvoiceEventHandler.INVOICE,"1234", "56678", EventType.INVOICE_CREATED, "status", cart(), true));
messageDao.create(buildMessage(AbstractInvoiceEventHandler.PAYMENT,"1234", "56678", EventType.INVOICE_CREATED, "status", cart(), false));
messagesCreated = true;
}
}
@ -63,13 +63,13 @@ public class MessageDaoImplTest extends AbstractIntegrationTest {
@Test
public void get() throws Exception {
Message message = messageDao.getAny("1234", AbstractInvoiceEventHandler.INVOICE);
InvoicingMessage message = messageDao.getAny("1234", AbstractInvoiceEventHandler.INVOICE);
assertEquals(message.getInvoice().getAmount(), 12235);
assertEquals(message.getInvoice().getCart().size(), 2);
assertEquals(1, messageDao.getBy(Arrays.asList(message.getId())).size());
Message payment = messageDao.getAny("1234", AbstractInvoiceEventHandler.PAYMENT);
InvoicingMessage payment = messageDao.getAny("1234", AbstractInvoiceEventHandler.PAYMENT);
assertTrue(payment.getPayment().getPayer() instanceof CustomerPayer);
}

View File

@ -1,9 +1,11 @@
package com.rbkmoney.hooker.dao;
import com.rbkmoney.hooker.AbstractIntegrationTest;
import com.rbkmoney.hooker.dao.impl.InvoicingQueueDao;
import com.rbkmoney.hooker.dao.impl.InvoicingTaskDao;
import com.rbkmoney.hooker.handler.poller.impl.invoicing.AbstractInvoiceEventHandler;
import com.rbkmoney.hooker.model.EventType;
import com.rbkmoney.hooker.model.Task;
import com.rbkmoney.hooker.utils.BuildUtils;
import org.junit.After;
import org.junit.Before;
@ -13,6 +15,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static com.rbkmoney.hooker.utils.BuildUtils.cart;
import static org.junit.Assert.assertEquals;
@ -26,11 +32,14 @@ public class InvoicingTaskDaoTest extends AbstractIntegrationTest {
@Autowired
InvoicingTaskDao taskDao;
@Autowired
InvoicingQueueDao queueDao;
@Autowired
HookDao hookDao;
@Autowired
MessageDao messageDao;
InvoicingMessageDao messageDao;
Long messageId;
Long hookId;
@ -38,7 +47,7 @@ public class InvoicingTaskDaoTest extends AbstractIntegrationTest {
@Before
public void setUp() throws Exception {
hookId = hookDao.create(HookDaoImplTest.buildHook("partyId", "fake.url")).getId();
messageDao.create(BuildUtils.message(AbstractInvoiceEventHandler.INVOICE,"2345", "partyId", EventType.INVOICE_CREATED, "status", cart(), true));
messageDao.create(BuildUtils.buildMessage(AbstractInvoiceEventHandler.INVOICE,"2345", "partyId", EventType.INVOICE_CREATED, "status", cart(), true));
messageId = messageDao.getAny("2345", AbstractInvoiceEventHandler.INVOICE).getId();
}
@ -49,16 +58,15 @@ public class InvoicingTaskDaoTest extends AbstractIntegrationTest {
@Test
public void createDeleteGet() {
assertEquals(1, taskDao.getAll().size());
taskDao.remove(hookId, messageId);
assertEquals(0, taskDao.getAll().size());
Map<Long, List<Task>> scheduled = taskDao.getScheduled(new ArrayList<>());
assertEquals(1, scheduled.size());
taskDao.remove(scheduled.keySet().iterator().next(), messageId);
assertEquals(0, taskDao.getScheduled(new ArrayList<>()).size());
}
@Test
public void removeAll() {
assertEquals(1, taskDao.getAll().size());
taskDao.removeAll(hookId);
assertEquals(0, taskDao.getAll().size());
}
}

View File

@ -14,25 +14,25 @@ import org.junit.Test;
public class MessageJsonTest {
@Test
public void test() throws JsonProcessingException {
Message message = BuildUtils.message(AbstractInvoiceEventHandler.PAYMENT, "444", "987", EventType.INVOICE_PAYMENT_STARTED, "cancelled");
System.out.println(MessageJson.buildMessageJson(message));
Message copy = message.copy();
InvoicingMessage message = BuildUtils.buildMessage(AbstractInvoiceEventHandler.PAYMENT, "444", "987", EventType.INVOICE_PAYMENT_STARTED, "cancelled");
System.out.println(InvoicingMessageJson.buildMessageJson(message));
InvoicingMessage copy = message.copy();
message.getInvoice().setAmount(99988);
Assert.assertNotEquals(message.getInvoice().getAmount(), copy.getInvoice().getAmount());
}
@Test
public void testCart() throws JsonProcessingException {
Message message = BuildUtils.message(AbstractInvoiceEventHandler.PAYMENT, "444", "987", EventType.INVOICE_PAYMENT_STARTED, "cancelled", BuildUtils.cart(), true);
String messageJson = MessageJson.buildMessageJson(message);
InvoicingMessage message = BuildUtils.buildMessage(AbstractInvoiceEventHandler.PAYMENT, "444", "987", EventType.INVOICE_PAYMENT_STARTED, "cancelled", BuildUtils.cart(), true);
String messageJson = InvoicingMessageJson.buildMessageJson(message);
System.out.println(messageJson);
Assert.assertTrue(messageJson.contains("taxMode"));
}
@Test
public void testInvoiceCustomer() throws JsonProcessingException {
Message message = BuildUtils.message(AbstractInvoiceEventHandler.PAYMENT, "444", "987", EventType.INVOICE_PAYMENT_STARTED, "cancelled", BuildUtils.cart(), false);
String messageJson = MessageJson.buildMessageJson(message);
InvoicingMessage message = BuildUtils.buildMessage(AbstractInvoiceEventHandler.PAYMENT, "444", "987", EventType.INVOICE_PAYMENT_STARTED, "cancelled", BuildUtils.cart(), false);
String messageJson = InvoicingMessageJson.buildMessageJson(message);
System.out.println(messageJson);
Assert.assertTrue(messageJson.contains("CustomerPayer"));
}

View File

@ -13,12 +13,12 @@ import java.util.List;
* Created by jeckep on 25.04.17.
*/
public class BuildUtils {
public static Message message(String type, String invoiceId, String partyId, EventType eventType, String status) {
return message(type, invoiceId, partyId, eventType, status, null, true);
public static InvoicingMessage buildMessage(String type, String invoiceId, String partyId, EventType eventType, String status) {
return buildMessage(type, invoiceId, partyId, eventType, status, null, true);
}
public static Message message(String type, String invoiceId, String partyId, EventType eventType, String status, List<InvoiceCartPosition> cart, boolean isPayer) {
Message message = new Message();
public static InvoicingMessage buildMessage(String type, String invoiceId, String partyId, EventType eventType, String status, List<InvoiceCartPosition> cart, boolean isPayer) {
InvoicingMessage message = new InvoicingMessage();
message.setEventId(5555);
message.setEventTime("time");
message.setType(type);