TD-955: Bump proto (add LimitChange object to proto) (#17)
Some checks failed
Deploy Docker Image / build-and-deploy (push) Has been cancelled

This commit is contained in:
Baikov Dmitrii 2024-09-18 14:53:07 +03:00 committed by GitHub
parent c7d852d9dc
commit 20314fd894
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 57 additions and 52 deletions

View File

@ -15,7 +15,7 @@
- _LimitNotFound_ - данная ошибка будет даже если какого-то одного лимита из списка - _LimitNotFound_ - данная ошибка будет даже если какого-то одного лимита из списка
переданных нет в БД переданных нет в БД
- _DuplicateOperation_ - данная ошибка будет передана если для связки LimitName+OperationId в - _DuplicateOperation_ - данная ошибка будет передана если для связки LimitName+OperationId в
БД уже имеется запись в значении HOLD БД уже имеется запись в значении HOLD и флаг skipExistedHoldOps в настройках сервиса установлен в false
- _OperationAlreadyInFinalState_ - данная ошибка будет передана если для связки LimitName+OperationId в - _OperationAlreadyInFinalState_ - данная ошибка будет передана если для связки LimitName+OperationId в
БД уже имеется запись в значении COMMIT/ROLLBACK БД уже имеется запись в значении COMMIT/ROLLBACK
- _LimitsValuesReadingException_ - данная ошибка будет передана если при подсчете лимитов произошла - _LimitsValuesReadingException_ - данная ошибка будет передана если при подсчете лимитов произошла

View File

@ -173,7 +173,7 @@
<dependency> <dependency>
<groupId>dev.vality</groupId> <groupId>dev.vality</groupId>
<artifactId>liminator-proto</artifactId> <artifactId>liminator-proto</artifactId>
<version>1.4-765e75c</version> <version>1.5-fe957d8</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>dev.vality.woody</groupId> <groupId>dev.vality.woody</groupId>

View File

@ -5,5 +5,5 @@ import dev.vality.liminator.LimitRequest;
public interface OperationConverter { public interface OperationConverter {
Operation convert(LimitRequest request, Long limitId); Operation convert(LimitRequest request, Long limitId, Long limitValue);
} }

View File

@ -12,11 +12,11 @@ import java.time.LocalDateTime;
public class OperationConverterImpl implements OperationConverter { public class OperationConverterImpl implements OperationConverter {
@Override @Override
public Operation convert(LimitRequest request, Long limitId) { public Operation convert(LimitRequest request, Long limitId, Long limitValue) {
Operation operation = new Operation(); Operation operation = new Operation();
operation.setLimitId(limitId); operation.setLimitId(limitId);
operation.setOperationId(request.getOperationId()); operation.setOperationId(request.getOperationId());
operation.setOperationValue(request.getValue()); operation.setOperationValue(limitValue);
operation.setCreatedAt(LocalDateTime.now()); operation.setCreatedAt(LocalDateTime.now());
operation.setState(OperationState.HOLD); operation.setState(OperationState.HOLD);
return operation; return operation;

View File

@ -13,12 +13,12 @@ public class OperationStateHistoryConverterImpl implements OperationStateHistory
@Override @Override
public List<OperationStateHistory> convert(LimitRequest request, OperationState state) { public List<OperationStateHistory> convert(LimitRequest request, OperationState state) {
return request.getLimitNames().stream() return request.getLimitChanges().stream()
.map(limitName -> { .map(change -> {
OperationStateHistory history = new OperationStateHistory(); OperationStateHistory history = new OperationStateHistory();
history.setOperationId(request.getOperationId()); history.setOperationId(request.getOperationId());
history.setLimitName(limitName); history.setLimitName(change.getLimitName());
history.setOperationValue(request.getValue()); history.setOperationValue(change.getValue());
history.setState(state); history.setState(state);
return history; return history;
}) })

View File

@ -21,7 +21,7 @@ public interface OperationDao extends CommonDao<Operation> {
List<LimitValue> getCurrentLimitValue(List<String> limitNames, String operationId); List<LimitValue> getCurrentLimitValue(List<String> limitNames, String operationId);
int commit(List<String> limitNames, String operationId); int commit(String operationId, List<Long> limitIds);
int rollback(List<String> limitNames, String operationId); int rollback(String operationId, List<Long> limitIds);
} }

View File

@ -13,7 +13,6 @@ import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static com.empayre.liminator.domain.Tables.LIMIT_DATA;
import static com.empayre.liminator.domain.Tables.OPERATION; import static com.empayre.liminator.domain.Tables.OPERATION;
import static org.jooq.impl.DSL.raw; import static org.jooq.impl.DSL.raw;
import static org.jooq.impl.DSL.val; import static org.jooq.impl.DSL.val;
@ -80,26 +79,21 @@ public class OperationDaoImpl implements OperationDao {
} }
@Override @Override
public int commit(List<String> limitNames, String operationId) { public int commit(String operationId, List<Long> limitIds) {
return updateStateForHoldOperation(limitNames, operationId, OperationState.COMMIT); return updateStateForHoldOperation(operationId, OperationState.COMMIT, limitIds);
} }
@Override @Override
public int rollback(List<String> limitNames, String operationId) { public int rollback(String operationId, List<Long> limitIds) {
return updateStateForHoldOperation(limitNames, operationId, OperationState.ROLLBACK); return updateStateForHoldOperation(operationId, OperationState.ROLLBACK, limitIds);
} }
private int updateStateForHoldOperation(List<String> limitNames, String operationId, OperationState state) { private int updateStateForHoldOperation(String operationId, OperationState state, List<Long> limitIds) {
return dslContext return dslContext
.update(OPERATION) .update(OPERATION)
.set(OPERATION.STATE, state) .set(OPERATION.STATE, state)
.where(OPERATION.OPERATION_ID.eq(operationId)) .where(OPERATION.OPERATION_ID.eq(operationId))
.and(OPERATION.LIMIT_ID.in( .and(OPERATION.LIMIT_ID.in(limitIds))
dslContext
.select(LIMIT_DATA.ID)
.from(LIMIT_DATA)
.where(LIMIT_DATA.NAME.in(limitNames))
))
.and(OPERATION.STATE.eq(OperationState.HOLD)) .and(OPERATION.STATE.eq(OperationState.HOLD))
.execute(); .execute();
} }

View File

@ -32,14 +32,16 @@ public class FinalizeOperationHandlerImpl implements FinalizeOperationHandler {
public void handle(LimitRequest request, OperationState state) throws TException { public void handle(LimitRequest request, OperationState state) throws TException {
List<LimitData> limitData = limitDataGettingService.get(request, state.getLiteral()); List<LimitData> limitData = limitDataGettingService.get(request, state.getLiteral());
checkExistedHoldOperations(request, limitData, state); checkExistedHoldOperations(request, limitData, state);
List<Long> limitIds = limitData.stream()
.map(LimitData::getId)
.toList();
int updatedRowsCount = switch (state) { int updatedRowsCount = switch (state) {
case COMMIT -> operationDao.commit(request.getLimitNames(), request.getOperationId()); case COMMIT -> operationDao.commit(request.getOperationId(), limitIds);
case ROLLBACK -> operationDao.rollback(request.getLimitNames(), request.getOperationId()); case ROLLBACK -> operationDao.rollback(request.getOperationId(), limitIds);
default -> throw new TException(); default -> throw new TException();
}; };
checkUpdatedOperstionsConsistency(request, state, updatedRowsCount); checkUpdatedOperationsConsistency(request, state, updatedRowsCount);
limitOperationsLoggingService.writeOperations(request, state); limitOperationsLoggingService.writeOperations(request, state);
} }
@ -65,14 +67,14 @@ public class FinalizeOperationHandlerImpl implements FinalizeOperationHandler {
} }
} }
private void checkUpdatedOperstionsConsistency(LimitRequest request, private void checkUpdatedOperationsConsistency(LimitRequest request,
OperationState state, OperationState state,
int updatedRowsCount) throws TException { int updatedRowsCount) throws TException {
List<String> limitNames = request.getLimitNames(); int changesSize = request.getLimitChanges().size();
if (updatedRowsCount != limitNames.size()) { if (updatedRowsCount != changesSize) {
log.error("[{}] Count of updated rows ({}) is not equal to the expected count of updated operations " + log.error("[{}] Count of updated rows ({}) is not equal to the expected count of updated operations " +
"(rollback size: {})", "(rollback size: {}, request: {})",
state.getLiteral(), updatedRowsCount, limitNames.size(), request); state.getLiteral(), updatedRowsCount, changesSize, request);
throw new OperationNotFound(); throw new OperationNotFound();
} }
} }

View File

@ -3,6 +3,7 @@ package com.empayre.liminator.handler.impl;
import com.empayre.liminator.dao.OperationDao; import com.empayre.liminator.dao.OperationDao;
import com.empayre.liminator.handler.Handler; import com.empayre.liminator.handler.Handler;
import com.empayre.liminator.model.LimitValue; import com.empayre.liminator.model.LimitValue;
import dev.vality.liminator.LimitChange;
import dev.vality.liminator.LimitRequest; import dev.vality.liminator.LimitRequest;
import dev.vality.liminator.LimitResponse; import dev.vality.liminator.LimitResponse;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
@ -25,8 +26,10 @@ public class GetLimitsValuesHandler implements Handler<LimitRequest, List<LimitR
@Transactional @Transactional
@Override @Override
public List<LimitResponse> handle(LimitRequest request) throws TException { public List<LimitResponse> handle(LimitRequest request) throws TException {
List<LimitValue> limitValues = List<String> limitNames = request.getLimitChanges().stream()
operationDao.getCurrentLimitValue(request.getLimitNames(), request.getOperationId()); .map(LimitChange::getLimitName)
.toList();
List<LimitValue> limitValues = operationDao.getCurrentLimitValue(limitNames, request.getOperationId());
return currentLimitValuesToLimitResponseConverter.convert(limitValues); return currentLimitValuesToLimitResponseConverter.convert(limitValues);
} }
} }

View File

@ -55,8 +55,13 @@ public class HoldOperationHandlerImpl implements HoldOperationHandler {
} }
private List<Operation> convertToOperation(LimitRequest request, Map<String, Long> limitNamesMap) { private List<Operation> convertToOperation(LimitRequest request, Map<String, Long> limitNamesMap) {
return request.getLimitNames().stream() return request.getLimitChanges().stream()
.map(limitName -> operationConverter.convert(request, limitNamesMap.get(limitName))) .map(change -> operationConverter.convert(
request,
limitNamesMap.get(change.getLimitName()),
change.getValue()
)
)
.toList(); .toList();
} }
@ -69,7 +74,8 @@ public class HoldOperationHandlerImpl implements HoldOperationHandler {
} }
} }
private void checkExistedFinalizeOperations(Map<String, Long> limitNamesMap, String operationId) throws TException { private void checkExistedFinalizeOperations(Map<String, Long> limitNamesMap,
String operationId) throws TException {
List<Operation> existedFinalizeOperations = operationDao.get( List<Operation> existedFinalizeOperations = operationDao.get(
operationId, operationId,
limitNamesMap.values(), limitNamesMap.values(),

View File

@ -34,7 +34,7 @@ public class LiminatorService implements LiminatorServiceSrv.Iface {
@Override @Override
public List<LimitResponse> hold(LimitRequest request) public List<LimitResponse> hold(LimitRequest request)
throws LimitNotFound, DuplicateOperation, OperationAlreadyInFinalState, TException { throws LimitNotFound, DuplicateOperation, OperationAlreadyInFinalState, TException {
if (request == null || CollectionUtils.isEmpty(request.getLimitNames())) { if (request == null || CollectionUtils.isEmpty(request.getLimitChanges())) {
log.warn("[HOLD] LimitRequest or LimitNames is empty. Request: {}", request); log.warn("[HOLD] LimitRequest or LimitNames is empty. Request: {}", request);
return new ArrayList<>(); return new ArrayList<>();
} }

View File

@ -2,6 +2,7 @@ package com.empayre.liminator.service;
import com.empayre.liminator.dao.LimitDataDao; import com.empayre.liminator.dao.LimitDataDao;
import com.empayre.liminator.domain.tables.pojos.LimitData; import com.empayre.liminator.domain.tables.pojos.LimitData;
import dev.vality.liminator.LimitChange;
import dev.vality.liminator.LimitNotFound; import dev.vality.liminator.LimitNotFound;
import dev.vality.liminator.LimitRequest; import dev.vality.liminator.LimitRequest;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
@ -20,7 +21,9 @@ public class LimitDataGettingService {
private final LimitDataDao limitDataDao; private final LimitDataDao limitDataDao;
public List<LimitData> get(LimitRequest request, String source) throws TException { public List<LimitData> get(LimitRequest request, String source) throws TException {
List<String> limitNames = request.getLimitNames(); List<String> limitNames = request.getLimitChanges().stream()
.map(LimitChange::getLimitName)
.toList();
List<LimitData> limitData = limitDataDao.get(limitNames); List<LimitData> limitData = limitDataDao.get(limitNames);
if (CollectionUtils.isEmpty(limitData)) { if (CollectionUtils.isEmpty(limitData)) {
log.error("[{}] Limits not found: {}", source, limitNames); log.error("[{}] Limits not found: {}", source, limitNames);

View File

@ -78,13 +78,12 @@ public class DaoTests {
currentLimitValue.forEach(value -> assertEquals(0, value.getCommitValue())); currentLimitValue.forEach(value -> assertEquals(0, value.getCommitValue()));
currentLimitValue.forEach(value -> assertNotEquals(0, value.getHoldValue())); currentLimitValue.forEach(value -> assertNotEquals(0, value.getHoldValue()));
List<String> commitLimitNames = limitNamesList.subList(0, 3); List<Long> commitLimitIds = limitIdsList.subList(0, 3);
String finalizeOperationName = operationNameTemplate.formatted(1); String finalizeOperationName = operationNameTemplate.formatted(1);
operationDao.commit(commitLimitNames, finalizeOperationName); operationDao.commit(finalizeOperationName, commitLimitIds);
List<String> rollbackLimitNames = limitNamesList.subList(4, 9);
operationDao.rollback(rollbackLimitNames, finalizeOperationName);
List<Long> rollbackLimitIds = limitIdsList.subList(4, 9);
operationDao.rollback(finalizeOperationName, rollbackLimitIds);
List<LimitValue> limitValuesAfterChanges = operationDao.getCurrentLimitValue(limitNamesList); List<LimitValue> limitValuesAfterChanges = operationDao.getCurrentLimitValue(limitNamesList);
List<LimitValue> limitValuesWithCommitData = limitValuesAfterChanges.stream() List<LimitValue> limitValuesWithCommitData = limitValuesAfterChanges.stream()

View File

@ -2,6 +2,7 @@ package com.empayre.liminator.service;
import com.empayre.liminator.config.PostgresqlSpringBootITest; import com.empayre.liminator.config.PostgresqlSpringBootITest;
import dev.vality.liminator.CreateLimitRequest; import dev.vality.liminator.CreateLimitRequest;
import dev.vality.liminator.LimitChange;
import dev.vality.liminator.LimitRequest; import dev.vality.liminator.LimitRequest;
import dev.vality.liminator.LimitResponse; import dev.vality.liminator.LimitResponse;
import org.apache.thrift.TException; import org.apache.thrift.TException;
@ -40,8 +41,7 @@ public class LiminatorServiceTest {
String operationId = "OpHold"; String operationId = "OpHold";
LimitRequest holdRequest = new LimitRequest() LimitRequest holdRequest = new LimitRequest()
.setOperationId(operationId) .setOperationId(operationId)
.setLimitNames(List.of(limitName)) .setLimitChanges(List.of(new LimitChange(limitName, 500L)));
.setValue(500L);
List<LimitResponse> holdResponse = liminatorService.hold(holdRequest); List<LimitResponse> holdResponse = liminatorService.hold(holdRequest);
assertEquals(1, holdResponse.size()); assertEquals(1, holdResponse.size());
LimitResponse response = holdResponse.get(0); LimitResponse response = holdResponse.get(0);
@ -60,8 +60,7 @@ public class LiminatorServiceTest {
String operationId = "OpComit"; String operationId = "OpComit";
LimitRequest holdRequest = new LimitRequest() LimitRequest holdRequest = new LimitRequest()
.setOperationId(operationId) .setOperationId(operationId)
.setLimitNames(List.of(limitName)) .setLimitChanges(List.of(new LimitChange(limitName, 500L)));
.setValue(500L);
liminatorService.hold(holdRequest); liminatorService.hold(holdRequest);
liminatorService.commit(holdRequest); liminatorService.commit(holdRequest);
@ -82,8 +81,7 @@ public class LiminatorServiceTest {
String operationId = "Op-112"; String operationId = "Op-112";
LimitRequest holdRequest = new LimitRequest() LimitRequest holdRequest = new LimitRequest()
.setOperationId(operationId) .setOperationId(operationId)
.setLimitNames(List.of(limitName)) .setLimitChanges(List.of(new LimitChange(limitName, 500L)));
.setValue(500L);
liminatorService.hold(holdRequest); liminatorService.hold(holdRequest);
liminatorService.rollback(holdRequest); liminatorService.rollback(holdRequest);

View File

@ -3,6 +3,7 @@ package com.empayre.liminator.transaction;
import com.empayre.liminator.dao.OperationStateHistoryDao; import com.empayre.liminator.dao.OperationStateHistoryDao;
import com.empayre.liminator.service.LiminatorService; import com.empayre.liminator.service.LiminatorService;
import dev.vality.liminator.CreateLimitRequest; import dev.vality.liminator.CreateLimitRequest;
import dev.vality.liminator.LimitChange;
import dev.vality.liminator.LimitRequest; import dev.vality.liminator.LimitRequest;
import dev.vality.liminator.LimitResponse; import dev.vality.liminator.LimitResponse;
import org.apache.thrift.TException; import org.apache.thrift.TException;
@ -42,8 +43,7 @@ public class TransactionVisibilityTest extends AbstractIntegrationTestWithEmbedd
String operationId = "OpComit123"; String operationId = "OpComit123";
LimitRequest holdRequest = new LimitRequest() LimitRequest holdRequest = new LimitRequest()
.setOperationId(operationId) .setOperationId(operationId)
.setLimitNames(List.of(limitName)) .setLimitChanges(List.of(new LimitChange(limitName, 500L)));
.setValue(500L);
assertThrows(RuntimeException.class, () -> liminatorService.hold(holdRequest)); assertThrows(RuntimeException.class, () -> liminatorService.hold(holdRequest));