TD-955: Add commit/rollback ops checks. Refactoring (#12)

This commit is contained in:
Baikov Dmitrii 2024-09-11 11:40:14 +03:00 committed by GitHub
parent 7b6c8e4698
commit dee5d1f4d6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 144 additions and 98 deletions

View File

@ -1,8 +1,10 @@
package com.empayre.liminator.dao;
import com.empayre.liminator.domain.enums.OperationState;
import com.empayre.liminator.domain.tables.pojos.Operation;
import com.empayre.liminator.model.LimitValue;
import java.util.Collection;
import java.util.List;
public interface OperationDao extends CommonDao<Operation> {
@ -11,6 +13,10 @@ public interface OperationDao extends CommonDao<Operation> {
Operation get(Long id);
List<Operation> get(String operationId, List<OperationState> states);
List<Operation> get(String operationId, Collection<Long> limitIds, List<OperationState> states);
List<LimitValue> getCurrentLimitValue(List<String> limitNames);
List<LimitValue> getCurrentLimitValue(List<String> limitNames, String operationId);

View File

@ -9,6 +9,7 @@ import com.empayre.liminator.model.LimitValue;
import org.springframework.stereotype.Component;
import javax.sql.DataSource;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
@ -44,6 +45,25 @@ public class OperationDaoImpl extends AbstractDao implements OperationDao {
.fetchOneInto(Operation.class);
}
@Override
public List<Operation> get(String operationId, List<OperationState> states) {
return getDslContext()
.selectFrom(OPERATION)
.where(OPERATION.OPERATION_ID.eq(operationId))
.and(OPERATION.STATE.in(states))
.fetchInto(Operation.class);
}
@Override
public List<Operation> get(String operationId, Collection<Long> limitIds, List<OperationState> states) {
return getDslContext()
.selectFrom(OPERATION)
.where(OPERATION.OPERATION_ID.eq(operationId))
.and(OPERATION.LIMIT_ID.in(limitIds))
.and(OPERATION.STATE.in(states))
.fetchInto(Operation.class);
}
@Override
public void saveBatch(List<Operation> operations) {
var records = operations.stream()

View File

@ -1,8 +1,10 @@
package com.empayre.liminator.handler;
import com.empayre.liminator.domain.enums.OperationState;
import dev.vality.liminator.LimitRequest;
import org.apache.thrift.TException;
public interface FinalizeOperationHandler<T> {
public interface FinalizeOperationHandler {
void handle(T source) throws TException;
void handle(LimitRequest request, OperationState state) throws TException;
}

View File

@ -1,46 +0,0 @@
package com.empayre.liminator.handler.impl;
import com.empayre.liminator.dao.OperationDao;
import com.empayre.liminator.handler.FinalizeOperationHandler;
import com.empayre.liminator.service.LimitDataGettingService;
import dev.vality.liminator.LimitRequest;
import dev.vality.liminator.OperationNotFound;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.thrift.TException;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.util.List;
@Slf4j
@Component
@RequiredArgsConstructor
public class CommitLimitValueHandler implements FinalizeOperationHandler<LimitRequest> {
private final OperationDao operationDao;
private final LimitDataGettingService limitDataGettingService;
private static final String LOG_PREFIX = "COMMIT";
@Override
@Transactional
public void handle(LimitRequest request) throws TException {
if (request == null || CollectionUtils.isEmpty(request.getLimitNames())) {
log.warn("[{}] Received LimitRequest or LimitNames list is empty (request: {})", request, LOG_PREFIX);
return;
}
limitDataGettingService.get(request, LOG_PREFIX);
String operationId = request.getOperationId();
List<String> limitNames = request.getLimitNames();
int updatedRowsCount = operationDao.commit(limitNames, operationId);
if (updatedRowsCount != limitNames.size()) {
log.error("[{}] Count of updated rows ({}) is not equal to the count of source commit operations " +
"(operationId: {}, commit size: {})",
LOG_PREFIX, updatedRowsCount, operationId, limitNames.size());
throw new OperationNotFound();
}
}
}

View File

@ -0,0 +1,76 @@
package com.empayre.liminator.handler.impl;
import com.empayre.liminator.dao.OperationDao;
import com.empayre.liminator.domain.enums.OperationState;
import com.empayre.liminator.domain.tables.pojos.LimitData;
import com.empayre.liminator.domain.tables.pojos.Operation;
import com.empayre.liminator.handler.FinalizeOperationHandler;
import com.empayre.liminator.service.LimitDataGettingService;
import dev.vality.liminator.LimitRequest;
import dev.vality.liminator.OperationNotFound;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.thrift.TException;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.util.List;
@Slf4j
@Component
@RequiredArgsConstructor
public class FinalizeOperationHandlerImpl implements FinalizeOperationHandler {
private final OperationDao operationDao;
private final LimitDataGettingService limitDataGettingService;
@Override
@Transactional
public void handle(LimitRequest request, OperationState state) throws TException {
List<LimitData> limitData = limitDataGettingService.get(request, state.getLiteral());
checkExistedHoldOperations(request, limitData, state);
int updatedRowsCount = switch (state) {
case COMMIT -> operationDao.commit(request.getLimitNames(), request.getOperationId());
case ROLLBACK -> operationDao.rollback(request.getLimitNames(), request.getOperationId());
default -> throw new TException();
};
checkUpdatedOperstionsConsistency(request, state, updatedRowsCount);
}
private void checkExistedHoldOperations(LimitRequest request,
List<LimitData> limitData,
OperationState state) throws TException {
String logPrefix = state.getLiteral();
String operationId = request.getOperationId();
List<Long> limitIds = limitData.stream()
.map(LimitData::getId)
.toList();
List<Operation> existedHoldOperations = operationDao.get(operationId, limitIds, List.of(OperationState.HOLD));
if (CollectionUtils.isEmpty(existedHoldOperations)) {
log.error("[{}] Existed hold operations with ID {} not found: {} (request: {})",
logPrefix, operationId, existedHoldOperations, request);
throw new OperationNotFound();
}
if (limitIds.size() != existedHoldOperations.size()) {
log.error("[{}] Count of existed hold operations for limits is not equal to expected (existed size: {}, " +
"expected size: {}, request: {})", logPrefix, existedHoldOperations.size(),
limitIds.size(), request);
throw new OperationNotFound();
}
}
private void checkUpdatedOperstionsConsistency(LimitRequest request,
OperationState state,
int updatedRowsCount) throws TException {
List<String> limitNames = request.getLimitNames();
if (updatedRowsCount != limitNames.size()) {
log.error("[{}] Count of updated rows ({}) is not equal to the expected count of updated operations " +
"(rollback size: {})",
state.getLiteral(), updatedRowsCount, limitNames.size(), request);
throw new OperationNotFound();
}
}
}

View File

@ -2,14 +2,17 @@ package com.empayre.liminator.handler.impl;
import com.empayre.liminator.converter.OperationConverter;
import com.empayre.liminator.dao.OperationDao;
import com.empayre.liminator.domain.enums.OperationState;
import com.empayre.liminator.domain.tables.pojos.LimitData;
import com.empayre.liminator.domain.tables.pojos.Operation;
import com.empayre.liminator.handler.Handler;
import com.empayre.liminator.model.LimitValue;
import com.empayre.liminator.service.LimitDataGettingService;
import com.empayre.liminator.util.LimitDataUtils;
import dev.vality.liminator.DuplicateOperation;
import dev.vality.liminator.LimitRequest;
import dev.vality.liminator.LimitResponse;
import dev.vality.liminator.OperationAlreadyInFinalState;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.thrift.TException;
@ -38,16 +41,20 @@ public class HoldLimitValueHandler implements Handler<LimitRequest, List<LimitRe
@Override
public List<LimitResponse> handle(LimitRequest request) throws TException {
if (request == null || CollectionUtils.isEmpty(request.getLimitNames())) {
log.warn("LimitRequest or LimitNames is empty. Request: {}", request);
log.warn("[{}] LimitRequest or LimitNames is empty. Request: {}", LOG_PREFIX, request);
return new ArrayList<>();
}
List<LimitData> limitData = limitDataGettingService.get(request, LOG_PREFIX);
Map<String, Long> limitNamesMap = LimitDataUtils.createLimitNamesMap(limitData);
List<Operation> operations = convertToOperation(request, limitNamesMap);
operationDao.saveBatch(operations);
List<String> limitNames = request.getLimitNames();
return currentLimitValuesToLimitResponseConverter.convert(operationDao.getCurrentLimitValue(limitNames));
String operationId = request.getOperationId();
checkExistedHoldOperations(limitNamesMap, operationId);
checkExistedFinalizeOperations(limitNamesMap, operationId);
operationDao.saveBatch(convertToOperation(request, limitNamesMap));
List<LimitValue> currentLimitValues = operationDao.getCurrentLimitValue(request.getLimitNames());
return currentLimitValuesToLimitResponseConverter.convert(currentLimitValues);
}
private List<Operation> convertToOperation(LimitRequest request, Map<String, Long> limitNamesMap) {
@ -55,4 +62,26 @@ public class HoldLimitValueHandler implements Handler<LimitRequest, List<LimitRe
.map(limitName -> operationConverter.convert(request, limitNamesMap.get(limitName)))
.toList();
}
private void checkExistedHoldOperations(Map<String, Long> limitNamesMap, String operationId) throws TException {
List<Operation> existedHoldOperations =
operationDao.get(operationId, limitNamesMap.values(), List.of(OperationState.HOLD));
if (!CollectionUtils.isEmpty(existedHoldOperations)) {
log.error("[{}] DB already has hold operation {}: {}", LOG_PREFIX, operationId, existedHoldOperations);
throw new DuplicateOperation();
}
}
private void checkExistedFinalizeOperations(Map<String, Long> limitNamesMap, String operationId) throws TException {
List<Operation> existedFinalizeOperations = operationDao.get(
operationId,
limitNamesMap.values(),
List.of(OperationState.COMMIT, OperationState.ROLLBACK)
);
if (!CollectionUtils.isEmpty(existedFinalizeOperations)) {
log.error("[{}] DB already has commit/rollback operation {}: {}",
LOG_PREFIX, operationId, existedFinalizeOperations);
throw new OperationAlreadyInFinalState();
}
}
}

View File

@ -1,41 +0,0 @@
package com.empayre.liminator.handler.impl;
import com.empayre.liminator.dao.OperationDao;
import com.empayre.liminator.handler.FinalizeOperationHandler;
import com.empayre.liminator.service.LimitDataGettingService;
import dev.vality.liminator.LimitRequest;
import dev.vality.liminator.OperationNotFound;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.thrift.TException;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
@Slf4j
@Component
@RequiredArgsConstructor
public class RollbackLimitValueHandler implements FinalizeOperationHandler<LimitRequest> {
private final OperationDao operationDao;
private final LimitDataGettingService limitDataGettingService;
private static final String LOG_PREFIX = "ROLLBACK";
@Override
@Transactional
public void handle(LimitRequest request) throws TException {
limitDataGettingService.get(request, LOG_PREFIX);
List<String> limitNames = request.getLimitNames();
String operationId = request.getOperationId();
int updatedRowsCount = operationDao.rollback(limitNames, operationId);
if (updatedRowsCount != limitNames.size()) {
log.error("[{}] Count of updated rows ({}) is not equal to the count of source rollback operations " +
"(operationId: {}, rollback size: {})",
LOG_PREFIX, updatedRowsCount, operationId, limitNames.size());
throw new OperationNotFound();
}
}
}

View File

@ -1,5 +1,6 @@
package com.empayre.liminator.service;
import com.empayre.liminator.domain.enums.OperationState;
import com.empayre.liminator.handler.FinalizeOperationHandler;
import com.empayre.liminator.handler.Handler;
import dev.vality.liminator.*;
@ -17,8 +18,7 @@ public class LiminatorService implements LiminatorServiceSrv.Iface {
private final Handler<CreateLimitRequest, LimitResponse> createLimitHandler;
private final Handler<LimitRequest, List<LimitResponse>> holdLimitValueHandler;
private final FinalizeOperationHandler<LimitRequest> commitLimitValueHandler;
private final FinalizeOperationHandler<LimitRequest> rollbackLimitValueHandler;
private final FinalizeOperationHandler finalizeOperationHandler;
private final Handler<LimitRequest, List<LimitResponse>> getLimitsValuesHandler;
private final Handler<List<String>, List<LimitResponse>> getLastLimitsValuesHandler;
private final LimitOperationsLoggingService limitOperationsLoggingService;
@ -39,7 +39,7 @@ public class LiminatorService implements LiminatorServiceSrv.Iface {
@Override
public void commit(LimitRequest limitRequest) throws LimitNotFound, OperationNotFound, TException {
try {
commitLimitValueHandler.handle(limitRequest);
finalizeOperationHandler.handle(limitRequest, OperationState.COMMIT);
limitOperationsLoggingService.writeCommitOperations(limitRequest);
} catch (Exception ex) {
log.error("Commit execution exception. Request: {}", limitRequest, ex);
@ -49,7 +49,7 @@ public class LiminatorService implements LiminatorServiceSrv.Iface {
@Override
public void rollback(LimitRequest limitRequest) throws LimitNotFound, OperationNotFound, TException {
try {
rollbackLimitValueHandler.handle(limitRequest);
finalizeOperationHandler.handle(limitRequest, OperationState.ROLLBACK);
limitOperationsLoggingService.writeRollbackOperations(limitRequest);
} catch (Exception ex) {
log.error("Commit execution exception. Request: {}", limitRequest, ex);