Added creation time to migration

This commit is contained in:
NPospolita 2020-08-03 14:38:43 +03:00
parent dc27b055fc
commit 5a8f3bf065

View File

@ -1,6 +1,7 @@
package com.rbkmoney.shumway.replicator.service.replication; package com.rbkmoney.shumway.replicator.service.replication;
import com.rbkmoney.damsel.shumaich.*; import com.rbkmoney.damsel.shumaich.*;
import com.rbkmoney.geck.common.util.TypeUtil;
import com.rbkmoney.shumway.replicator.dao.ProgressDAO; import com.rbkmoney.shumway.replicator.dao.ProgressDAO;
import com.rbkmoney.shumway.replicator.dao.ShumwayDAO; import com.rbkmoney.shumway.replicator.dao.ShumwayDAO;
import com.rbkmoney.shumway.replicator.domain.PostingLog; import com.rbkmoney.shumway.replicator.domain.PostingLog;
@ -14,7 +15,7 @@ import java.time.Instant;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import static com.rbkmoney.shumway.replicator.domain.PostingOperation.HOLD; import static com.rbkmoney.shumway.replicator.domain.PostingOperation.*;
import static com.rbkmoney.shumway.replicator.service.replication.ReplicatorService.executeCommand; import static com.rbkmoney.shumway.replicator.service.replication.ReplicatorService.executeCommand;
@Slf4j @Slf4j
@ -42,6 +43,7 @@ public class PostingReplicatorThread implements Runnable {
Map<String, ReplicationPoint> lastPlanPoints = new HashMap<>(); Map<String, ReplicationPoint> lastPlanPoints = new HashMap<>();
TreeMap<Long, ReplicationPoint> points = new TreeMap<>(); TreeMap<Long, ReplicationPoint> points = new TreeMap<>();
Map<String, Clock> clocks = new HashMap<>(); Map<String, Clock> clocks = new HashMap<>();
Map<String, String> times = new HashMap<>();
private static class ReplicationPoint { private static class ReplicationPoint {
final PostingOperation operation; final PostingOperation operation;
@ -139,6 +141,7 @@ public class PostingReplicatorThread implements Runnable {
prevNoData = false; prevNoData = false;
lastPlanPoint.postings.add(convertToProto(postingLog)); lastPlanPoint.postings.add(convertToProto(postingLog));
times.put(getTimesKey(postingLog), TypeUtil.temporalToString(postingLog.getCreationTime()));
lastPlanPoint.lastBatchId = postingLog.getBatchId(); lastPlanPoint.lastBatchId = postingLog.getBatchId();
lastPostingReplicatedId.set(postingLog.getId()); lastPostingReplicatedId.set(postingLog.getId());
progressDAO.saveProgess(lastPostingReplicatedId.get()); progressDAO.saveProgess(lastPostingReplicatedId.get());
@ -216,23 +219,29 @@ public class PostingReplicatorThread implements Runnable {
void processHold(ReplicationPoint point) throws Exception { void processHold(ReplicationPoint point) throws Exception {
PostingPlanChange postingPlanChange = new PostingPlanChange(point.planId, new PostingBatch(point.lastBatchId, point.batches.get(0).getPostings())); PostingPlanChange postingPlanChange = new PostingPlanChange(point.planId, new PostingBatch(point.lastBatchId, point.batches.get(0).getPostings()));
postingPlanChange.setCreationTime(times.get(getTimesKey(point.planId, point.batches.get(0).getId(), HOLD.getKey())));
log.info("Hold: {}", postingPlanChange); log.info("Hold: {}", postingPlanChange);
final Clock clock = executeCommand(() -> client.hold(postingPlanChange, null), postingPlanChange, remoteBackoffTime); final Clock clock = executeCommand(() -> client.hold(postingPlanChange, null), postingPlanChange, remoteBackoffTime);
clocks.put(planAndBatchId(point), clock); clocks.put(planAndBatchId(point), clock);
times.remove(getTimesKey(point.planId, point.batches.get(0).getId(), HOLD.getKey()));
} }
void processCommit(ReplicationPoint point) throws Exception { void processCommit(ReplicationPoint point) throws Exception {
PostingPlan postingPlan = new PostingPlan(point.planId, point.batches); PostingPlan postingPlan = new PostingPlan(point.planId, point.batches);
postingPlan.setCreationTime(times.get(getTimesKey(point.planId, point.batches.get(0).getId(), COMMIT.getKey())));
log.info("Commit: {}", postingPlan); log.info("Commit: {}", postingPlan);
final Clock clock = clocks.remove(planAndBatchId(point)); final Clock clock = clocks.remove(planAndBatchId(point));
executeCommand(() -> client.commitPlan(postingPlan, clock), postingPlan, remoteBackoffTime); executeCommand(() -> client.commitPlan(postingPlan, clock), postingPlan, remoteBackoffTime);
times.remove(getTimesKey(point.planId, point.batches.get(0).getId(), COMMIT.getKey()));
} }
void processRollback(ReplicationPoint point) throws Exception { void processRollback(ReplicationPoint point) throws Exception {
PostingPlan postingPlan = new PostingPlan(point.planId, point.batches); PostingPlan postingPlan = new PostingPlan(point.planId, point.batches);
postingPlan.setCreationTime(times.get(getTimesKey(point.planId, point.batches.get(0).getId(), ROLLBACK.getKey())));
log.info("Rollback: {}", postingPlan); log.info("Rollback: {}", postingPlan);
final Clock clock = clocks.remove(planAndBatchId(point)); final Clock clock = clocks.remove(planAndBatchId(point));
executeCommand(() -> client.rollbackPlan(postingPlan, clock), postingPlan, remoteBackoffTime); executeCommand(() -> client.rollbackPlan(postingPlan, clock), postingPlan, remoteBackoffTime);
times.remove(getTimesKey(point.planId, point.batches.get(0).getId(), ROLLBACK.getKey()));
} }
boolean isSameBatch(PostingLog postingLog, Long lastBatchId) { boolean isSameBatch(PostingLog postingLog, Long lastBatchId) {
@ -250,4 +259,12 @@ public class PostingReplicatorThread implements Runnable {
private String planAndBatchId(ReplicationPoint point) { private String planAndBatchId(ReplicationPoint point) {
return String.format("%s_%d", point.planId, point.lastBatchId); return String.format("%s_%d", point.planId, point.lastBatchId);
} }
private String getTimesKey(PostingLog postingLog) {
return getTimesKey(postingLog.getPlanId(),postingLog.getBatchId(), postingLog.getOperation().getKey());
}
private String getTimesKey(String planId, Long batchId, String operation) {
return planId + "_" + batchId + "_" + operation;
}
} }