add total value in limit response + change limit logic (#22)
Some checks failed
Deploy Docker Image / build-and-deploy (push) Has been cancelled

* add total value in limit response + change limit logic

* after review

* after review (2)
This commit is contained in:
Gregory 2024-10-10 21:28:32 +03:00 committed by GitHub
parent b9c1ed6374
commit db4d2ae383
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 461 additions and 259 deletions

View File

@ -173,7 +173,7 @@
<dependency>
<groupId>dev.vality</groupId>
<artifactId>liminator-proto</artifactId>
<version>1.6-5ee237c</version>
<version>1.7-3870a39</version>
</dependency>
<dependency>
<groupId>dev.vality.woody</groupId>

View File

@ -23,9 +23,13 @@ public class CurrentLimitValuesToLimitResponseConverter implements Converter<Lis
.map(limitValue -> new LimitResponse(
limitValue.getLimitName(),
limitValue.getCommitValue(),
limitValue.getHoldValue())
getTotalValue(limitValue))
.setLimitId(limitValue.getLimitId())
)
.toList();
}
private static long getTotalValue(LimitValue limitValue) {
return limitValue.getHoldValue() + limitValue.getCommitValue() - limitValue.getRollbackValue();
}
}

View File

@ -5,8 +5,9 @@ import com.empayre.liminator.domain.tables.pojos.OperationStateHistory;
import dev.vality.liminator.LimitRequest;
import java.util.List;
import java.util.Map;
public interface OperationStateHistoryConverter {
List<OperationStateHistory> convert(LimitRequest request, OperationState state);
List<OperationStateHistory> convert(LimitRequest request, OperationState state, Map<String, Long> limitNamesMap);
}

View File

