Review fix, added idempotency logic

This commit is contained in:
n.pospolita 2019-09-16 12:51:32 +03:00
parent d4c6618754
commit 2c29b33f4e
8 changed files with 72 additions and 20 deletions

View File

@ -1,10 +1,10 @@
package com.rbkmoney.shumpune.service;
import com.rbkmoney.damsel.base.InvalidRequest;
import com.rbkmoney.damsel.shumpune.Clock;
import com.rbkmoney.damsel.shumpune.PostingBatch;
import com.rbkmoney.damsel.shumpune.PostingPlan;
import com.rbkmoney.damsel.shumpune.PostingPlanChange;
import com.rbkmoney.damsel.shumpune.base.InvalidRequest;
import com.rbkmoney.shumpune.constant.PostingOperation;
import com.rbkmoney.shumpune.converter.PostingPlanToListPostingModelListConverter;
import com.rbkmoney.shumpune.converter.PostingPlanToPostingPlanModelConverter;
@ -52,6 +52,7 @@ public class PostingPlanServiceImpl implements PostingPlanService {
Map<Long, List<PostingModel>> postingLogs = planDao.getPostingLogs(postingPlanChange.getId(), PostingOperation.HOLD);
if (postingLogs.containsKey(postingPlanChange.getBatch().getId())) {
postingsUpdateValidator.validate(List.of(postingPlanChange.getBatch()), postingLogs);
log.info("This is duplicate request (HOLD), postingPlanChange: {}", postingPlanChange);
return Clock.vector(VectorClockSerializer.serialize(
planDao.selectMaxClock(postingPlanChange.getId(), postingPlanChange.getBatch().getId())));
@ -136,17 +137,23 @@ public class PostingPlanServiceImpl implements PostingPlanService {
throw new InvalidRequest(Collections.singletonList(String.format("Hold OPERATION not found for plan: %s", postingPlan.getId())));
}
if (containsFinalOps(postingModels)) {
Map<Long, List<PostingModel>> postingLogsHolds = postingModels.stream()
.filter(postingModel -> postingModel.getOperation().equals(PostingOperation.HOLD))
.collect(Collectors.groupingBy(PostingModel::getBatchId));
Map<Long, List<PostingModel>> postingLogsFinal = postingModels.stream()
.filter(postingModel -> !postingModel.getOperation().equals(PostingOperation.HOLD))
.collect(Collectors.groupingBy(PostingModel::getBatchId));
if (!postingLogsFinal.isEmpty()) {
postingsUpdateValidator.validate(postingPlan.getBatchList(), postingLogsFinal);
log.info("This is duplicate request ({}), postingPlan: {}", postingOperation, postingPlan);
PostingModel postingModel = postingModels.stream().max(Comparator.comparingLong(PostingModel::getBatchId))
.orElseThrow(); //never happens, but sonarqube complains
return Clock.vector(VectorClockSerializer.serialize(planDao.selectMaxClock(postingPlan.getId(), postingModel.getBatchId())));
}
Map<Long, List<PostingModel>> postingLogs = postingModels.stream()
.filter(postingModel -> postingModel.getOperation().equals(PostingOperation.HOLD))
.collect(Collectors.groupingBy(PostingModel::getBatchId));
postingsUpdateValidator.validate(postingPlan, postingLogs);
postingsUpdateValidator.validate(postingPlan.getBatchList(), postingLogsHolds);
long clock = planDao.insertPostings(postingPlanToListPostingModelListConverter.convert(postingPlan, postingOperation));

View File

@ -1,8 +1,8 @@
package com.rbkmoney.shumpune.validator;
import com.rbkmoney.damsel.base.InvalidRequest;
import com.rbkmoney.damsel.shumpune.PostingBatch;
import com.rbkmoney.damsel.shumpune.PostingPlan;
import com.rbkmoney.damsel.shumpune.base.InvalidRequest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

View File

@ -1,11 +1,11 @@
package com.rbkmoney.shumpune.validator;
import com.rbkmoney.damsel.base.InvalidRequest;
import com.rbkmoney.damsel.shumpune.Account;
import com.rbkmoney.damsel.shumpune.InvalidPostingParams;
import com.rbkmoney.damsel.shumpune.Posting;
import com.rbkmoney.damsel.shumpune.PostingBatch;
import com.rbkmoney.damsel.shumpune.base.InvalidRequest;
import com.rbkmoney.shumpune.dao.AccountDao;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

View File

@ -1,10 +1,9 @@
package com.rbkmoney.shumpune.validator;
import com.rbkmoney.damsel.base.InvalidRequest;
import com.rbkmoney.damsel.shumpune.Posting;
import com.rbkmoney.damsel.shumpune.PostingBatch;
import com.rbkmoney.damsel.shumpune.PostingPlan;
import com.rbkmoney.damsel.shumpune.base.InvalidRequest;
import com.rbkmoney.shumpune.domain.PostingModel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -26,9 +25,8 @@ public class PostingsUpdateValidator {
private static final String POSTING_IN_BATCH_INVALID = "Posting in batch id: %d invalid";
private static final String POSTING_BATCH_SIZE_IS_INCORRECT = "Posting in batch id: %d size: %d but in old size: %d";
public void validate(PostingPlan receivedPostingPlan, Map<Long, List<PostingModel>> savedBatches) throws TException {
List<PostingBatch> newBatchList = receivedPostingPlan.getBatchList();
Set<Long> receivedBatchIds = newBatchList.stream()
public void validate(List<PostingBatch> receivedBatchList, Map<Long, List<PostingModel>> savedBatches) throws TException {
Set<Long> receivedBatchIds = receivedBatchList.stream()
.map(PostingBatch::getId)
.collect(Collectors.toSet());
@ -39,7 +37,7 @@ public class PostingsUpdateValidator {
throw new InvalidRequest(Collections.singletonList(String.format(POSTING_BATCH_ID_VIOLATION, minReceivedBatch, maxSavedBatch)));
}
for (PostingBatch receivedBatch : newBatchList) {
for (PostingBatch receivedBatch : receivedBatchList) {
List<PostingModel> savedBatch = savedBatches.get(receivedBatch.getId());
if (savedBatch != null) {
validateBatchSize(receivedBatch, savedBatch);

View File

@ -69,6 +69,53 @@ public class ShumpuneServiceHandlerTest extends DaoTestBase {
checkMinAvailable(providerAcc, -300000L, merchantAcc, -9000L, systemAcc, -6000L, clock);
}
@Test(expected = TException.class)
public void holdInvalidRequest() throws TException {
Instant now = Instant.now();
//simple save
AccountPrototype accountPrototype = AccountGenerator.createAccountPrototype(now);
long providerAcc = handler.createAccount(accountPrototype);
long merchantAcc = handler.createAccount(accountPrototype);
long systemAcc = handler.createAccount(accountPrototype);
String planHold = "plan_hold_invalid_request";
PostingPlanChange postingPlanChange = PostingGenerator.createPostingPlanChange(planHold, providerAcc, systemAcc, merchantAcc);
handler.hold(postingPlanChange);
//making plan incorrect
postingPlanChange.getBatch().getPostings().get(0).setAmount(123L);
handler.hold(postingPlanChange);
}
@Test(expected = TException.class)
public void commitInvalidRequest() throws TException {
Instant now = Instant.now();
//simple save
AccountPrototype accountPrototype = AccountGenerator.createAccountPrototype(now);
long providerAcc = handler.createAccount(accountPrototype);
long merchantAcc = handler.createAccount(accountPrototype);
long systemAcc = handler.createAccount(accountPrototype);
String planCommit = "plan_commit_invalid_postings";
PostingPlanChange postingPlanChange = PostingGenerator.createPostingPlanChange(planCommit, providerAcc, systemAcc, merchantAcc);
handler.hold(postingPlanChange);
PostingBatch batch = PostingGenerator.createBatch(providerAcc, systemAcc, merchantAcc);
ArrayList<PostingBatch> batchList = new ArrayList<>();
batchList.add(batch);
PostingPlan postingPlan = new PostingPlan()
.setId(planCommit)
.setBatchList(batchList);
handler.commitPlan(postingPlan);
//making plan incorrect
postingPlan.getBatchList().get(0).getPostings().get(0).setAmount(123L);
handler.commitPlan(postingPlan);
}
@Test
public void doubleHold() throws TException {
Instant now = Instant.now();

View File

@ -1,9 +1,9 @@
package com.rbkmoney.shumpune.validator;
import com.rbkmoney.damsel.base.InvalidRequest;
import com.rbkmoney.damsel.shumpune.Posting;
import com.rbkmoney.damsel.shumpune.PostingBatch;
import com.rbkmoney.damsel.shumpune.PostingPlan;
import com.rbkmoney.damsel.shumpune.base.InvalidRequest;
import org.junit.Test;
import java.util.ArrayList;

View File

@ -1,10 +1,10 @@
package com.rbkmoney.shumpune.validator;
import com.rbkmoney.damsel.base.InvalidRequest;
import com.rbkmoney.damsel.shumpune.Account;
import com.rbkmoney.damsel.shumpune.InvalidPostingParams;
import com.rbkmoney.damsel.shumpune.Posting;
import com.rbkmoney.damsel.shumpune.PostingBatch;
import com.rbkmoney.damsel.shumpune.base.InvalidRequest;
import com.rbkmoney.shumpune.dao.AccountDao;
import org.apache.thrift.TException;
import org.jetbrains.annotations.NotNull;

View File

@ -1,9 +1,9 @@
package com.rbkmoney.shumpune.validator;
import com.rbkmoney.damsel.base.InvalidRequest;
import com.rbkmoney.damsel.shumpune.Posting;
import com.rbkmoney.damsel.shumpune.PostingBatch;
import com.rbkmoney.damsel.shumpune.PostingPlan;
import com.rbkmoney.damsel.shumpune.base.InvalidRequest;
import com.rbkmoney.shumpune.domain.PostingModel;
import org.apache.thrift.TException;
import org.junit.Test;
@ -35,7 +35,7 @@ public class PostingsUpdateValidatorTest {
batchList.add(postingBatch);
batchList.add(postingBatchSecond);
receivedPostingPlan.setBatchList(batchList);
postingsUpdateValidator.validate(receivedPostingPlan, savedBatches);
postingsUpdateValidator.validate(receivedPostingPlan.getBatchList(), savedBatches);
}
@Test(expected = InvalidRequest.class)
@ -51,7 +51,7 @@ public class PostingsUpdateValidatorTest {
postingBatch.setId(idBatch);
batchList.add(postingBatch);
receivedPostingPlan.setBatchList(batchList);
postingsUpdateValidator.validate(receivedPostingPlan, savedBatches);
postingsUpdateValidator.validate(receivedPostingPlan.getBatchList(), savedBatches);
}
@Test(expected = InvalidRequest.class)
@ -81,6 +81,6 @@ public class PostingsUpdateValidatorTest {
postingBatch.setPostings(postings);
batchList.add(postingBatch);
receivedPostingPlan.setBatchList(batchList);
postingsUpdateValidator.validate(receivedPostingPlan, savedBatches);
postingsUpdateValidator.validate(receivedPostingPlan.getBatchList(), savedBatches);
}
}