TECHD-3: Fix TotalValue calculation (#24)

This commit is contained in:
Baikov Dmitrii 2024-10-25 15:58:20 +03:00 committed by GitHub
parent b201b272a9
commit ebe67e383c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 235 additions and 124 deletions

View File

@ -0,0 +1,39 @@
package com.empayre.liminator.converter;
import com.empayre.liminator.model.CurrentLimitValue;
import dev.vality.liminator.LimitResponse;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.convert.converter.Converter;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@Component
@RequiredArgsConstructor
public class CurrentLimitValuesToLimitResponsesConverter
implements Converter<List<CurrentLimitValue>, List<LimitResponse>> {
@Override
public List<LimitResponse> convert(List<CurrentLimitValue> values) {
if (CollectionUtils.isEmpty(values)) {
log.info("Received LimitValues array is empty");
return new ArrayList<>();
}
return values.stream()
.map(this::toLimitResponse)
.toList();
}
private LimitResponse toLimitResponse(CurrentLimitValue value) {
return new LimitResponse()
.setLimitId(value.getLimitId())
.setLimitName(value.getLimitName())
.setCommitValue(value.getCommitValue())
.setTotalValue(value.getHoldValue() + value.getCommitValue());
}
}

View File

@ -1,41 +0,0 @@
package com.empayre.liminator.converter;
import com.empayre.liminator.model.LimitValue;
import dev.vality.liminator.LimitResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.convert.converter.Converter;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.groupingBy;
@Slf4j
@Component
public class LimitValuesByLimitNameToLimitResponseConverter
implements Converter<Map.Entry<String, List<LimitValue>>, LimitResponse> {
@Override
public LimitResponse convert(Map.Entry<String, List<LimitValue>> source) {
LimitResponse limitResponse = new LimitResponse();
limitResponse.setLimitName(source.getKey());
limitResponse.setLimitId(source.getValue().get(0).getLimitId());
long commitValue = 0L;
long totalValue = 0L;
for (LimitValue limitValue : source.getValue()) {
switch (limitValue.getState()) {
case HOLD -> totalValue = totalValue + limitValue.getOperationValue();
case COMMIT -> {
commitValue = commitValue + limitValue.getOperationValue();
totalValue = totalValue - limitValue.getOperationValue();
}
case ROLLBACK -> totalValue = totalValue - limitValue.getOperationValue();
default -> throw new IllegalStateException("Unexpected value: " + limitValue.getState());
}
}
limitResponse.setCommitValue(commitValue);
limitResponse.setTotalValue(totalValue);
return limitResponse;
}
}

View File

@ -1,36 +0,0 @@
package com.empayre.liminator.converter;
import com.empayre.liminator.model.LimitValue;
import dev.vality.liminator.LimitResponse;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.convert.converter.Converter;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.groupingBy;
@Slf4j
@Component
@RequiredArgsConstructor
public class LimitValuesToLimitResponsesConverter implements Converter<List<LimitValue>, List<LimitResponse>> {
private final Converter<Map.Entry<String, List<LimitValue>>, LimitResponse> limitResponseConverter;
@Override
public List<LimitResponse> convert(List<LimitValue> values) {
if (CollectionUtils.isEmpty(values)) {
log.info("Received LimitValues array is empty");
return new ArrayList<>();
}
Map<String, List<LimitValue>> valuesPerLimitName = values.stream()
.collect(groupingBy(LimitValue::getLimitName));
return valuesPerLimitName.entrySet().stream()
.map(limitResponseConverter::convert)
.toList();
}
}

View File

@ -2,6 +2,7 @@ package com.empayre.liminator.dao;
import com.empayre.liminator.domain.enums.OperationState;
import com.empayre.liminator.domain.tables.pojos.OperationStateHistory;
import com.empayre.liminator.model.CurrentLimitValue;
import com.empayre.liminator.model.LimitValue;
import java.util.Collection;
@ -16,4 +17,8 @@ public interface OperationStateHistoryDao extends CommonDao<OperationStateHistor
List<LimitValue> getLimitHistory(List<String> limitNames, String operationId);
List<OperationStateHistory> get(String operationId, Collection<Long> limitIds, List<OperationState> states);
List<CurrentLimitValue> getCurrentValues(List<String> limitNames);
List<CurrentLimitValue> getCurrentValues(List<String> limitNames, String operationId);
}