@ -6,20 +6,27 @@ import com.empayre.liminator.domain.tables.pojos.OperationStateHistory;
import dev.vality.liminator.LimitRequest;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
@Component
public class OperationStateHistoryConverterImpl implements OperationStateHistoryConverter {
@Override
public List<OperationStateHistory> convert(LimitRequest request, OperationState state) {
public List<OperationStateHistory> convert(LimitRequest request,
OperationState state,
Map<String, Long> limitNamesMap) {
var now = LocalDateTime.now();
return request.getLimitChanges().stream()
.map(change -> {
OperationStateHistory history = new OperationStateHistory();
history.setOperationId(request.getOperationId());
history.setLimitName(change.getLimitName());
history.setLimitDataId(limitNamesMap.get(change.getLimitName()));
history.setOperationValue(change.getValue());
history.setState(state);
history.setCreatedAt(now);
return history;
})
.toList();

View File

@ -2,7 +2,6 @@ package com.empayre.liminator.dao;
import com.empayre.liminator.domain.enums.OperationState;
import com.empayre.liminator.domain.tables.pojos.Operation;
import com.empayre.liminator.model.LimitValue;
import java.util.Collection;
import java.util.List;
@ -17,10 +16,6 @@ public interface OperationDao extends CommonDao<Operation> {
List<Operation> get(String operationId, Collection<Long> limitIds, List<OperationState> states);
List<LimitValue> getCurrentLimitValue(List<String> limitNames);
List<LimitValue> getCurrentLimitValue(List<String> limitNames, String operationId);
int commit(String operationId, List<Long> limitIds);
int rollback(String operationId, List<Long> limitIds);

View File

@ -1,10 +1,19 @@
package com.empayre.liminator.dao;
import com.empayre.liminator.domain.enums.OperationState;
import com.empayre.liminator.domain.tables.pojos.OperationStateHistory;
import com.empayre.liminator.model.LimitValue;
import java.util.Collection;
import java.util.List;
public interface OperationStateHistoryDao extends CommonDao<OperationStateHistory> {
void saveBatch(List<OperationStateHistory> historyList);
int[] saveBatch(List<OperationStateHistory> historyList);
List<LimitValue> getCurrentLimitValue(List<String> limitNames);
List<LimitValue> getCurrentLimitValue(List<String> limitNames, String operationId);
List<OperationStateHistory> get(String operationId, Collection<Long> limitIds, List<OperationState> states);
}

View File

@ -3,7 +3,6 @@ package com.empayre.liminator.dao.impl;
import com.empayre.liminator.dao.OperationDao;
import com.empayre.liminator.domain.enums.OperationState;
import com.empayre.liminator.domain.tables.pojos.Operation;
import com.empayre.liminator.model.LimitValue;
import lombok.RequiredArgsConstructor;
import org.jooq.DSLContext;
import org.jooq.Query;
@ -11,11 +10,8 @@ import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import static com.empayre.liminator.domain.Tables.OPERATION;
import static org.jooq.impl.DSL.raw;
import static org.jooq.impl.DSL.val;
@Component
@RequiredArgsConstructor
@ -23,8 +19,6 @@ public class OperationDaoImpl implements OperationDao {
private final DSLContext dslContext;
private static final String DELIMITER = " ,";
@Override
public Long save(Operation operation) {
return dslContext
@ -97,74 +91,4 @@ public class OperationDaoImpl implements OperationDao {
.and(OPERATION.STATE.eq(OperationState.HOLD))
.execute();
}
@Override
public List<LimitValue> getCurrentLimitValue(List<String> limitNames) {
String sql = """
with hold_data as (
select ld.id, ld.name, ld.limit_id, coalesce(sum(ops.operation_value), 0) as hold_value
from lim.limit_data as ld
left join lim.operation as ops
on ops.limit_id = ld.id and ops.state = 'HOLD'
where ld.name in ({0})
group by ld.id, ld.name
), commit_data as (
select ld.id, ld.name, ld.limit_id, coalesce(sum(ops.operation_value), 0) as commit_value
from lim.limit_data as ld
left join lim.operation as ops
on ops.limit_id = ld.id and ops.state = 'COMMIT'
where ld.name in ({0})
group by ld.id, ld.name
)
select cd.limit_id, cd.name as limit_name, cd.commit_value, hd.hold_value
from commit_data as cd
join hold_data as hd on cd.id = hd.id;
""";
return dslContext
.resultQuery(sql, raw(arrayToString(limitNames)))
.fetchInto(LimitValue.class);
}
@Override
public List<LimitValue> getCurrentLimitValue(List<String> limitNames, String operationId) {
String sql = """
with operation_timestamp as (
select created_at
from lim.operation
where operation_id = {0}
), hold_data as (
select ld.id, ld.name, ld.limit_id, coalesce(sum(ops.operation_value), 0) as hold_value
from lim.limit_data as ld
left join lim.operation as ops
on ops.limit_id = ld.id
and ops.created_at <= (select created_at from operation_timestamp limit 1)
and ops.state = 'HOLD'
where ld.name in ({1})
group by ld.id, ld.name
), commit_data as (
select ld.id, ld.name, ld.limit_id, coalesce(sum(ops.operation_value), 0) as commit_value
from lim.limit_data as ld
left join lim.operation as ops
on ops.limit_id = ld.id
and ops.created_at <= (select created_at from operation_timestamp limit 1)
and ops.state = 'COMMIT'
where ld.name in ({1})
group by ld.id, ld.name
)
select cd.limit_id, cd.name as limit_name, cd.commit_value, hd.hold_value
from commit_data as cd
join hold_data as hd on cd.id = hd.id;
""";
return dslContext
.resultQuery(sql, val(operationId), raw(arrayToString(limitNames)))
.fetchInto(LimitValue.class);
}
private static String arrayToString(List<String> strings) {
return strings.stream()
.map(limit -> "'%s'".formatted(limit))
.collect(Collectors.joining(DELIMITER));
}
}

View File

@ -1,19 +1,27 @@
package com.empayre.liminator.dao.impl;
import com.empayre.liminator.dao.OperationStateHistoryDao;
import com.empayre.liminator.domain.enums.OperationState;
import com.empayre.liminator.domain.tables.pojos.OperationStateHistory;
import com.empayre.liminator.model.LimitValue;
import lombok.RequiredArgsConstructor;
import org.jooq.DSLContext;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import static com.empayre.liminator.domain.Tables.OPERATION_STATE_HISTORY;
import static org.jooq.impl.DSL.raw;
import static org.jooq.impl.DSL.val;
@Component
@RequiredArgsConstructor
public class OperationStateHistoryDaoImpl implements OperationStateHistoryDao {
private static final String DELIMITER = " ,";
private final DSLContext dslContext;
@Override
@ -27,12 +35,115 @@ public class OperationStateHistoryDaoImpl implements OperationStateHistoryDao {
}
@Override
public void saveBatch(List<OperationStateHistory> historyList) {
public int[] saveBatch(List<OperationStateHistory> historyList) {
var records = historyList.stream()
.map(history -> dslContext.newRecord(OPERATION_STATE_HISTORY, history))
.toList();
dslContext
return dslContext
.batchInsert(records)
.execute();
}
@Override
public List<LimitValue> getCurrentLimitValue(List<String> limitNames) {
String sql = """
with hold_data as (
select ld.id, ld.name, ld.limit_id, coalesce(sum(ops.operation_value), 0) as hold_value
from lim.limit_data as ld
left join lim.operation_state_history as ops
on ops.limit_name = ld.name
and ops.state = 'HOLD'
where ld.name in ({0})
group by ld.id, ld.name
), commit_data as (
select ld.id, ld.name, ld.limit_id, coalesce(sum(ops.operation_value), 0) as commit_value
from lim.limit_data as ld
left join lim.operation_state_history as ops
on ops.limit_name = ld.name
and ops.state = 'COMMIT'
where ld.name in ({0})
group by ld.id, ld.name
), rollback_data as (
select ld.id, ld.name, ld.limit_id, coalesce(sum(ops.operation_value), 0) as rollback_value
from lim.limit_data as ld
left join lim.operation_state_history as ops
on ops.limit_name = ld.name
and ops.state = 'ROLLBACK'
where ld.name in ({0})
group by ld.id, ld.name
)
select cd.limit_id, cd.name as limit_name, cd.commit_value, hd.hold_value, rd.rollback_value
from commit_data as cd
join hold_data as hd on cd.id = hd.id
join rollback_data as rd on cd.id = rd.id;
""";
return dslContext
.resultQuery(sql, raw(arrayToString(limitNames)))
.fetchInto(LimitValue.class);
}
@Override
public List<LimitValue> getCurrentLimitValue(List<String> limitNames, String operationId) {
String sql = """
with operation_timestamp as (
select created_at
from lim.operation_state_history
where operation_id = {0}
), hold_data as (
select ld.id, ld.name, ld.limit_id, coalesce(sum(ops.operation_value), 0) as hold_value
from lim.limit_data as ld
left join lim.operation_state_history as ops
on ops.limit_name = ld.name
and ops.created_at <= (select created_at from operation_timestamp limit 1)
and ops.state = 'HOLD'
where ld.name in ({1})
group by ld.id, ld.name
), commit_data as (
select ld.id, ld.name, ld.limit_id, coalesce(sum(ops.operation_value), 0) as commit_value
from lim.limit_data as ld
left join lim.operation_state_history as ops
on ops.limit_name = ld.name
and ops.created_at <= (select created_at from operation_timestamp limit 1)
and ops.state = 'COMMIT'
where ld.name in ({1})
group by ld.id, ld.name
), rollback_data as (
select ld.id, ld.name, ld.limit_id, coalesce(sum(ops.operation_value), 0) as rollback_value
from lim.limit_data as ld
left join lim.operation_state_history as ops
on ops.limit_name = ld.name
and ops.created_at <= (select created_at from operation_timestamp limit 1)
and ops.state = 'ROLLBACK'
where ld.name in ({1})
group by ld.id, ld.name
)
select cd.limit_id, cd.name as limit_name, cd.commit_value, hd.hold_value, rd.rollback_value
from commit_data as cd
join hold_data as hd on cd.id = hd.id
join rollback_data as rd on cd.id = rd.id;
""";
return dslContext
.resultQuery(sql, val(operationId), raw(arrayToString(limitNames)))
.fetchInto(LimitValue.class);
}
@Override
public List<OperationStateHistory> get(String operationId,
Collection<Long> limitIds,
List<OperationState> states) {
return dslContext
.selectFrom(OPERATION_STATE_HISTORY)
.where(OPERATION_STATE_HISTORY.OPERATION_ID.eq(operationId))
.and(OPERATION_STATE_HISTORY.LIMIT_DATA_ID.in(limitIds))
.and(OPERATION_STATE_HISTORY.STATE.in(states))
.fetchInto(OperationStateHistory.class);
}
private static String arrayToString(List<String> strings) {
return strings.stream()
.map(limit -> "'%s'".formatted(limit))
.collect(Collectors.joining(DELIMITER));
}
}

View File

@ -1,12 +1,10 @@
package com.empayre.liminator.handler.impl;
import com.empayre.liminator.dao.OperationDao;
import com.empayre.liminator.domain.enums.OperationState;
import com.empayre.liminator.domain.tables.pojos.LimitData;
import com.empayre.liminator.domain.tables.pojos.Operation;
import com.empayre.liminator.handler.FinalizeOperationHandler;
import com.empayre.liminator.service.LimitDataService;
import com.empayre.liminator.service.LimitOperationsLoggingService;
import com.empayre.liminator.service.LimitOperationsHistoryService;
import dev.vality.liminator.LimitRequest;
import dev.vality.liminator.OperationNotFound;
import lombok.RequiredArgsConstructor;
@ -14,57 +12,41 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.thrift.TException;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.empayre.liminator.domain.enums.OperationState.COMMIT;
import static com.empayre.liminator.domain.enums.OperationState.ROLLBACK;
@Slf4j
@Component
@RequiredArgsConstructor
public class FinalizeOperationHandlerImpl implements FinalizeOperationHandler {
private final OperationDao operationDao;
private final LimitDataService limitDataService;
private final LimitOperationsLoggingService limitOperationsLoggingService;
private final LimitOperationsHistoryService limitOperationsHistoryService;
@Transactional
@Override
public void handle(LimitRequest request, OperationState state) throws TException {
List<LimitData> limitData = limitDataService.get(request, state.getLiteral());
checkExistedHoldOperations(request, limitData, state);
List<Long> limitIds = limitData.stream()
.map(LimitData::getId)
.toList();
int updatedRowsCount = switch (state) {
case COMMIT -> operationDao.commit(request.getOperationId(), limitIds);
case ROLLBACK -> operationDao.rollback(request.getOperationId(), limitIds);
default -> throw new TException();
};
checkUpdatedOperationsConsistency(request, state, updatedRowsCount);
limitOperationsLoggingService.writeOperations(request, state);
Map<String, Long> limitNamesMap = getLimitDataMap(request, state.getLiteral());
limitOperationsHistoryService.checkExistedHoldOperations(request, limitNamesMap.values(), state);
if (!List.of(COMMIT, ROLLBACK).contains(state)) {
throw new TException();
}
int[] counts = limitOperationsHistoryService.writeOperations(request, state, limitNamesMap);
checkUpdatedOperationsConsistency(request, state, counts.length);
}
private void checkExistedHoldOperations(LimitRequest request,
List<LimitData> limitData,
OperationState state) throws TException {
String logPrefix = state.getLiteral();
String operationId = request.getOperationId();
List<Long> limitIds = limitData.stream()
.map(LimitData::getId)
.toList();
List<Operation> existedHoldOperations = operationDao.get(operationId, limitIds, List.of(OperationState.HOLD));
if (CollectionUtils.isEmpty(existedHoldOperations)) {
log.error("[{}] Existed hold operations with ID {} not found: {} (request: {})",
logPrefix, operationId, existedHoldOperations, request);
throw new OperationNotFound();
}
if (limitIds.size() != existedHoldOperations.size()) {
log.error("[{}] Count of existed hold operations for limits is not equal to expected (existed size: {}, " +
"expected size: {}, request: {})", logPrefix, existedHoldOperations.size(),
limitIds.size(), request);
throw new OperationNotFound();
private HashMap<String, Long> getLimitDataMap(LimitRequest request, String source) throws TException {
var limitNamesMap = new HashMap<String, Long>();
List<LimitData> limitDataList = limitDataService.get(request, source);
for (LimitData limitData : limitDataList) {
limitNamesMap.put(limitData.getName(), limitData.getId());
}
return limitNamesMap;
}
private void checkUpdatedOperationsConsistency(LimitRequest request,

View File

@ -1,8 +1,8 @@
package com.empayre.liminator.handler.impl;
import com.empayre.liminator.dao.OperationDao;
import com.empayre.liminator.handler.Handler;
import com.empayre.liminator.model.LimitValue;
import com.empayre.liminator.service.LimitOperationsHistoryService;
import dev.vality.liminator.LimitResponse;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -19,13 +19,13 @@ import java.util.List;
@RequiredArgsConstructor
public class GetLastLimitsValuesHandler implements Handler<List<String>, List<LimitResponse>> {
private final OperationDao operationDao;
private final LimitOperationsHistoryService limitOperationsHistoryService;
private final Converter<List<LimitValue>, List<LimitResponse>> currentLimitValuesToLimitResponseConverter;
@Transactional
@Override
public List<LimitResponse> handle(List<String> limitIdNames) throws TException {
List<LimitValue> currentLimitValues = operationDao.getCurrentLimitValue(limitIdNames);
List<LimitValue> currentLimitValues = limitOperationsHistoryService.getCurrentLimitValue(limitIdNames);
log.debug("Success get last limits: {}", Arrays.toString(currentLimitValues.toArray()));
return currentLimitValuesToLimitResponseConverter.convert(currentLimitValues);
}

View File

@ -1,8 +1,8 @@
package com.empayre.liminator.handler.impl;
import com.empayre.liminator.dao.OperationDao;
import com.empayre.liminator.handler.Handler;
import com.empayre.liminator.model.LimitValue;
import com.empayre.liminator.service.LimitOperationsHistoryService;
import dev.vality.liminator.LimitChange;
import dev.vality.liminator.LimitRequest;
import dev.vality.liminator.LimitResponse;
@ -21,7 +21,7 @@ import java.util.List;
@RequiredArgsConstructor
public class GetLimitsValuesHandler implements Handler<LimitRequest, List<LimitResponse>> {
private final OperationDao operationDao;
private final LimitOperationsHistoryService limitOperationsHistoryService;
private final Converter<List<LimitValue>, List<LimitResponse>> currentLimitValuesToLimitResponseConverter;
@Transactional
@ -30,7 +30,8 @@ public class GetLimitsValuesHandler implements Handler<LimitRequest, List<LimitR
List<String> limitNames = request.getLimitChanges().stream()
.map(LimitChange::getLimitName)
.toList();
List<LimitValue> limitValues = operationDao.getCurrentLimitValue(limitNames, request.getOperationId());
String operationId = request.getOperationId();
List<LimitValue> limitValues = limitOperationsHistoryService.getCurrentLimitValue(limitNames, operationId);
log.debug("Success get limits: {}", Arrays.toString(limitValues.toArray()));
return currentLimitValuesToLimitResponseConverter.convert(limitValues);
}

View File

@ -1,13 +1,10 @@
package com.empayre.liminator.handler.impl;
import com.empayre.liminator.converter.OperationConverter;
import com.empayre.liminator.dao.OperationDao;
import com.empayre.liminator.domain.enums.OperationState;
import com.empayre.liminator.domain.tables.pojos.LimitData;
import com.empayre.liminator.domain.tables.pojos.Operation;
import com.empayre.liminator.handler.HoldOperationHandler;
import com.empayre.liminator.service.LimitDataService;
import com.empayre.liminator.service.LimitOperationsLoggingService;
import com.empayre.liminator.service.LimitOperationsHistoryService;
import dev.vality.liminator.DuplicateOperation;
import dev.vality.liminator.LimitChange;
import dev.vality.liminator.LimitRequest;
@ -20,11 +17,7 @@ import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
@Slf4j
@Component
@ -32,14 +25,9 @@ import java.util.Map;
@RequiredArgsConstructor
public class HoldOperationHandlerImpl implements HoldOperationHandler {
private final OperationDao operationDao;
private final OperationConverter operationConverter;
private final LimitOperationsLoggingService limitOperationsLoggingService;
private final LimitOperationsHistoryService limitOperationsHistoryService;
private final LimitDataService limitDataService;
@Value("${service.skipExistedHoldOps}")
private boolean skipExistedHoldOps;
private static final String LOG_PREFIX = "HOLD";
@Transactional
@ -58,40 +46,14 @@ public class HoldOperationHandlerImpl implements HoldOperationHandler {
}
String operationId = request.getOperationId();
checkExistedFinalizeOperations(limitNamesMap, operationId);
if (!skipExistedHoldOps) {
log.debug("Skip check existed hold operation for operationId: {}", operationId);
checkExistedHoldOperations(limitNamesMap, operationId);
}
log.info("Save operation: {} with limits: {}", operationId, Arrays.toString(limitNamesMap.keySet().toArray()));
operationDao.saveBatch(convertToOperation(request, limitNamesMap));
limitOperationsLoggingService.writeOperations(request, OperationState.HOLD);
}
private List<Operation> convertToOperation(LimitRequest request, Map<String, Long> limitNamesMap) {
var createdAt = LocalDateTime.now();
return request.getLimitChanges().stream()
.map(change -> operationConverter.convert(
request,
limitNamesMap.get(change.getLimitName()),
change.getValue(),
createdAt
)
)
.toList();
}
private void checkExistedHoldOperations(Map<String, Long> limitNamesMap, String operationId) throws TException {
List<Operation> existedHoldOperations =
operationDao.get(operationId, limitNamesMap.values(), List.of(OperationState.HOLD));
if (!CollectionUtils.isEmpty(existedHoldOperations)) {
log.error("[{}] DB already has hold operation {}: {}", LOG_PREFIX, operationId, existedHoldOperations);
throw new DuplicateOperation();
}
int[] counts = limitOperationsHistoryService.writeOperations(request, OperationState.HOLD, limitNamesMap);
log.info("Success saved operation: {} with {} limits", operationId, counts.length);
}
private void checkExistedFinalizeOperations(Map<String, Long> limitNamesMap,
String operationId) throws TException {
List<Operation> existedFinalizeOperations = operationDao.get(
var existedFinalizeOperations = limitOperationsHistoryService.get(
operationId,
limitNamesMap.values(),
List.of(OperationState.COMMIT, OperationState.ROLLBACK)

View File

@ -11,4 +11,5 @@ public class LimitValue {
private String limitName;
private Long commitValue;
private Long holdValue;
private Long rollbackValue;
}

View File

@ -44,8 +44,10 @@ public class LiminatorService implements LiminatorServiceSrv.Iface {
try {
log.info("Start commit operation with request: {}", request);
finalizeOperationHandler.handle(request, OperationState.COMMIT);
log.info("Finish commit operation with request: {}", request);
} catch (Exception ex) {
log.error("Commit execution exception. Request: {}", request, ex);
throw ex;
}
}
@ -54,8 +56,10 @@ public class LiminatorService implements LiminatorServiceSrv.Iface {
try {
log.info("Start rollback operation with request: {}", request);
finalizeOperationHandler.handle(request, OperationState.ROLLBACK);
log.info("Finish rollback operation with request: {}", request);
} catch (Exception ex) {
log.error("Rollback execution exception. Request: {}", request, ex);
throw ex;
}
}

View File

@ -0,0 +1,71 @@
package com.empayre.liminator.service;
import com.empayre.liminator.converter.OperationStateHistoryConverter;
import com.empayre.liminator.dao.OperationStateHistoryDao;
import com.empayre.liminator.domain.enums.OperationState;
import com.empayre.liminator.domain.tables.pojos.OperationStateHistory;
import com.empayre.liminator.model.LimitValue;
import dev.vality.liminator.LimitRequest;
import dev.vality.liminator.OperationNotFound;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.thrift.TException;
import org.springframework.stereotype.Service;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import static com.empayre.liminator.domain.enums.OperationState.*;
@Slf4j
@Service
@RequiredArgsConstructor
public class LimitOperationsHistoryService {
private final OperationStateHistoryDao operationStateHistoryDao;
private final OperationStateHistoryConverter operationStateHistoryConverter;
public int[] writeOperations(LimitRequest request,
OperationState state,
Map<String, Long> limitNamesMap) {
var operationsHistory = operationStateHistoryConverter.convert(request, state, limitNamesMap);
return operationStateHistoryDao.saveBatch(operationsHistory);
}
public List<LimitValue> getCurrentLimitValue(List<String> limitNames) {
return operationStateHistoryDao.getCurrentLimitValue(limitNames);
}
public List<LimitValue> getCurrentLimitValue(List<String> limitNames, String operationId) {
return operationStateHistoryDao.getCurrentLimitValue(limitNames, operationId);
}
public List<OperationStateHistory> get(String operationId,
Collection<Long> limitIds,
List<OperationState> states) {
return operationStateHistoryDao.get(operationId, limitIds, states);
}
public void checkExistedHoldOperations(LimitRequest request,
Collection<Long> limitIds,
OperationState state) throws TException {
String logPrefix = state.getLiteral();
String operationId = request.getOperationId();
List<OperationState> operationFinalStates = List.of(COMMIT, ROLLBACK);
var existedFinalOperations = operationStateHistoryDao.get(operationId, limitIds, operationFinalStates);
if (limitIds.size() == existedFinalOperations.size()) {
log.error("[{}] Existed hold operations with ID {} not found (request: {})",
logPrefix, operationId, request);
throw new OperationNotFound();
}
var existedHoldOperations = operationStateHistoryDao.get(operationId, limitIds, List.of(HOLD));
var existedActiveHoldOperationCount = existedHoldOperations.size() - existedFinalOperations.size();
if (limitIds.size() != existedActiveHoldOperationCount) {
log.error("[{}] Count of existed hold operations for limits is not equal to expected (existed size: {}, " +
"expected size: {}, request: {})", logPrefix, existedActiveHoldOperationCount,
limitIds.size(), request);
throw new OperationNotFound();
}
}
}

View File

@ -1,26 +0,0 @@
package com.empayre.liminator.service;
import com.empayre.liminator.converter.OperationStateHistoryConverter;
import com.empayre.liminator.dao.OperationStateHistoryDao;
import com.empayre.liminator.domain.enums.OperationState;
import dev.vality.liminator.LimitRequest;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class LimitOperationsLoggingService {
private final OperationStateHistoryDao operationStateHistoryDao;
private final OperationStateHistoryConverter operationStateHistoryConverter;
@Value("${service.logging.enabled}")
private boolean loggingEnabled;
public void writeOperations(LimitRequest request, OperationState state) {
if (loggingEnabled) {
operationStateHistoryDao.saveBatch(operationStateHistoryConverter.convert(request, state));
}
}
}

View File

@ -20,13 +20,6 @@ spring:
server:
port: "${server.port}"
service:
default:
limit: 20
logging:
enabled: true
skipExistedHoldOps: true
management:
server:
port: "${management.port}"

View File

@ -0,0 +1,3 @@
CREATE INDEX operation_history_idx ON lim.operation_state_history USING btree (limit_name, state, created_at, operation_id);
ALTER TABLE lim.operation_state_history ADD COLUMN limit_data_id bigint;

View File

@ -4,7 +4,7 @@ import com.empayre.liminator.config.PostgresqlSpringBootITest;
import com.empayre.liminator.domain.enums.OperationState;
import com.empayre.liminator.domain.tables.pojos.LimitContext;
import com.empayre.liminator.domain.tables.pojos.LimitData;
import com.empayre.liminator.domain.tables.pojos.Operation;
import com.empayre.liminator.domain.tables.pojos.OperationStateHistory;
import com.empayre.liminator.model.LimitValue;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
@ -28,7 +28,7 @@ class DaoTests {
private LimitContextDao limitContextDao;
@Autowired
private OperationDao operationDao;
private OperationStateHistoryDao operationStateHistoryDao;
@Test
void limitDataDaoTest() {
@ -56,88 +56,121 @@ class DaoTests {
}
@Test
void operationDaoTest() {
List<Long> limitIdsList = new ArrayList<>();
void operationDaoHistoryTest() {
List<String> limitNamesList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
String limitName = "Limit-odc-1-" + i;
String limitId = "Limit-id-odc-1-" + i;
Long id = limitDataDao.save(new LimitData(null, limitName, LocalDate.now(), LocalDateTime.now(), limitId));
limitIdsList.add(id);
limitDataDao.save(new LimitData(null, limitName, LocalDate.now(), LocalDateTime.now(), limitId));
limitNamesList.add(limitName);
}
List<Operation> operations = new ArrayList<>();
List<OperationStateHistory> operations = new ArrayList<>();
String operationNameTemplate = "Operation-odc-1-%s";
for (Long limitId : limitIdsList) {
for (String limitName : limitNamesList) {
for (int i = 0; i < 5; i++) {
operations.add(createOperation(limitId, operationNameTemplate.formatted(i)));
operations.add(createOperationHistory(limitName, operationNameTemplate.formatted(i)));
}
}
operationDao.saveBatch(operations);
operationStateHistoryDao.saveBatch(operations);
operations.clear();
List<LimitValue> currentLimitValue = operationDao.getCurrentLimitValue(limitNamesList);
assertEquals(10, currentLimitValue.size());
List<LimitValue> currentLimitValue = operationStateHistoryDao.getCurrentLimitValue(limitNamesList);
assertEquals(limitNamesList.size(), currentLimitValue.size());
currentLimitValue.forEach(value -> assertEquals(0, value.getCommitValue()));
currentLimitValue.forEach(value -> assertEquals(0, value.getRollbackValue()));
currentLimitValue.forEach(value -> assertNotEquals(0, value.getHoldValue()));
currentLimitValue.forEach(value -> assertNotEquals(0, getTotal(value)));
List<Long> commitLimitIds = limitIdsList.subList(0, 3);
List<String> commitLimitNames = limitNamesList.subList(0, 3);
String finalizeOperationName = operationNameTemplate.formatted(1);
operationDao.commit(finalizeOperationName, commitLimitIds);
for (String limitName : commitLimitNames) {
var operationHistory = createOperationHistory(
limitName,
finalizeOperationName,
LocalDateTime.now(),
OperationState.COMMIT);
operations.add(operationHistory);
}
operationStateHistoryDao.saveBatch(operations);
operations.clear();
List<Long> rollbackLimitIds = limitIdsList.subList(4, 9);
operationDao.rollback(finalizeOperationName, rollbackLimitIds);
List<String> rollbackLimitNames = limitNamesList.subList(4, 9);
for (String limitName : rollbackLimitNames) {
var operationHistory = createOperationHistory(
limitName,
finalizeOperationName,
LocalDateTime.now(),
OperationState.ROLLBACK);
operations.add(operationHistory);
}
operationStateHistoryDao.saveBatch(operations);
operations.clear();
List<LimitValue> limitValuesAfterChanges = operationDao.getCurrentLimitValue(limitNamesList);
List<LimitValue> limitValuesAfterChanges = operationStateHistoryDao.getCurrentLimitValue(limitNamesList);
List<LimitValue> limitValuesWithCommitData = limitValuesAfterChanges.stream()
.filter(value -> value.getCommitValue() == 100 && value.getHoldValue() == 400)
.filter(value -> value.getCommitValue() == 100
&& value.getHoldValue() == 500
&& value.getRollbackValue() == 0)
.toList();
assertEquals(3, limitValuesWithCommitData.size());
List<LimitValue> limitValuesAfterRollback = limitValuesAfterChanges.stream()
.filter(value -> value.getHoldValue() == 400 && value.getCommitValue() == 0)
.filter(value -> value.getHoldValue() == 500
&& value.getCommitValue() == 0
&& value.getRollbackValue() == 100)
.toList();
assertEquals(5, limitValuesAfterRollback.size());
List<LimitValue> limitValuesWithoutChanges = limitValuesAfterChanges.stream()
.filter(value -> value.getCommitValue() == 0 && value.getHoldValue() == 500)
.filter(value -> value.getHoldValue() == 500
&& value.getCommitValue() == 0
&& value.getRollbackValue() == 0)
.toList();
assertEquals(2, limitValuesWithoutChanges.size());
}
private long getTotal(LimitValue value) {
return value.getHoldValue() - value.getCommitValue() - value.getRollbackValue();
}
@Test
void operationDaoCurrentLimitWithOperationIdTest() {
String limitName = "Limit-odc-2";
String limitId = "Limit-id-odc-2";
Long id = limitDataDao.save(new LimitData(null, limitName, LocalDate.now(), LocalDateTime.now(), limitId));
List<Operation> operations = new ArrayList<>();
List<OperationStateHistory> operations = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Operation operation = createOperation(
id,
var operation = createOperationHistory(
limitName,
"Operation-odc-2-%s-%s".formatted(id, i),
LocalDateTime.now().minusMinutes(11L - i));
operationDao.save(operation);
LocalDateTime.now().minusMinutes(11L - i),
OperationState.HOLD);
operationStateHistoryDao.save(operation);
operations.add(operation);
}
List<LimitValue> valuesForFifthOperation =
operationDao.getCurrentLimitValue(List.of(limitName), operations.get(2).getOperationId());
operationStateHistoryDao.getCurrentLimitValue(List.of(limitName), operations.get(2).getOperationId());
LimitValue limitValue = valuesForFifthOperation.get(0);
assertEquals(300, limitValue.getHoldValue());
assertEquals(300, getTotal(limitValue));
valuesForFifthOperation =
operationDao.getCurrentLimitValue(List.of(limitName), operations.get(5).getOperationId());
assertEquals(600, valuesForFifthOperation.get(0).getHoldValue());
operationStateHistoryDao.getCurrentLimitValue(List.of(limitName), operations.get(5).getOperationId());
assertEquals(600, getTotal(valuesForFifthOperation.get(0)));
}
private Operation createOperation(Long limitId, String operationId) {
return createOperation(limitId, operationId, LocalDateTime.now());
private OperationStateHistory createOperationHistory(String limitName, String operationId) {
return createOperationHistory(limitName, operationId, LocalDateTime.now(), OperationState.HOLD);
}
private Operation createOperation(Long limitId, String operationId, LocalDateTime createdAt) {
Operation operation = new Operation();
operation.setLimitId(limitId);
private OperationStateHistory createOperationHistory(String limitName,
String operationId,
LocalDateTime createdAt,
OperationState state) {
OperationStateHistory operation = new OperationStateHistory();
operation.setLimitName(limitName);
operation.setOperationId(operationId);
operation.setState(OperationState.HOLD);
operation.setState(state);
operation.setOperationValue(100L);
operation.setCreatedAt(createdAt);
return operation;

View File

@ -1,9 +1,7 @@
package com.empayre.liminator.service;
import com.empayre.liminator.config.PostgresqlSpringBootITest;
import dev.vality.liminator.LimitChange;
import dev.vality.liminator.LimitRequest;
import dev.vality.liminator.LimitResponse;
import dev.vality.liminator.*;
import org.apache.thrift.TException;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
@ -12,6 +10,7 @@ import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@PostgresqlSpringBootITest
class LiminatorServiceTest {
@ -38,10 +37,83 @@ class LiminatorServiceTest {
List<LimitResponse> response = liminatorService.hold(request);
assertEquals(limitName, response.get(0).getLimitName());
assertEquals(limitId, response.get(0).getLimitId());
assertEquals(holdValue, response.get(0).getHoldValue());
assertEquals(holdValue, response.get(0).getTotalValue());
assertEquals(0, response.get(0).getCommitValue());
}
@Test
void operationAlreadyFinaleStateTest() throws TException {
String limitName = "TestLimitCommit";
String operationId = "OpComit";
LimitRequest holdRequest = new LimitRequest()
.setOperationId(operationId)
.setLimitChanges(List.of(new LimitChange(limitName, 500L)));
liminatorService.hold(holdRequest);
liminatorService.commit(holdRequest);
assertThrows(OperationAlreadyInFinalState.class, () -> liminatorService.hold(holdRequest));
}
@Test
void limitNotFoundTest() {
String limitName = "TestLimitCommit";
String operationId = "OpComit";
LimitRequest holdRequest = new LimitRequest()
.setOperationId(operationId)
.setLimitChanges(List.of(new LimitChange(limitName, 500L)));
assertThrows(LimitNotFound.class, () -> liminatorService.rollback(holdRequest));
}
@Test
void operationNotFoundWithNotExistHoldTest() throws TException {
String limitName = "TestLimitCommit";
String operationId = "OpComit";
LimitRequest holdRequest = new LimitRequest()
.setOperationId(operationId)
.setLimitChanges(List.of(new LimitChange(limitName, 500L)));
liminatorService.hold(holdRequest);
liminatorService.commit(holdRequest);
assertThrows(OperationNotFound.class, () -> liminatorService.rollback(holdRequest));
}
@Test
void operationNotFoundWithNotExpectedHoldCountTest() throws TException {
String firstLimitName = "TestLimit1";
String secondLimitName = "TestLimit2";
String operationId = "OpComit";
LimitRequest holdRequest = new LimitRequest()
.setOperationId(operationId)
.setLimitChanges(List.of(
new LimitChange(firstLimitName, 500L),
new LimitChange(secondLimitName, 500L)
));
liminatorService.hold(holdRequest);
LimitRequest commitRequest = new LimitRequest()
.setOperationId(operationId)
.setLimitChanges(List.of(
new LimitChange(firstLimitName, 500L)
));
liminatorService.commit(commitRequest);
LimitRequest rollbackRequest = new LimitRequest()
.setOperationId(operationId)
.setLimitChanges(List.of(
new LimitChange(firstLimitName, 500L),
new LimitChange(secondLimitName, 500L)
));
assertThrows(OperationNotFound.class, () -> liminatorService.rollback(rollbackRequest));
}
@Test
void holdValueTest() throws TException {
String limitName = "TestLimitHold";
@ -54,7 +126,7 @@ class LiminatorServiceTest {
assertEquals(1, holdResponse.size());
LimitResponse response = holdResponse.get(0);
assertEquals(500, response.getHoldValue());
assertEquals(500, response.getTotalValue());
assertEquals(0, response.getCommitValue());
assertEquals(limitName, response.getLimitName());
}
@ -78,14 +150,14 @@ class LiminatorServiceTest {
.filter(limitResponse -> limitResponse.getLimitName().equals(limitNameFirst))
.findFirst()
.get();
assertEquals(500, limitResponseFirst.getHoldValue());
assertEquals(500, limitResponseFirst.getTotalValue());
assertEquals(0, limitResponseFirst.getCommitValue());
assertEquals(limitNameFirst, limitResponseFirst.getLimitName());
LimitResponse limitResponseSecond = holdResponse.stream()
.filter(limitResponse -> limitResponse.getLimitName().equals(limitNameSecond))
.findFirst()
.get();
assertEquals(500, limitResponseSecond.getHoldValue());
assertEquals(500, limitResponseSecond.getTotalValue());
assertEquals(0, limitResponseSecond.getCommitValue());
assertEquals(limitNameSecond, limitResponseSecond.getLimitName());
}
@ -94,18 +166,20 @@ class LiminatorServiceTest {
void commitValueTest() throws TException {
String limitName = "TestLimitCommit";
String operationId = "OpComit";
String limitId = "limit_day_id";
LimitRequest holdRequest = new LimitRequest()
.setOperationId(operationId)
.setLimitChanges(List.of(new LimitChange(limitName, 500L)));
.setLimitChanges(List.of(new LimitChange(limitName, 500L).setLimitId(limitId)));
liminatorService.hold(holdRequest);
liminatorService.commit(holdRequest);
List<LimitResponse> limitResponses = liminatorService.getLastLimitsValues(List.of(limitName));
assertEquals(1, limitResponses.size());
assertEquals(0, limitResponses.get(0).getHoldValue());
assertEquals(1000, limitResponses.get(0).getTotalValue());
assertEquals(500, limitResponses.get(0).getCommitValue());
assertEquals(limitName, limitResponses.get(0).getLimitName());
assertEquals(limitId, limitResponses.get(0).getLimitId());
}
@Test
@ -121,8 +195,61 @@ class LiminatorServiceTest {
List<LimitResponse> limitResponses = liminatorService.getLastLimitsValues(List.of(limitName));
assertEquals(1, limitResponses.size());
assertEquals(0, limitResponses.get(0).getHoldValue());
assertEquals(0, limitResponses.get(0).getTotalValue());
assertEquals(0, limitResponses.get(0).getCommitValue());
assertEquals(limitName, limitResponses.get(0).getLimitName());
}
@Test
void complexOperationsTest() throws TException {
String limitName = "TestLimitRollback";
String operationId = "Op-112-%s";
LimitRequest firstHoldRequest = new LimitRequest()
.setOperationId(operationId.formatted(1))
.setLimitChanges(List.of(new LimitChange(limitName, 100L)));
liminatorService.hold(firstHoldRequest);
LimitRequest secondHoldRequest = new LimitRequest()
.setOperationId(operationId.formatted(2))
.setLimitChanges(List.of(new LimitChange(limitName, 100L)));
liminatorService.hold(secondHoldRequest);
liminatorService.commit(secondHoldRequest);
LimitRequest thirdHoldRequest = new LimitRequest()
.setOperationId(operationId.formatted(3))
.setLimitChanges(List.of(new LimitChange(limitName, 100L)));
liminatorService.hold(thirdHoldRequest);
LimitRequest fourthHoldRequest = new LimitRequest()
.setOperationId(operationId.formatted(4))
.setLimitChanges(List.of(new LimitChange(limitName, 100L)));
List<LimitResponse> limitResponseAfterFourthHold = liminatorService.hold(fourthHoldRequest);
assertEquals(1, limitResponseAfterFourthHold.size());
assertEquals(500, limitResponseAfterFourthHold.get(0).getTotalValue());
assertEquals(100, limitResponseAfterFourthHold.get(0).getCommitValue());
assertEquals(limitName, limitResponseAfterFourthHold.get(0).getLimitName());
liminatorService.rollback(firstHoldRequest);
LimitRequest fifthHoldRequest = new LimitRequest()
.setOperationId(operationId.formatted(4))
.setLimitChanges(List.of(new LimitChange(limitName, 100L)));
liminatorService.hold(fifthHoldRequest);
List<LimitResponse> limitResponses = liminatorService.hold(fifthHoldRequest);
assertEquals(1, limitResponses.size());
assertEquals(500, limitResponses.get(0).getTotalValue());
assertEquals(100, limitResponses.get(0).getCommitValue());
assertEquals(limitName, limitResponses.get(0).getLimitName());
List<LimitResponse> limitResponseAfterAllForFourthHold = liminatorService.hold(fourthHoldRequest);
assertEquals(1, limitResponseAfterAllForFourthHold.size());
assertEquals(500, limitResponseAfterAllForFourthHold.get(0).getTotalValue());
assertEquals(100, limitResponseAfterAllForFourthHold.get(0).getCommitValue());
assertEquals(limitName, limitResponseAfterAllForFourthHold.get(0).getLimitName());
}
}