* fix
remove task for disabled hooks
This commit is contained in:
Yevgeniy Poluektov 2017-04-26 15:44:58 +03:00 committed by GitHub
parent 7679cd5066
commit ebdf492a69
5 changed files with 45 additions and 18 deletions

View File

@ -13,6 +13,7 @@ import java.util.Map;
public interface TaskDao {
void create(Collection<Long> messageIds);
void remove(long hookId, long messageId);
void removeAll(long hookId);
List<Task> getAll();
Map<Long, List<Task>> getScheduled(Collection<Long> excludeHooksIds);
}

View File

@ -36,15 +36,15 @@ public class TaskDaoImpl extends NamedParameterJdbcDaoSupport implements TaskDao
}
final String sql =
" insert into hook.scheduled_task(message_id, hook_id)" +
" select m.id, w.id " +
" from hook.message m" +
" join hook.webhook w on m.party_id = w.party_id and w.enabled = TRUE " +
" join hook.webhook_to_events wte on wte.hook_id = w.id" +
" where m.id in (:ids) " +
" and m.event_type = wte.event_type " +
" and (wte.invoice_shop_id is null or m.shop_id = wte.invoice_shop_id) " +
" and (m.status = COALESCE(wte.invoice_status, wte.invoice_payment_status) or (wte.invoice_status is null and wte.invoice_payment_status is null))" +
" ON CONFLICT (message_id, hook_id) DO NOTHING";
" select m.id, w.id " +
" from hook.message m" +
" join hook.webhook w on m.party_id = w.party_id and w.enabled" +
" join hook.webhook_to_events wte on wte.hook_id = w.id" +
" where m.id in (:ids) " +
" and m.event_type = wte.event_type " +
" and (wte.invoice_shop_id is null or m.shop_id = wte.invoice_shop_id) " +
" and (m.status = COALESCE(wte.invoice_status, wte.invoice_payment_status) or (wte.invoice_status is null and wte.invoice_payment_status is null))" +
" ON CONFLICT (message_id, hook_id) DO NOTHING";
try {
int updateCount = getNamedParameterJdbcTemplate().update(sql, new MapSqlParameterSource("ids", messageIds));
log.info("Created tasks count : " + updateCount);
@ -66,6 +66,17 @@ public class TaskDaoImpl extends NamedParameterJdbcDaoSupport implements TaskDao
}
}
@Override
public void removeAll(long hookId) {
final String sql = "DELETE FROM hook.scheduled_task where hook_id=:hook_id";
try {
getNamedParameterJdbcTemplate().update(sql, new MapSqlParameterSource("hook_id", hookId));
} catch (DataAccessException e) {
log.error("Fail to delete tasks for hook:" + hookId, e);
throw new DaoException(e);
}
}
@Override
public List<Task> getAll() {
final String sql = "SELECT * FROM hook.scheduled_task";
@ -84,10 +95,9 @@ public class TaskDaoImpl extends NamedParameterJdbcDaoSupport implements TaskDao
public Map<Long, List<Task>> getScheduled(Collection<Long> excludeHooksIds) {
final String sql =
" SELECT DISTINCT * " +
" FROM hook.scheduled_task st" +
" JOIN hook.webhook w on w.id = st.hook_id and w.enabled = :enabled" +
(excludeHooksIds.size() > 0 ? " WHERE st.hook_id not in (:hook_ids)" : "") +
" ORDER BY hook_id ASC , message_id ASC";
" FROM hook.scheduled_task st" +
(excludeHooksIds.size() > 0 ? " WHERE st.hook_id not in (:hook_ids)" : "") +
" ORDER BY hook_id ASC , message_id ASC";
try {
List<Task> tasks = getNamedParameterJdbcTemplate().query(
sql,

View File

@ -2,6 +2,7 @@ package com.rbkmoney.hooker.retry.impl.simple;
import com.rbkmoney.hooker.dao.HookDao;
import com.rbkmoney.hooker.dao.SimpleRetryPolicyDao;
import com.rbkmoney.hooker.dao.TaskDao;
import com.rbkmoney.hooker.retry.RetryPolicy;
import com.rbkmoney.hooker.retry.RetryPolicyType;
import org.slf4j.Logger;
@ -23,6 +24,9 @@ public class SimpleRetryPolicy implements RetryPolicy<SimpleRetryPolicyRecord> {
@Autowired
HookDao hookDao;
@Autowired
TaskDao taskDao;
private long[] delays = {30, 300, 900, 3600}; //in seconds
@Override
@ -38,6 +42,7 @@ public class SimpleRetryPolicy implements RetryPolicy<SimpleRetryPolicyRecord> {
if (rp.getFailCount() >= delays.length) {
hookDao.disable(rp.getHookId());
taskDao.removeAll(rp.getHookId());
log.warn("Hook: " + rp.getHookId() + " was disabled according to retry policy.");
}
}

View File

@ -98,6 +98,7 @@ CREATE TABLE hook.simple_retry_policy
CONSTRAINT simple_retry_policy_pkey PRIMARY KEY (hook_id)
);
CREATE INDEX hook_idx ON hook.webhook(party_id, id) WHERE enabled;
COMMENT ON TABLE hook.message
IS 'Table for saving messages for POST';

View File

@ -1,6 +1,7 @@
package com.rbkmoney.hooker.dao;
import com.rbkmoney.hooker.AbstractIntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -33,20 +34,29 @@ public class TaskDaoImplTest extends AbstractIntegrationTest {
@Before
public void setUp() throws Exception {
messageDao.create(MessageDaoImplTest.buildMessage("2345","partyId"));
messageId = messageDao.getAny("2345").getId();
hookId = hookDao.create(HookDaoImplTest.buildHook("partyId234", "fake.url")).getId();
messageDao.create(MessageDaoImplTest.buildMessage("2345qweasd","partyId234"));
messageId = messageDao.getAny("2345qweasd").getId();
}
hookId = hookDao.create(HookDaoImplTest.buildHook("partyId", "fake.url")).getId();
@After
public void after() throws Exception {
hookDao.delete(hookId);
messageDao.delete(messageId);
}
@Test
public void createDeleteGet() {
taskDao.create(Arrays.asList(messageId));
assertEquals(1, taskDao.getAll().size());
taskDao.remove(hookId, messageId);
assertEquals(0, taskDao.getAll().size());
}
@Test
public void removeAll() {
assertEquals(1, taskDao.getAll().size());
taskDao.removeAll(hookId);
assertEquals(0, taskDao.getAll().size());
}
}