P2p lists (#20)

* Add implementation for new method

* Fix result row

* Add cascade search

* Add p2p type lists

* Add test
This commit is contained in:
Kostya 2019-11-11 17:21:40 +03:00 committed by GitHub
parent 6c0a836655
commit 8ba6244bdd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 152 additions and 73 deletions

View File

@ -24,7 +24,7 @@
<dockerfile.base.service.tag>bc95d0d6dc13c693acd2b274531a7d604b877bf3</dockerfile.base.service.tag>
<dockerfile.registry>dr2.rbkmoney.com</dockerfile.registry>
<shared.resources.version>0.3.6</shared.resources.version>
<wb.list.proto.version>1.26-99f4118</wb.list.proto.version>
<wb.list.proto.version>1.27-be77f75</wb.list.proto.version>
<kafka.streams.version>2.1.0</kafka.streams.version>
<kafka.clients.version>2.1.0</kafka.clients.version>
<kafka.common.lib.version>0.0.9</kafka.common.lib.version>

View File

@ -0,0 +1,7 @@
package com.rbkmoney.wb.list.manager.constant;
public class RowType {
public static final String P_2_P = "p2p";
}

View File

@ -0,0 +1,23 @@
package com.rbkmoney.wb.list.manager.exception;
public class UnknownRowTypeException extends RuntimeException {
public UnknownRowTypeException() {
}
public UnknownRowTypeException(String message) {
super(message);
}
public UnknownRowTypeException(String message, Throwable cause) {
super(message, cause);
}
public UnknownRowTypeException(Throwable cause) {
super(cause);
}
public UnknownRowTypeException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -2,7 +2,9 @@ package com.rbkmoney.wb.list.manager.handler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rbkmoney.damsel.wb_list.*;
import com.rbkmoney.wb.list.manager.constant.RowType;
import com.rbkmoney.wb.list.manager.exception.RiakExecutionException;
import com.rbkmoney.wb.list.manager.exception.UnknownRowTypeException;
import com.rbkmoney.wb.list.manager.model.CountInfoModel;
import com.rbkmoney.wb.list.manager.repository.ListRepository;
import com.rbkmoney.wb.list.manager.utils.KeyGenerator;
@ -23,18 +25,10 @@ public class WbListServiceHandler implements WbListServiceSrv.Iface {
@Override
public boolean isExist(Row row) throws TException {
return checkExist(KeyGenerator.generateKey(row.list_type, row.list_name, row.value))
|| checkExist(KeyGenerator.generateKey(row.party_id, row.list_type, row.list_name, row.value))
|| checkExist(KeyGenerator.generateKey(row));
}
private boolean checkExist(String key) throws TException {
try {
boolean present = listRepository.get(key).isPresent();
log.info("WbListServiceHandler isExist key: {} result: {}", key, present);
return present;
} catch (RiakExecutionException e) {
log.info("WbListServiceHandler error when isExist key: {} e: ", key, e);
return getCascadeRow(row).isPresent();
} catch (RiakExecutionException | UnknownRowTypeException e) {
log.error("WbListServiceHandler error when isExist row: {} e: ", row, e);
throw new TException(e);
}
}
@ -72,7 +66,7 @@ public class WbListServiceHandler implements WbListServiceSrv.Iface {
public Result getRowInfo(Row row) throws ListNotFound, TException {
log.info("WbListServiceHandler getRowInfo row: {}", row);
Optional<com.rbkmoney.wb.list.manager.model.Row> result = getCascadeRow(row);
if (result.isPresent()) {
if (result.isPresent() && row.getListType() == ListType.grey) {
log.info("WbListServiceHandler getRowInfo result: {} isPresent=true!", result);
try {
CountInfoModel countInfoModel = objectMapper.readValue(result.get().getValue(), CountInfoModel.class);
@ -90,14 +84,21 @@ public class WbListServiceHandler implements WbListServiceSrv.Iface {
}
private Optional<com.rbkmoney.wb.list.manager.model.Row> getCascadeRow(Row row) {
if (row.getListType() != ListType.grey) {
return Optional.empty();
if (row.getId().isSetPaymentId()) {
PaymentId paymentId = row.getId().getPaymentId();
return Optional.ofNullable(
listRepository.get(KeyGenerator.generateKey(row.getListType(), row.getListName(), row.getValue()))
.orElse(listRepository.get(KeyGenerator.generateKey(row.getListType(), row.getListName(), row.getValue(), paymentId.getPartyId()))
.orElse(listRepository.get(KeyGenerator.generateKey(row.getListType(), row.getListName(), row.getValue(), paymentId.getPartyId(), paymentId.getShopId()))
.orElse(null))));
} else if (row.getId().isSetP2pId()) {
P2pId p2pId = row.getId().getP2pId();
return Optional.ofNullable(
listRepository.get(KeyGenerator.generateKey(row.getListType(), row.getListName(), row.getValue(), RowType.P_2_P))
.orElse(listRepository.get(KeyGenerator.generateKey(row.getListType(), row.getListName(), row.getValue(), RowType.P_2_P, p2pId.getIdentityId()))
.orElse(null)));
}
return Optional.ofNullable(
listRepository.get(KeyGenerator.generateKey(row.list_type, row.list_name, row.value))
.orElse(listRepository.get(KeyGenerator.generateKey(row.party_id, row.list_type, row.list_name, row.value))
.orElse(listRepository.get(KeyGenerator.generateKey(row))
.orElse(null))));
throw new UnknownRowTypeException();
}
}

View File

@ -1,29 +1,35 @@
package com.rbkmoney.wb.list.manager.utils;
import com.rbkmoney.damsel.wb_list.ListType;
import com.rbkmoney.damsel.wb_list.Row;
import com.rbkmoney.damsel.wb_list.P2pId;
import com.rbkmoney.damsel.wb_list.PaymentId;
import com.rbkmoney.wb.list.manager.constant.RowType;
import com.rbkmoney.wb.list.manager.exception.UnknownRowTypeException;
import org.springframework.util.StringUtils;
public class KeyGenerator {
private static final String DELIMITER = "_";
public static String generateKey(String partyId, ListType listType, String listName, String value) {
return generateKey(partyId, null, listType, listName, value);
public static String generateKey(com.rbkmoney.damsel.wb_list.Row row) {
if (row.getId().isSetPaymentId()) {
PaymentId paymentId = row.getId().getPaymentId();
return generateKey(row.getListType(), row.getListName(), row.getValue(), paymentId.getPartyId(), paymentId.getShopId());
} else if (row.getId().isSetP2pId()) {
P2pId p2pId = row.getId().getP2pId();
return generateKey(row.getListType(), row.getListName(), row.getValue(), RowType.P_2_P, p2pId.getIdentityId());
}
throw new UnknownRowTypeException();
}
public static String generateKey(ListType listType, String listName, String value) {
return generateKey(null, null, listType, listName, value);
}
public static String generateKey(Row row) {
return generateKey(row.getPartyId(), row.getShopId(), row.getListType(), row.getListName(), row.getValue());
}
private static String generateKey(String partyId, String shopId, ListType listType, String listName, String value) {
public static String generateKey(ListType listType, String listName, String value, String... params) {
StringBuilder stringBuilder = new StringBuilder();
addIfExist(partyId, stringBuilder);
return addIfExist(shopId, stringBuilder)
if (params != null) {
for (String param : params) {
addIfExist(param, stringBuilder);
}
}
return stringBuilder
.append(listType)
.append(DELIMITER)
.append(listName)

View File

@ -31,6 +31,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Properties;
import static org.junit.Assert.*;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
@Slf4j
@ -44,6 +45,7 @@ public class WbListManagerApplicationTest extends KafkaAbstractTest {
private static final String SHOP_ID = "shopId";
private static final String PARTY_ID = "partyId";
private static final String LIST_NAME = "listName";
public static final String IDENTITY_ID = "identityId";
@LocalServerPort
int serverPort;
@ -66,25 +68,15 @@ public class WbListManagerApplicationTest extends KafkaAbstractTest {
.withNetworkTimeout(300000);
WbListServiceSrv.Iface iface = clientBuilder.build(WbListServiceSrv.Iface.class);
Producer<String, ChangeCommand> producer = createProducer();
ChangeCommand changeCommand = createCommand(createRow());
ProducerRecord<String, ChangeCommand> producerRecord = new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand);
producer.send(producerRecord).get();
producer.close();
Thread.sleep(1000L);
ChangeCommand changeCommand = produceCreateRow(createRow());
boolean exist = iface.isExist(changeCommand.getRow());
Assert.assertTrue(exist);
assertTrue(exist);
producer = createProducer();
changeCommand.setCommand(Command.DELETE);
producerRecord = new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand);
producer.send(producerRecord).get();
producer.close();
Thread.sleep(1000L);
produceDeleteRow(changeCommand);
exist = iface.isExist(changeCommand.getRow());
Assert.assertFalse(exist);
assertFalse(exist);
Consumer<String, Event> consumer = createConsumer();
consumer.subscribe(Collections.singletonList(topicEventSink));
@ -97,49 +89,86 @@ public class WbListManagerApplicationTest extends KafkaAbstractTest {
eventList.add(record.value());});
consumer.close();
Assert.assertEquals(2, eventList.size());
assertEquals(2, eventList.size());
producer = createProducer();
Producer<String, ChangeCommand> producer = createProducer();
Row row = createRow();
changeCommand = createCommand(row);
row.setShopId(null);
producerRecord = new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand);
row.getId().getPaymentId()
.setShopId(null);
ProducerRecord<String, ChangeCommand> producerRecord = new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand);
producer.send(producerRecord).get();
producer.close();
Thread.sleep(1000L);
exist = iface.isExist(row);
Assert.assertTrue(exist);
assertTrue(exist);
row.getId().getPaymentId()
.setShopId(SHOP_ID);
row.setShopId(SHOP_ID);
exist = iface.isExist(row);
Assert.assertTrue(exist);
assertTrue(exist);
Result info = iface.getRowInfo(row);
Assert.assertFalse(info.isSetRowInfo());
assertFalse(info.isSetRowInfo());
row.setListType(ListType.grey);
//check without partyId and shop id
createRow(Instant.now().toString(), null, null);
RowInfo rowInfo = iface.getRowInfo(row).getRowInfo();
Assert.assertEquals(5, rowInfo.getCountInfo().getCount());
assertEquals(5, rowInfo.getCountInfo().getCount());
//check without partyId
createRow(Instant.now().toString(), null, SHOP_ID);
rowInfo = iface.getRowInfo(row).getRowInfo();
Assert.assertEquals(5, rowInfo.getCountInfo().getCount());
assertEquals(5, rowInfo.getCountInfo().getCount());
//check full key field
createRow(Instant.now().toString(), PARTY_ID, SHOP_ID);
rowInfo = iface.getRowInfo(row).getRowInfo();
Assert.assertEquals(5, rowInfo.getCountInfo().getCount());
assertEquals(5, rowInfo.getCountInfo().getCount());
rowInfo = checkCreateWithCountInfo(iface, Instant.now().toString(), PARTY_ID, SHOP_ID);
Assert.assertFalse(rowInfo.getCountInfo().getStartCountTime().isEmpty());
assertFalse(rowInfo.getCountInfo().getStartCountTime().isEmpty());
Row rowP2p = createListRow();
rowP2p.setId(IdInfo.p2p_id(new P2pId().setIdentityId(IDENTITY_ID)));
changeCommand = produceCreateRow(rowP2p);
exist = iface.isExist(changeCommand.getRow());
assertTrue(exist);
produceDeleteRow(changeCommand);
exist = iface.isExist(changeCommand.getRow());
assertFalse(exist);
}
private void produceDeleteRow(ChangeCommand changeCommand) throws InterruptedException, java.util.concurrent.ExecutionException {
Producer<String, ChangeCommand> producer = createProducer();
changeCommand.setCommand(Command.DELETE);
ProducerRecord<String, ChangeCommand> producerRecord = new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand);
producer.send(producerRecord).get();
producer.close();
Thread.sleep(1000L);
}
private ChangeCommand produceCreateRow(com.rbkmoney.damsel.wb_list.Row row) throws InterruptedException, java.util.concurrent.ExecutionException {
Producer<String, ChangeCommand> producer = createProducer();
ChangeCommand changeCommand = createCommand(row);
ProducerRecord<String, ChangeCommand> producerRecord = new ProducerRecord<>(topic, changeCommand.getRow().getValue(), changeCommand);
producer.send(producerRecord).get();
producer.close();
Thread.sleep(1000L);
return changeCommand;
}
private RowInfo checkCreateWithCountInfo(WbListServiceSrv.Iface iface, String startTimeCount, String partyId, String shopId) throws InterruptedException, java.util.concurrent.ExecutionException, TException {
Row rowWithCountInfo = createRow(startTimeCount, partyId, shopId);
return iface.getRowInfo(rowWithCountInfo).getRowInfo();
@ -170,9 +199,17 @@ public class WbListManagerApplicationTest extends KafkaAbstractTest {
@NotNull
private com.rbkmoney.damsel.wb_list.Row createRow() {
com.rbkmoney.damsel.wb_list.Row row = new com.rbkmoney.damsel.wb_list.Row();
row.setShopId(SHOP_ID);
row.setPartyId(PARTY_ID);
Row row = createListRow();
row.setId(IdInfo.payment_id(new PaymentId()
.setShopId(SHOP_ID)
.setPartyId(PARTY_ID)
));
return row;
}
@NotNull
private Row createListRow() {
Row row = new Row();
row.setListName(LIST_NAME);
row.setListType(ListType.black);
row.setValue(VALUE);
@ -182,8 +219,10 @@ public class WbListManagerApplicationTest extends KafkaAbstractTest {
@NotNull
private com.rbkmoney.damsel.wb_list.Row createRowWithCountInfo(String startTimeCount, String partyId, String shopId) {
com.rbkmoney.damsel.wb_list.Row row = new com.rbkmoney.damsel.wb_list.Row();
row.setShopId(shopId);
row.setPartyId(partyId);
row.setId(IdInfo.payment_id(new PaymentId()
.setShopId(SHOP_ID)
.setPartyId(PARTY_ID)
));
row.setListName(LIST_NAME);
row.setListType(ListType.grey);
row.setValue(VALUE);

View File

@ -1,8 +1,6 @@
package com.rbkmoney.wb.list.manager;
import com.rbkmoney.damsel.wb_list.ChangeCommand;
import com.rbkmoney.damsel.wb_list.Command;
import com.rbkmoney.damsel.wb_list.ListType;
import com.rbkmoney.damsel.wb_list.*;
import com.rbkmoney.kafka.common.serialization.ThriftSerializer;
import com.rbkmoney.wb.list.manager.exception.RiakExecutionException;
import com.rbkmoney.wb.list.manager.repository.ListRepository;
@ -100,8 +98,10 @@ public class WbListSafetyApplicationTest extends KafkaAbstractTest {
@NotNull
private com.rbkmoney.damsel.wb_list.Row createRow() {
com.rbkmoney.damsel.wb_list.Row row = new com.rbkmoney.damsel.wb_list.Row();
row.setShopId(SHOP_ID);
row.setPartyId(PARTY_ID);
row.setId(IdInfo.payment_id(new PaymentId()
.setShopId(SHOP_ID)
.setPartyId(PARTY_ID))
);
row.setListName(LIST_NAME);
row.setListType(ListType.black);
row.setValue(VALUE);

View File

@ -1,9 +1,10 @@
package com.rbkmoney.wb.list.manager.handler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rbkmoney.damsel.wb_list.IdInfo;
import com.rbkmoney.damsel.wb_list.ListType;
import com.rbkmoney.damsel.wb_list.PaymentId;
import com.rbkmoney.damsel.wb_list.Result;
import com.rbkmoney.damsel.wb_list.RowInfo;
import com.rbkmoney.wb.list.manager.model.Row;
import com.rbkmoney.wb.list.manager.repository.ListRepository;
import org.apache.thrift.TException;
@ -116,8 +117,10 @@ public class WbListServiceHandlerTest {
value.setValue(VALUE);
Mockito.when(listRepository.get(anyString())).thenReturn(Optional.of(value));
com.rbkmoney.damsel.wb_list.Row row = new com.rbkmoney.damsel.wb_list.Row();
row.setPartyId(PARTY_ID);
row.setShopId(SHOP_ID);
row.setId(IdInfo.payment_id(new PaymentId()
.setShopId(SHOP_ID)
.setPartyId(PARTY_ID)
));
row.setListType(ListType.black);
row.setListName(LIST_NAME);
row.setValue(VALUE);