View File

@ -3,11 +3,12 @@ package com.empayre.liminator.dao.impl;
import com.empayre.liminator.dao.OperationStateHistoryDao;
import com.empayre.liminator.domain.enums.OperationState;
import com.empayre.liminator.domain.tables.pojos.OperationStateHistory;
import com.empayre.liminator.model.CurrentLimitValue;
import com.empayre.liminator.model.LimitValue;
import lombok.RequiredArgsConstructor;
import org.jooq.DSLContext;
import org.jooq.*;
import org.jooq.Record;
import org.jooq.RecordMapper;
import org.jooq.impl.DSL;
import org.springframework.stereotype.Component;
import java.util.Collection;
@ -85,4 +86,71 @@ public class OperationStateHistoryDaoImpl implements OperationStateHistoryDao {
.and(OPERATION_STATE_HISTORY.STATE.in(states))
.fetchInto(OperationStateHistory.class);
}
@Override
public List<CurrentLimitValue> getCurrentValues(List<String> limitNames) {
return getCurrentValues(limitNames, null);
}
@Override
public List<CurrentLimitValue> getCurrentValues(List<String> limitNames, String operationId) {
var holdOps = OPERATION_STATE_HISTORY.as("hold_ops");
var commitOps = OPERATION_STATE_HISTORY.as("commit_ops");
var rollbackOps = OPERATION_STATE_HISTORY.as("rollback_ops");
var createdAtSelect = select(OPERATION_STATE_HISTORY.CREATED_AT)
.from(OPERATION_STATE_HISTORY)
.where(OPERATION_STATE_HISTORY.OPERATION_ID.eq(operationId))
.orderBy(OPERATION_STATE_HISTORY.CREATED_AT.desc())
.limit(1);
var query = dslContext
.select(
LIMIT_DATA.LIMIT_ID,
LIMIT_DATA.NAME,
DSL.sum(DSL.coalesce(holdOps.OPERATION_VALUE, 0)
.minus(DSL.coalesce(commitOps.OPERATION_VALUE, 0))
.minus(DSL.coalesce(rollbackOps.OPERATION_VALUE, 0)).cast(Long.class)),
DSL.sum(DSL.coalesce(commitOps.OPERATION_VALUE, 0).cast(Long.class))
)
.from(
LIMIT_DATA
.leftJoin(holdOps)
.on(
LIMIT_DATA.ID.eq(holdOps.LIMIT_DATA_ID)
.and(holdOps.STATE.eq(OperationState.HOLD))
.and(operationId == null
? DSL.trueCondition()
: holdOps.CREATED_AT.le(createdAtSelect))
)
.leftJoin(commitOps)
.on(
commitOps.OPERATION_ID.eq(holdOps.OPERATION_ID)
.and(commitOps.STATE.in(OperationState.COMMIT))
.and(operationId == null
? DSL.trueCondition()
: commitOps.CREATED_AT.le(createdAtSelect))
)
.leftJoin(rollbackOps)
.on(
rollbackOps.OPERATION_ID.eq(holdOps.OPERATION_ID)
.and(rollbackOps.STATE.in(OperationState.ROLLBACK))
.and(operationId == null
? DSL.trueCondition()
: rollbackOps.CREATED_AT.le(createdAtSelect))
)
)
.where(LIMIT_DATA.NAME.in(limitNames))
.groupBy(LIMIT_DATA.LIMIT_ID, LIMIT_DATA.NAME);
return query
.fetch()
.stream()
.map(record ->
new CurrentLimitValue(
record.value1(),
record.value2(),
record.value3().longValue(),
record.value4().longValue()
)
)
.toList();
}
}

View File

