mirror of
https://github.com/valitydev/hooker.git
synced 2024-11-06 08:15:17 +00:00
parent
ebdf492a69
commit
a5b1b87b92
9
pom.xml
9
pom.xml
@ -100,6 +100,14 @@
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-cache</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.ben-manes.caffeine</groupId>
|
||||
<artifactId>caffeine</artifactId>
|
||||
</dependency>
|
||||
<!--RBK libs-->
|
||||
<dependency>
|
||||
<groupId>com.rbkmoney.dbinit</groupId>
|
||||
@ -167,6 +175,7 @@
|
||||
<version>1.1.8</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
@ -3,9 +3,11 @@ package com.rbkmoney.hooker;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.web.servlet.ServletComponentScan;
|
||||
import org.springframework.cache.annotation.EnableCaching;
|
||||
|
||||
@ServletComponentScan
|
||||
@SpringBootApplication(scanBasePackages = {"com.rbkmoney.hooker", "com.rbkmoney.dbinit"})
|
||||
@EnableCaching
|
||||
public class HookerApplication {
|
||||
public static void main(String[] args) throws Exception {
|
||||
SpringApplication.run(HookerApplication.class, args);
|
||||
|
@ -0,0 +1,10 @@
|
||||
package com.rbkmoney.hooker.configuration;
|
||||
|
||||
/**
|
||||
* Created by jeckep on 25.04.17.
|
||||
*/
|
||||
public class CacheConfiguration {
|
||||
public static final String MESSAGES_BY_INVOICE = "messagesByInvoice";
|
||||
public static final String MESSAGES_BY_IDS = "messagesById";
|
||||
public static final String HOOKS = "hooks";
|
||||
}
|
@ -1,5 +1,6 @@
|
||||
package com.rbkmoney.hooker.dao.impl;
|
||||
|
||||
import com.rbkmoney.hooker.configuration.CacheConfiguration;
|
||||
import com.rbkmoney.hooker.dao.DaoException;
|
||||
import com.rbkmoney.hooker.dao.HookDao;
|
||||
import com.rbkmoney.hooker.dao.WebhookAdditionalFilter;
|
||||
@ -11,6 +12,8 @@ import com.rbkmoney.hooker.service.crypt.Signer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cache.Cache;
|
||||
import org.springframework.cache.CacheManager;
|
||||
import org.springframework.dao.DataAccessException;
|
||||
import org.springframework.jdbc.core.RowMapper;
|
||||
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
|
||||
@ -29,6 +32,8 @@ import java.util.stream.Collectors;
|
||||
public class HookDaoImpl implements HookDao {
|
||||
Logger log = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
@Autowired
|
||||
CacheManager cacheManager;
|
||||
|
||||
private final NamedParameterJdbcTemplate jdbcTemplate;
|
||||
|
||||
@ -140,19 +145,33 @@ public class HookDaoImpl implements HookDao {
|
||||
}
|
||||
|
||||
public List<Hook> getWithPolicies(Collection<Long> ids) {
|
||||
if (ids.size() == 0) {
|
||||
return new ArrayList<>();
|
||||
List<Hook> hooks = getFromCache(ids);
|
||||
|
||||
final Set<Long> hookIds = new HashSet<>();
|
||||
|
||||
if(hooks.size() == ids.size()){
|
||||
return hooks;
|
||||
}else{
|
||||
hookIds.addAll(ids);
|
||||
if(hooks.size() > 0){
|
||||
for(Hook hook: hooks){
|
||||
hookIds.remove(hook.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final String sql =
|
||||
" select w.*, k.*, srp.*" +
|
||||
" from hook.webhook w " +
|
||||
" join hook.party_key k on k.party_id = w.party_id " +
|
||||
" left join hook.simple_retry_policy srp on srp.hook_id = w.id" +
|
||||
" where w.id in (:ids)";
|
||||
final MapSqlParameterSource params = new MapSqlParameterSource("ids", ids);
|
||||
final MapSqlParameterSource params = new MapSqlParameterSource("ids", hookIds);
|
||||
|
||||
try {
|
||||
List<Hook> hooks = jdbcTemplate.query(sql, params, hookWithPolicyRowMapper);
|
||||
List<Hook> hooksFromDb = jdbcTemplate.query(sql, params, hookWithPolicyRowMapper);
|
||||
putToCache(hooksFromDb);
|
||||
hooks.addAll(hooksFromDb);
|
||||
return hooks;
|
||||
} catch (DataAccessException e) {
|
||||
throw new DaoException(e);
|
||||
@ -160,6 +179,8 @@ public class HookDaoImpl implements HookDao {
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public Hook create(Hook hook) {
|
||||
@ -297,4 +318,25 @@ public class HookDaoImpl implements HookDao {
|
||||
rs.getString("invoice_payment_status")));
|
||||
|
||||
|
||||
private List<Hook> getFromCache(Collection<Long> ids){
|
||||
Cache cache = cacheManager.getCache(CacheConfiguration.HOOKS);
|
||||
List<Hook> hooks = new ArrayList<>();
|
||||
|
||||
for(long id: ids){
|
||||
Hook hook = cache.get(id, Hook.class);
|
||||
if(hook != null){
|
||||
hooks.add(hook);
|
||||
}
|
||||
}
|
||||
|
||||
return hooks;
|
||||
}
|
||||
|
||||
private void putToCache(Collection<Hook> hooks){
|
||||
Cache cache = cacheManager.getCache(CacheConfiguration.HOOKS);
|
||||
for(Hook hook: hooks){
|
||||
cache.put(hook.getId(), hook);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,14 +1,19 @@
|
||||
package com.rbkmoney.hooker.dao.impl;
|
||||
|
||||
import com.rbkmoney.damsel.base.Content;
|
||||
import com.rbkmoney.hooker.configuration.CacheConfiguration;
|
||||
import com.rbkmoney.hooker.dao.DaoException;
|
||||
import com.rbkmoney.hooker.dao.MessageDao;
|
||||
import com.rbkmoney.hooker.dao.TaskDao;
|
||||
import com.rbkmoney.hooker.model.EventType;
|
||||
import com.rbkmoney.hooker.model.Hook;
|
||||
import com.rbkmoney.hooker.model.Message;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cache.Cache;
|
||||
import org.springframework.cache.CacheManager;
|
||||
import org.springframework.cache.annotation.Cacheable;
|
||||
import org.springframework.core.NestedRuntimeException;
|
||||
import org.springframework.dao.EmptyResultDataAccessException;
|
||||
import org.springframework.jdbc.core.RowMapper;
|
||||
@ -27,6 +32,9 @@ public class MessageDaoImpl extends NamedParameterJdbcDaoSupport implements Mess
|
||||
@Autowired
|
||||
TaskDao taskDao;
|
||||
|
||||
@Autowired
|
||||
CacheManager cacheManager;
|
||||
|
||||
private static RowMapper<Message> messageRowMapper = (rs, i) -> {
|
||||
Message message = new Message();
|
||||
message.setId(rs.getLong("id"));
|
||||
@ -56,6 +64,7 @@ public class MessageDaoImpl extends NamedParameterJdbcDaoSupport implements Mess
|
||||
}
|
||||
|
||||
@Override
|
||||
@Cacheable(CacheConfiguration.MESSAGES_BY_INVOICE)
|
||||
public Message getAny(String invoiceId) throws DaoException {
|
||||
Message result = null;
|
||||
final String sql = "SELECT * FROM hook.message WHERE invoice_id =:invoice_id LIMIT 1";
|
||||
@ -68,6 +77,8 @@ public class MessageDaoImpl extends NamedParameterJdbcDaoSupport implements Mess
|
||||
log.warn("MessageDaoImpl.getAny error", e);
|
||||
throw new DaoException(e);
|
||||
}
|
||||
|
||||
putToCache(result);
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -102,6 +113,7 @@ public class MessageDaoImpl extends NamedParameterJdbcDaoSupport implements Mess
|
||||
// create tasks
|
||||
taskDao.create(Arrays.asList(keyHolder.getKey().longValue()));
|
||||
message.setId(keyHolder.getKey().longValue());
|
||||
putToCache(message);
|
||||
return message;
|
||||
} catch (NestedRuntimeException e) {
|
||||
throw new DaoException("Couldn't create message with invoce_id "+ message.getInvoiceId(), e);
|
||||
@ -120,12 +132,25 @@ public class MessageDaoImpl extends NamedParameterJdbcDaoSupport implements Mess
|
||||
|
||||
@Override
|
||||
public List<Message> getBy(Collection<Long> messageIds) {
|
||||
if(messageIds.size() == 0){
|
||||
return new ArrayList<>();
|
||||
List<Message> messages = getFromCache(messageIds);
|
||||
|
||||
Set<Long> ids = new HashSet<>();
|
||||
if(messages.size() == messageIds.size()){
|
||||
return messages;
|
||||
}else{
|
||||
ids.addAll(messageIds);
|
||||
for(Message message: messages){
|
||||
ids.remove(message.getId());
|
||||
}
|
||||
}
|
||||
|
||||
final String sql = "SELECT * FROM hook.message WHERE id in (:ids)";
|
||||
try {
|
||||
List<Message> messages = getNamedParameterJdbcTemplate().query(sql, new MapSqlParameterSource("ids", messageIds), messageRowMapper);
|
||||
List<Message> messagesFromDb = getNamedParameterJdbcTemplate().query(sql, new MapSqlParameterSource("ids", ids), messageRowMapper);
|
||||
for(Message message: messagesFromDb){
|
||||
putToCache(message);
|
||||
}
|
||||
messages.addAll(messagesFromDb);
|
||||
return messages;
|
||||
} catch (NestedRuntimeException e) {
|
||||
log.error("MessageDaoImpl.getByIds error", e);
|
||||
@ -154,4 +179,21 @@ public class MessageDaoImpl extends NamedParameterJdbcDaoSupport implements Mess
|
||||
throw new DaoException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void putToCache(Message message){
|
||||
cacheManager.getCache(CacheConfiguration.MESSAGES_BY_IDS).put(message.getId(), message);
|
||||
cacheManager.getCache(CacheConfiguration.MESSAGES_BY_INVOICE).put(message.getInvoiceId(), message);
|
||||
}
|
||||
|
||||
private List<Message> getFromCache(Collection<Long> ids){
|
||||
Cache cache = cacheManager.getCache(CacheConfiguration.MESSAGES_BY_IDS);
|
||||
List<Message> messages = new ArrayList<>();
|
||||
for(long id: ids){
|
||||
Message message = cache.get(id, Message.class);
|
||||
if(message != null){
|
||||
messages.add(message);
|
||||
}
|
||||
}
|
||||
return messages;
|
||||
}
|
||||
}
|
||||
|
@ -125,7 +125,7 @@ public class Message {
|
||||
public Content getMetadata() {
|
||||
return metadata;
|
||||
}
|
||||
@JsonIgnore
|
||||
|
||||
public void setMetadata(Content metadata) {
|
||||
this.metadata = metadata;
|
||||
}
|
||||
@ -153,4 +153,19 @@ public class Message {
|
||||
public void setDescription(String description) {
|
||||
this.description = description;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
Message message = (Message) o;
|
||||
|
||||
return id == message.id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return (int) (id ^ (id >>> 32));
|
||||
}
|
||||
}
|
||||
|
@ -26,4 +26,7 @@ spring.datasource.hikari.idle-timeout=30000
|
||||
message.scheduler.delay=500
|
||||
message.sender.number=10
|
||||
|
||||
spring.cache.cache-names=messagesByInvoice,messagesById,hooks
|
||||
spring.cache.caffeine.spec=maximumSize=1000,expireAfterWrite=60s
|
||||
|
||||
|
||||
|
@ -28,6 +28,7 @@ import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static com.rbkmoney.hooker.utils.BuildUtils.message;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
@ -120,29 +121,6 @@ public class DataflowTest extends AbstractIntegrationTest {
|
||||
assertFalse(hook.isEnabled());
|
||||
}
|
||||
|
||||
private static Message message(String invoceId, String partyId, EventType type, String status) {
|
||||
Message message = new Message();
|
||||
message.setEventId(5555);
|
||||
message.setEventTime("12.12.2007");
|
||||
message.setInvoiceId(invoceId);
|
||||
message.setPartyId(partyId);
|
||||
message.setShopId(123);
|
||||
message.setAmount(12235);
|
||||
message.setCurrency("RUB");
|
||||
message.setCreatedAt("12.12.2008");
|
||||
com.rbkmoney.damsel.base.Content metadata = new com.rbkmoney.damsel.base.Content();
|
||||
metadata.setType("string");
|
||||
metadata.setData("somedata".getBytes());
|
||||
message.setMetadata(metadata);
|
||||
message.setProduct("product");
|
||||
message.setDescription("description");
|
||||
message.setEventType(type);
|
||||
message.setType("invoice");
|
||||
message.setStatus(status);
|
||||
message.setPaymentId("paymentId");
|
||||
return message;
|
||||
}
|
||||
|
||||
private static Hook hook(String partyId, String url, EventType... types) {
|
||||
Hook hook = new Hook();
|
||||
hook.setPartyId(partyId);
|
||||
|
@ -1,6 +1,5 @@
|
||||
package com.rbkmoney.hooker.dao;
|
||||
|
||||
import com.rbkmoney.damsel.base.Content;
|
||||
import com.rbkmoney.hooker.AbstractIntegrationTest;
|
||||
import com.rbkmoney.hooker.model.EventType;
|
||||
import com.rbkmoney.hooker.model.Message;
|
||||
@ -8,12 +7,15 @@ import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
import static com.rbkmoney.hooker.utils.BuildUtils.message;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
/**
|
||||
@ -22,14 +24,15 @@ import static org.junit.Assert.assertEquals;
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
|
||||
public class MessageDaoImplTest extends AbstractIntegrationTest {
|
||||
private static Logger log = LoggerFactory.getLogger(MessageDaoImplTest.class);
|
||||
|
||||
@Autowired
|
||||
MessageDao messageDao;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
messageDao.create(buildMessage("1234", "56678"));
|
||||
messageDao.create(buildMessage("1234", "56678"));
|
||||
messageDao.create(message("1234", "56678", EventType.INVOICE_CREATED, "status"));
|
||||
messageDao.create(message("1234", "56678", EventType.INVOICE_CREATED, "status"));
|
||||
}
|
||||
|
||||
@After
|
||||
@ -37,6 +40,23 @@ public class MessageDaoImplTest extends AbstractIntegrationTest {
|
||||
messageDao.delete("1234");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAny() {
|
||||
long startTime = System.currentTimeMillis();
|
||||
for (int i = 0; i < 10000; i++) {
|
||||
messageDao.getAny("1234");
|
||||
}
|
||||
|
||||
long executionTime = System.currentTimeMillis() - startTime;
|
||||
if (executionTime > 1000) {
|
||||
log.error("Execution time: " + executionTime + ".Seems caching not working!!!");
|
||||
} else {
|
||||
log.info("Execution time: " + executionTime);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void get() throws Exception {
|
||||
Message message = messageDao.getAny("1234");
|
||||
@ -46,30 +66,7 @@ public class MessageDaoImplTest extends AbstractIntegrationTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getMaxEventId(){
|
||||
public void getMaxEventId() {
|
||||
assertEquals(messageDao.getMaxEventId().longValue(), 5555);
|
||||
}
|
||||
|
||||
public static Message buildMessage(String invoceId, String partyId){
|
||||
Message message = new Message();
|
||||
message.setEventId(5555);
|
||||
message.setEventTime("12.12.2007");
|
||||
message.setInvoiceId(invoceId);
|
||||
message.setPartyId(partyId);
|
||||
message.setShopId(123);
|
||||
message.setAmount(12235);
|
||||
message.setCurrency("RUB");
|
||||
message.setCreatedAt("12.12.2008");
|
||||
Content metadata = new Content();
|
||||
metadata.setType("string");
|
||||
metadata.setData("somedata".getBytes());
|
||||
message.setMetadata(metadata);
|
||||
message.setProduct("product");
|
||||
message.setDescription("description");
|
||||
message.setEventType(EventType.INVOICE_CREATED);
|
||||
message.setType("invoice");
|
||||
message.setStatus("message status");
|
||||
message.setPaymentId("paymentId");
|
||||
return message;
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,8 @@
|
||||
package com.rbkmoney.hooker.dao;
|
||||
|
||||
import com.rbkmoney.hooker.AbstractIntegrationTest;
|
||||
import com.rbkmoney.hooker.model.EventType;
|
||||
import com.rbkmoney.hooker.utils.BuildUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -9,8 +11,6 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
/**
|
||||
@ -34,9 +34,9 @@ public class TaskDaoImplTest extends AbstractIntegrationTest {
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
hookId = hookDao.create(HookDaoImplTest.buildHook("partyId234", "fake.url")).getId();
|
||||
messageDao.create(MessageDaoImplTest.buildMessage("2345qweasd","partyId234"));
|
||||
messageId = messageDao.getAny("2345qweasd").getId();
|
||||
hookId = hookDao.create(HookDaoImplTest.buildHook("partyId", "fake.url")).getId();
|
||||
messageDao.create(BuildUtils.message("2345", "partyId", EventType.INVOICE_CREATED, "status"));
|
||||
messageId = messageDao.getAny("2345").getId();
|
||||
}
|
||||
|
||||
@After
|
||||
|
32
src/test/java/com/rbkmoney/hooker/utils/BuildUtils.java
Normal file
32
src/test/java/com/rbkmoney/hooker/utils/BuildUtils.java
Normal file
@ -0,0 +1,32 @@
|
||||
package com.rbkmoney.hooker.utils;
|
||||
|
||||
import com.rbkmoney.hooker.model.EventType;
|
||||
import com.rbkmoney.hooker.model.Message;
|
||||
|
||||
/**
|
||||
* Created by jeckep on 25.04.17.
|
||||
*/
|
||||
public class BuildUtils {
|
||||
public static Message message(String invoceId, String partyId, EventType type, String status) {
|
||||
Message message = new Message();
|
||||
message.setEventId(5555);
|
||||
message.setInvoiceId(invoceId);
|
||||
message.setPartyId(partyId);
|
||||
message.setShopId(123);
|
||||
message.setAmount(12235);
|
||||
message.setCurrency("RUB");
|
||||
message.setCreatedAt("12.12.2008");
|
||||
com.rbkmoney.damsel.base.Content metadata = new com.rbkmoney.damsel.base.Content();
|
||||
metadata.setType("string");
|
||||
metadata.setData("somedata".getBytes());
|
||||
message.setMetadata(metadata);
|
||||
message.setProduct("product");
|
||||
message.setDescription("description");
|
||||
message.setEventType(type);
|
||||
message.setType("invoice");
|
||||
message.setStatus(status);
|
||||
message.setPaymentId("paymentId");
|
||||
message.setEventTime("time");
|
||||
return message;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user