@ -1,7 +1,7 @@
package com.empayre.liminator.handler.impl;
import com.empayre.liminator.handler.Handler;
import com.empayre.liminator.model.LimitValue;
import com.empayre.liminator.model.CurrentLimitValue;
import com.empayre.liminator.service.LimitOperationsHistoryService;
import dev.vality.liminator.LimitResponse;
import lombok.RequiredArgsConstructor;
@ -20,13 +20,13 @@ import java.util.List;
public class GetLastLimitsValuesHandler implements Handler<List<String>, List<LimitResponse>> {
private final LimitOperationsHistoryService limitOperationsHistoryService;
private final Converter<List<LimitValue>, List<LimitResponse>> currentLimitValuesToLimitResponseConverter;
private final Converter<List<CurrentLimitValue>, List<LimitResponse>> currentLimitValuesToLimitResponsesConverter;
@Transactional
@Override
public List<LimitResponse> handle(List<String> limitIdNames) throws TException {
List<LimitValue> currentLimitValues = limitOperationsHistoryService.getCurrentLimitValue(limitIdNames);
List<CurrentLimitValue> currentLimitValues = limitOperationsHistoryService.getCurrentLimitValue(limitIdNames);
log.debug("Success get last limits: {}", Arrays.toString(currentLimitValues.toArray()));
return currentLimitValuesToLimitResponseConverter.convert(currentLimitValues);
return currentLimitValuesToLimitResponsesConverter.convert(currentLimitValues);
}
}

View File

@ -1,7 +1,7 @@
package com.empayre.liminator.handler.impl;
import com.empayre.liminator.handler.Handler;
import com.empayre.liminator.model.LimitValue;
import com.empayre.liminator.model.CurrentLimitValue;
import com.empayre.liminator.service.LimitOperationsHistoryService;
import dev.vality.liminator.LimitChange;
import dev.vality.liminator.LimitRequest;
@ -22,7 +22,7 @@ import java.util.List;
public class GetLimitsValuesHandler implements Handler<LimitRequest, List<LimitResponse>> {
private final LimitOperationsHistoryService limitOperationsHistoryService;
private final Converter<List<LimitValue>, List<LimitResponse>> currentLimitValuesToLimitResponseConverter;
private final Converter<List<CurrentLimitValue>, List<LimitResponse>> currentLimitValuesToLimitResponsesConverter;
@Transactional
@Override
@ -31,8 +31,9 @@ public class GetLimitsValuesHandler implements Handler<LimitRequest, List<LimitR
.map(LimitChange::getLimitName)
.toList();
String operationId = request.getOperationId();
List<LimitValue> limitValues = limitOperationsHistoryService.getCurrentLimitValue(limitNames, operationId);
List<CurrentLimitValue> limitValues =
limitOperationsHistoryService.getCurrentLimitValue(limitNames, operationId);
log.debug("Success get limits: {}", Arrays.toString(limitValues.toArray()));
return currentLimitValuesToLimitResponseConverter.convert(limitValues);
return currentLimitValuesToLimitResponsesConverter.convert(limitValues);
}
}

View File

@ -0,0 +1,14 @@
package com.empayre.liminator.model;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class CurrentLimitValue {
private String limitId;
private String limitName;
private Long holdValue;
private Long commitValue;
}

View File

@ -4,7 +4,7 @@ import com.empayre.liminator.converter.OperationStateHistoryConverter;
import com.empayre.liminator.dao.OperationStateHistoryDao;
import com.empayre.liminator.domain.enums.OperationState;
import com.empayre.liminator.domain.tables.pojos.OperationStateHistory;
import com.empayre.liminator.model.LimitValue;
import com.empayre.liminator.model.CurrentLimitValue;
import dev.vality.liminator.LimitRequest;
import dev.vality.liminator.OperationNotFound;
import lombok.RequiredArgsConstructor;
@ -33,12 +33,12 @@ public class LimitOperationsHistoryService {
return operationStateHistoryDao.saveBatch(operationsHistory);
}
public List<LimitValue> getCurrentLimitValue(List<String> limitNames) {
return operationStateHistoryDao.getLimitHistory(limitNames);
public List<CurrentLimitValue> getCurrentLimitValue(List<String> limitNames) {
return operationStateHistoryDao.getCurrentValues(limitNames);
}
public List<LimitValue> getCurrentLimitValue(List<String> limitNames, String operationId) {
return operationStateHistoryDao.getLimitHistory(limitNames, operationId);
public List<CurrentLimitValue> getCurrentLimitValue(List<String> limitNames, String operationId) {
return operationStateHistoryDao.getCurrentValues(limitNames, operationId);
}
public List<OperationStateHistory> get(String operationId,
@ -53,7 +53,8 @@ public class LimitOperationsHistoryService {
String logPrefix = state.getLiteral();
String operationId = request.getOperationId();
List<OperationState> operationFinalStates = List.of(COMMIT, ROLLBACK);
var existedFinalOperations = operationStateHistoryDao.get(operationId, limitIds, operationFinalStates);
var existedFinalOperations =
operationStateHistoryDao.get(operationId, limitIds, operationFinalStates);
if (limitIds.size() == existedFinalOperations.size()) {
log.error("[{}] Existed hold operations with ID {} not found (request: {})",
logPrefix, operationId, request);

View File

@ -5,6 +5,7 @@ import com.empayre.liminator.domain.enums.OperationState;
import com.empayre.liminator.domain.tables.pojos.LimitContext;
import com.empayre.liminator.domain.tables.pojos.LimitData;
import com.empayre.liminator.domain.tables.pojos.OperationStateHistory;
import com.empayre.liminator.model.CurrentLimitValue;
import com.empayre.liminator.model.LimitValue;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
@ -16,6 +17,7 @@ import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -133,6 +135,72 @@ class DaoTests {
assertEquals(currentLimitValue.size(), operationsWithHold.size());
}
// total = hold1 + hold2 + hold3 + commit4 + commit5
// -> received commit for hold2
// total = hold1 + hold3 + commit4 + commit5 + commit2
@Test
void getCurrentValuesTest() {
var limitsMap = new HashMap<String, Long>();
for (int i = 0; i < 1; i++) {
String limitName = "Limit-CV-" + i;
String limitId = "Limit-CV-ID-" + i;
Long id = limitDataDao.save(new LimitData(null, limitName, LocalDate.now(), LocalDateTime.now(), limitId));
limitsMap.put(limitName, id);
}
Map<String, List<OperationStateHistory>> holdOperationsMap = new HashMap<>();
String operationNameTemplate = "%s-operation-%s";
for (String limitName : limitsMap.keySet()) {
List<OperationStateHistory> holdOperations = new ArrayList<>();
for (int i = 0; i < 5; i++) {
OperationStateHistory operationHistory = createOperationHistory(
limitName,
limitsMap.get(limitName),
operationNameTemplate.formatted(limitName, i));
holdOperations.add(operationHistory);
operationStateHistoryDao.save(operationHistory);
}
holdOperationsMap.put(limitName, holdOperations);
}
OperationStateHistory firstCommitOp = null;
for (String limitName : holdOperationsMap.keySet()) {
List<OperationStateHistory> holdOperations = holdOperationsMap.get(limitName);
for (OperationStateHistory holdOperation : holdOperations.subList(0, 2)) {
var commitOperation = createOperationHistory(
limitName,
limitsMap.get(limitName),
holdOperation.getOperationId(),
LocalDateTime.now(),
OperationState.COMMIT);
if (firstCommitOp == null) {
firstCommitOp = commitOperation;
}
operationStateHistoryDao.save(commitOperation);
}
}
List<String> limitNames = holdOperationsMap.keySet().stream().toList();
List<CurrentLimitValue> currentValues = operationStateHistoryDao.getCurrentValues(limitNames);
CurrentLimitValue currentLimitValue = currentValues.get(0);
assertEquals(200, currentLimitValue.getCommitValue());
assertEquals(300, currentLimitValue.getHoldValue());
String firstLimitName = limitNames.get(0);
List<OperationStateHistory> holds = holdOperationsMap.get(firstLimitName);
OperationStateHistory holdOperation = holds.get(holds.size() - 2);
List<CurrentLimitValue> middleCurrentValues =
operationStateHistoryDao.getCurrentValues(List.of(firstLimitName), holdOperation.getOperationId());
CurrentLimitValue middleCurrentLimitValue = middleCurrentValues.get(0);
assertEquals(0, middleCurrentLimitValue.getCommitValue());
assertEquals(400, middleCurrentLimitValue.getHoldValue());
List<CurrentLimitValue> firstCommitCurrentValues =
operationStateHistoryDao.getCurrentValues(List.of(firstLimitName), firstCommitOp.getOperationId());
CurrentLimitValue firstCurrentLimitValue = firstCommitCurrentValues.get(0);
assertEquals(100, firstCurrentLimitValue.getCommitValue());
assertEquals(400, firstCurrentLimitValue.getHoldValue());
}
private OperationStateHistory createOperationHistory(String limitName, Long id, String operationId) {
return createOperationHistory(limitName, id, operationId, LocalDateTime.now(), OperationState.HOLD);
}

View File

@ -184,7 +184,7 @@ class LiminatorServiceTest {
List<LimitResponse> commitResponses = liminatorService.getLastLimitsValues(List.of(limitName));
assertEquals(1, commitResponses.size());
assertEquals(0, commitResponses.get(0).getTotalValue());
assertEquals(500, commitResponses.get(0).getTotalValue());
assertEquals(500, commitResponses.get(0).getCommitValue());
assertEquals(limitName, commitResponses.get(0).getLimitName());
assertEquals(limitId, commitResponses.get(0).getLimitId());
@ -218,52 +218,44 @@ class LiminatorServiceTest {
void complexOperationsTest() throws TException {
String limitName = "TestLimitComplex";
String operationId = "Op-112-%s";
LimitRequest firstHoldRequest = new LimitRequest()
.setOperationId(operationId.formatted(1))
.setLimitChanges(List.of(new LimitChange(limitName, 100L)));
LimitRequest firstHoldRequest = createRequest(limitName, operationId.formatted(1));
liminatorService.hold(firstHoldRequest);
LimitRequest secondHoldRequest = new LimitRequest()
.setOperationId(operationId.formatted(2))
.setLimitChanges(List.of(new LimitChange(limitName, 100L)));
liminatorService.hold(secondHoldRequest);
liminatorService.commit(secondHoldRequest);
LimitRequest thirdHoldRequest = new LimitRequest()
.setOperationId(operationId.formatted(3))
.setLimitChanges(List.of(new LimitChange(limitName, 100L)));
liminatorService.hold(thirdHoldRequest);
LimitRequest fourthHoldRequest = new LimitRequest()
.setOperationId(operationId.formatted(4))
.setLimitChanges(List.of(new LimitChange(limitName, 100L)));
List<LimitResponse> limitResponseAfterFourthHold = liminatorService.hold(fourthHoldRequest);
liminatorService.hold(createRequest(limitName, operationId.formatted(2)));
liminatorService.commit(createRequest(limitName, operationId.formatted(2)));
liminatorService.hold(createRequest(limitName, operationId.formatted(3)));
List<LimitResponse> limitResponseAfterFourthHold =
liminatorService.hold(createRequest(limitName, operationId.formatted(4)));
// total: 3 hold ops, 1 commit: total - 400, commit - 100
assertEquals(1, limitResponseAfterFourthHold.size());
assertEquals(300, limitResponseAfterFourthHold.get(0).getTotalValue());
assertEquals(400, limitResponseAfterFourthHold.get(0).getTotalValue());
assertEquals(100, limitResponseAfterFourthHold.get(0).getCommitValue());
assertEquals(limitName, limitResponseAfterFourthHold.get(0).getLimitName());
liminatorService.rollback(firstHoldRequest);
List<LimitResponse> limitResponses = liminatorService.getLastLimitsValues(List.of(limitName));
LimitRequest fifthHoldRequest = new LimitRequest()
.setOperationId(operationId.formatted(5))
.setLimitChanges(List.of(new LimitChange(limitName, 100L)));
liminatorService.hold(fifthHoldRequest);
List<LimitResponse> limitResponses = liminatorService.hold(fifthHoldRequest);
// total: 2 hold ops, 1 commit; total - 300, commit - 100
assertEquals(1, limitResponses.size());
assertEquals(300, limitResponses.get(0).getTotalValue());
assertEquals(100, limitResponses.get(0).getCommitValue());
assertEquals(limitName, limitResponses.get(0).getLimitName());
List<LimitResponse> limitResponseAfterAllForFourthHold = liminatorService.hold(fourthHoldRequest);
liminatorService.commit(createRequest(limitName, operationId.formatted(3)));
List<LimitResponse> limitResponseAfterAllForFourthHold =
liminatorService.getLastLimitsValues(List.of(limitName));
// total: 1 hold ops, 2 commit;
assertEquals(1, limitResponseAfterAllForFourthHold.size());
assertEquals(300, limitResponseAfterAllForFourthHold.get(0).getTotalValue());
assertEquals(100, limitResponseAfterAllForFourthHold.get(0).getCommitValue());
assertEquals(200, limitResponseAfterAllForFourthHold.get(0).getCommitValue());
assertEquals(limitName, limitResponseAfterAllForFourthHold.get(0).getLimitName());
}
private LimitRequest createRequest(String limitName, String operationId) {
return new LimitRequest()
.setOperationId(operationId)
.setLimitChanges(List.of(new LimitChange(limitName, 100L)));
}
}