mirror of
https://github.com/valitydev/liminator.git
synced 2024-11-06 01:15:21 +00:00
TD-955: Bump proto. Add ops logging. Refactoring (#4)
This commit is contained in:
parent
44c3657ec2
commit
7b6c8e4698
2
pom.xml
2
pom.xml
@ -165,7 +165,7 @@
|
||||
<dependency>
|
||||
<groupId>dev.vality</groupId>
|
||||
<artifactId>liminator-proto</artifactId>
|
||||
<version>1.2-63529e6</version>
|
||||
<version>1.3-3e2cc01</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>dev.vality.woody</groupId>
|
||||
|
@ -0,0 +1,12 @@
|
||||
package com.empayre.liminator.converter;
|
||||
|
||||
import com.empayre.liminator.domain.enums.OperationState;
|
||||
import com.empayre.liminator.domain.tables.pojos.OperationStateHistory;
|
||||
import dev.vality.liminator.LimitRequest;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface OperationStateHistoryConverter {
|
||||
|
||||
List<OperationStateHistory> convert(LimitRequest request, OperationState state);
|
||||
}
|
@ -16,7 +16,7 @@ public class OperationConverterImpl implements OperationConverter {
|
||||
Operation operation = new Operation();
|
||||
operation.setLimitId(limitId);
|
||||
operation.setOperationId(request.getOperationId());
|
||||
operation.setAmount(request.getValue());
|
||||
operation.setOperationValue(request.getValue());
|
||||
operation.setCreatedAt(LocalDateTime.now());
|
||||
operation.setState(OperationState.HOLD);
|
||||
return operation;
|
||||
|
@ -0,0 +1,27 @@
|
||||
package com.empayre.liminator.converter.impl;
|
||||
|
||||
import com.empayre.liminator.converter.OperationStateHistoryConverter;
|
||||
import com.empayre.liminator.domain.enums.OperationState;
|
||||
import com.empayre.liminator.domain.tables.pojos.OperationStateHistory;
|
||||
import dev.vality.liminator.LimitRequest;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Component
|
||||
public class OperationStateHistoryConverterImpl implements OperationStateHistoryConverter {
|
||||
|
||||
@Override
|
||||
public List<OperationStateHistory> convert(LimitRequest request, OperationState state) {
|
||||
return request.getLimitNames().stream()
|
||||
.map(limitName -> {
|
||||
OperationStateHistory history = new OperationStateHistory();
|
||||
history.setOperationId(request.getOperationId());
|
||||
history.setLimitName(limitName);
|
||||
history.setOperationValue(request.getValue());
|
||||
history.setState(state);
|
||||
return history;
|
||||
})
|
||||
.toList();
|
||||
}
|
||||
}
|
@ -15,7 +15,7 @@ public interface OperationDao extends CommonDao<Operation> {
|
||||
|
||||
List<LimitValue> getCurrentLimitValue(List<String> limitNames, String operationId);
|
||||
|
||||
int commit(List<String> operationIds);
|
||||
int commit(List<String> limitNames, String operationId);
|
||||
|
||||
int rollback(List<String> operationIds);
|
||||
int rollback(List<String> limitNames, String operationId);
|
||||
}
|
||||
|
@ -0,0 +1,10 @@
|
||||
package com.empayre.liminator.dao;
|
||||
|
||||
import com.empayre.liminator.domain.tables.pojos.OperationStateHistory;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface OperationStateHistoryDao extends CommonDao<OperationStateHistory> {
|
||||
|
||||
void saveBatch(List<OperationStateHistory> historyList);
|
||||
}
|
@ -4,7 +4,6 @@ import com.empayre.liminator.dao.AbstractDao;
|
||||
import com.empayre.liminator.dao.OperationDao;
|
||||
import com.empayre.liminator.domain.enums.OperationState;
|
||||
import com.empayre.liminator.domain.tables.pojos.Operation;
|
||||
import com.empayre.liminator.domain.tables.records.OperationRecord;
|
||||
import com.empayre.liminator.exception.DaoException;
|
||||
import com.empayre.liminator.model.LimitValue;
|
||||
import org.springframework.stereotype.Component;
|
||||
@ -13,6 +12,7 @@ import javax.sql.DataSource;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.empayre.liminator.domain.Tables.LIMIT_DATA;
|
||||
import static com.empayre.liminator.domain.Tables.OPERATION;
|
||||
import static org.jooq.impl.DSL.raw;
|
||||
import static org.jooq.impl.DSL.val;
|
||||
@ -46,27 +46,35 @@ public class OperationDaoImpl extends AbstractDao implements OperationDao {
|
||||
|
||||
@Override
|
||||
public void saveBatch(List<Operation> operations) {
|
||||
List<OperationRecord> records = operations.stream()
|
||||
var records = operations.stream()
|
||||
.map(operation -> getDslContext().newRecord(OPERATION, operation))
|
||||
.toList();
|
||||
getDslContext().batchInsert(records).execute();
|
||||
getDslContext()
|
||||
.batchInsert(records)
|
||||
.execute();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int commit(List<String> operationIds) {
|
||||
return updateStateForHoldOperation(operationIds, OperationState.COMMIT);
|
||||
public int commit(List<String> limitNames, String operationId) {
|
||||
return updateStateForHoldOperation(limitNames, operationId, OperationState.COMMIT);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int rollback(List<String> operationIds) {
|
||||
return updateStateForHoldOperation(operationIds, OperationState.ROLLBACK);
|
||||
public int rollback(List<String> limitNames, String operationId) {
|
||||
return updateStateForHoldOperation(limitNames, operationId, OperationState.ROLLBACK);
|
||||
}
|
||||
|
||||
private int updateStateForHoldOperation(List<String> operationIds, OperationState state) {
|
||||
private int updateStateForHoldOperation(List<String> limitNames, String operationId, OperationState state) {
|
||||
return getDslContext()
|
||||
.update(OPERATION)
|
||||
.set(OPERATION.STATE, state)
|
||||
.where(OPERATION.OPERATION_ID.in(operationIds))
|
||||
.where(OPERATION.OPERATION_ID.eq(operationId))
|
||||
.and(OPERATION.LIMIT_ID.in(
|
||||
getDslContext()
|
||||
.select(LIMIT_DATA.ID)
|
||||
.from(LIMIT_DATA)
|
||||
.where(LIMIT_DATA.NAME.in(limitNames))
|
||||
))
|
||||
.and(OPERATION.STATE.eq(OperationState.HOLD))
|
||||
.execute();
|
||||
}
|
||||
@ -75,14 +83,14 @@ public class OperationDaoImpl extends AbstractDao implements OperationDao {
|
||||
public List<LimitValue> getCurrentLimitValue(List<String> limitNames) {
|
||||
String sql = """
|
||||
with hold_data as (
|
||||
select ld.id, ld.name, coalesce(sum(ops.amount), 0) as hold_value
|
||||
select ld.id, ld.name, coalesce(sum(ops.operation_value), 0) as hold_value
|
||||
from lim.limit_data as ld
|
||||
left join lim.operation as ops
|
||||
on ops.limit_id = ld.id and ops.state = 'HOLD'
|
||||
where ld.name in ({0})
|
||||
group by ld.id, ld.name
|
||||
), commit_data as (
|
||||
select ld.id, ld.name, coalesce(sum(ops.amount), 0) as commit_value
|
||||
select ld.id, ld.name, coalesce(sum(ops.operation_value), 0) as commit_value
|
||||
from lim.limit_data as ld
|
||||
left join lim.operation as ops
|
||||
on ops.limit_id = ld.id and ops.state = 'COMMIT'
|
||||
@ -107,7 +115,7 @@ public class OperationDaoImpl extends AbstractDao implements OperationDao {
|
||||
from lim.operation
|
||||
where operation_id = {0}
|
||||
), hold_data as (
|
||||
select ld.id, ld.name, coalesce(sum(ops.amount), 0) as hold_amount
|
||||
select ld.id, ld.name, coalesce(sum(ops.operation_value), 0) as hold_value
|
||||
from lim.limit_data as ld
|
||||
left join lim.operation as ops
|
||||
on ops.limit_id = ld.id
|
||||
@ -116,7 +124,7 @@ public class OperationDaoImpl extends AbstractDao implements OperationDao {
|
||||
where ld.name in ({1})
|
||||
group by ld.id, ld.name
|
||||
), commit_data as (
|
||||
select ld.id, ld.name, coalesce(sum(ops.amount), 0) as commit_amount
|
||||
select ld.id, ld.name, coalesce(sum(ops.operation_value), 0) as commit_value
|
||||
from lim.limit_data as ld
|
||||
left join lim.operation as ops
|
||||
on ops.limit_id = ld.id
|
||||
@ -126,7 +134,7 @@ public class OperationDaoImpl extends AbstractDao implements OperationDao {
|
||||
group by ld.id, ld.name
|
||||
)
|
||||
|
||||
select cd.name as limit_name, cd.commit_amount, hd.hold_amount
|
||||
select cd.name as limit_name, cd.commit_value, hd.hold_value
|
||||
from commit_data as cd
|
||||
join hold_data as hd on cd.id = hd.id;
|
||||
""";
|
||||
|
@ -0,0 +1,41 @@
|
||||
package com.empayre.liminator.dao.impl;
|
||||
|
||||
import com.empayre.liminator.dao.AbstractDao;
|
||||
import com.empayre.liminator.dao.OperationStateHistoryDao;
|
||||
import com.empayre.liminator.domain.tables.pojos.OperationStateHistory;
|
||||
import com.empayre.liminator.exception.DaoException;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static com.empayre.liminator.domain.Tables.OPERATION_STATE_HISTORY;
|
||||
|
||||
@Component
|
||||
public class OperationStateHistoryDaoImpl extends AbstractDao implements OperationStateHistoryDao {
|
||||
|
||||
public OperationStateHistoryDaoImpl(DataSource dataSource) {
|
||||
super(dataSource);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long save(OperationStateHistory history) throws DaoException {
|
||||
return getDslContext()
|
||||
.insertInto(OPERATION_STATE_HISTORY)
|
||||
.set(getDslContext().newRecord(OPERATION_STATE_HISTORY, history))
|
||||
.returning(OPERATION_STATE_HISTORY.ID)
|
||||
.fetchOne()
|
||||
.getId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void saveBatch(List<OperationStateHistory> historyList) {
|
||||
var records = historyList.stream()
|
||||
.map(history -> getDslContext().newRecord(OPERATION_STATE_HISTORY, history))
|
||||
.toList();
|
||||
getDslContext()
|
||||
.batchInsert(records)
|
||||
.execute();
|
||||
}
|
||||
}
|
@ -1,7 +0,0 @@
|
||||
package com.empayre.liminator.exception;
|
||||
|
||||
public class BusinessException extends RuntimeException {
|
||||
public BusinessException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
@ -1,7 +0,0 @@
|
||||
package com.empayre.liminator.exception;
|
||||
|
||||
public class NotFoundException extends RuntimeException {
|
||||
public NotFoundException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
@ -1,7 +0,0 @@
|
||||
package com.empayre.liminator.exception;
|
||||
|
||||
public class SerializationException extends RuntimeException {
|
||||
public SerializationException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
}
|
@ -1,44 +0,0 @@
|
||||
package com.empayre.liminator.handler;
|
||||
|
||||
import com.empayre.liminator.dao.OperationDao;
|
||||
import com.empayre.liminator.service.LimitsGettingService;
|
||||
import dev.vality.liminator.LimitRequest;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.thrift.TException;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class CommitLimitAmountHandler implements Handler<List<LimitRequest>, Boolean> {
|
||||
|
||||
private final OperationDao operationDao;
|
||||
private final LimitsGettingService limitsGettingService;
|
||||
|
||||
private static final String LOG_PREFIX = "COMMIT";
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public Boolean handle(List<LimitRequest> requestList) throws TException {
|
||||
if (CollectionUtils.isEmpty(requestList)) {
|
||||
log.warn("[{}] Received LimitRequest list is empty", LOG_PREFIX);
|
||||
return true;
|
||||
}
|
||||
limitsGettingService.get(requestList, LOG_PREFIX);
|
||||
|
||||
List<String> operationsIds = requestList.stream()
|
||||
.map(request -> request.getOperationId())
|
||||
.toList();
|
||||
int updatedRowsCount = operationDao.commit(operationsIds);
|
||||
if (updatedRowsCount != operationsIds.size()) {
|
||||
log.warn("[{}] Count of updated rows ({}) is not equal to the count of source commit operations ({})",
|
||||
LOG_PREFIX, updatedRowsCount, operationsIds.size());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
@ -0,0 +1,8 @@
|
||||
package com.empayre.liminator.handler;
|
||||
|
||||
import org.apache.thrift.TException;
|
||||
|
||||
public interface FinalizeOperationHandler<T> {
|
||||
|
||||
void handle(T source) throws TException;
|
||||
}
|
@ -1,39 +0,0 @@
|
||||
package com.empayre.liminator.handler;
|
||||
|
||||
import com.empayre.liminator.dao.OperationDao;
|
||||
import com.empayre.liminator.service.LimitsGettingService;
|
||||
import dev.vality.liminator.LimitRequest;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.thrift.TException;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class RollbackLimitAmountHandler implements Handler<List<LimitRequest>, Boolean> {
|
||||
|
||||
private final OperationDao operationDao;
|
||||
private final LimitsGettingService limitsGettingService;
|
||||
|
||||
private static final String LOG_PREFIX = "ROLLBACK";
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public Boolean handle(List<LimitRequest> requestList) throws TException {
|
||||
limitsGettingService.get(requestList, LOG_PREFIX);
|
||||
|
||||
List<String> operationsIds = requestList.stream()
|
||||
.map(request -> request.getOperationId())
|
||||
.toList();
|
||||
int updatedRowsCount = operationDao.rollback(operationsIds);
|
||||
if (updatedRowsCount != operationsIds.size()) {
|
||||
log.warn("[{}] Count of updated rows ({}) is not equal to the count of source rollback operations ({})",
|
||||
LOG_PREFIX, updatedRowsCount, operationsIds.size());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
@ -0,0 +1,46 @@
|
||||
package com.empayre.liminator.handler.impl;
|
||||
|
||||
import com.empayre.liminator.dao.OperationDao;
|
||||
import com.empayre.liminator.handler.FinalizeOperationHandler;
|
||||
import com.empayre.liminator.service.LimitDataGettingService;
|
||||
import dev.vality.liminator.LimitRequest;
|
||||
import dev.vality.liminator.OperationNotFound;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.thrift.TException;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class CommitLimitValueHandler implements FinalizeOperationHandler<LimitRequest> {
|
||||
|
||||
private final OperationDao operationDao;
|
||||
private final LimitDataGettingService limitDataGettingService;
|
||||
|
||||
private static final String LOG_PREFIX = "COMMIT";
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void handle(LimitRequest request) throws TException {
|
||||
if (request == null || CollectionUtils.isEmpty(request.getLimitNames())) {
|
||||
log.warn("[{}] Received LimitRequest or LimitNames list is empty (request: {})", request, LOG_PREFIX);
|
||||
return;
|
||||
}
|
||||
limitDataGettingService.get(request, LOG_PREFIX);
|
||||
|
||||
String operationId = request.getOperationId();
|
||||
List<String> limitNames = request.getLimitNames();
|
||||
int updatedRowsCount = operationDao.commit(limitNames, operationId);
|
||||
if (updatedRowsCount != limitNames.size()) {
|
||||
log.error("[{}] Count of updated rows ({}) is not equal to the count of source commit operations " +
|
||||
"(operationId: {}, commit size: {})",
|
||||
LOG_PREFIX, updatedRowsCount, operationId, limitNames.size());
|
||||
throw new OperationNotFound();
|
||||
}
|
||||
}
|
||||
}
|
@ -1,9 +1,10 @@
|
||||
package com.empayre.liminator.handler;
|
||||
package com.empayre.liminator.handler.impl;
|
||||
|
||||
import com.empayre.liminator.dao.LimitContextDao;
|
||||
import com.empayre.liminator.dao.LimitDataDao;
|
||||
import com.empayre.liminator.domain.tables.pojos.LimitContext;
|
||||
import com.empayre.liminator.domain.tables.pojos.LimitData;
|
||||
import com.empayre.liminator.handler.Handler;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import dev.vality.liminator.CreateLimitRequest;
|
@ -1,6 +1,7 @@
|
||||
package com.empayre.liminator.handler;
|
||||
package com.empayre.liminator.handler.impl;
|
||||
|
||||
import com.empayre.liminator.dao.OperationDao;
|
||||
import com.empayre.liminator.handler.Handler;
|
||||
import com.empayre.liminator.model.LimitValue;
|
||||
import dev.vality.liminator.LimitResponse;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
@ -15,7 +16,7 @@ import java.util.List;
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class GetLimitAmountHandler implements Handler<List<String>, List<LimitResponse>> {
|
||||
public class GetLastLimitsValuesHandler implements Handler<List<String>, List<LimitResponse>> {
|
||||
|
||||
private final OperationDao operationDao;
|
||||
private final Converter<List<LimitValue>, List<LimitResponse>> currentLimitValuesToLimitResponseConverter;
|
@ -0,0 +1,32 @@
|
||||
package com.empayre.liminator.handler.impl;
|
||||
|
||||
import com.empayre.liminator.dao.OperationDao;
|
||||
import com.empayre.liminator.handler.Handler;
|
||||
import com.empayre.liminator.model.LimitValue;
|
||||
import dev.vality.liminator.LimitRequest;
|
||||
import dev.vality.liminator.LimitResponse;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.thrift.TException;
|
||||
import org.springframework.core.convert.converter.Converter;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class GetLimitsValuesHandler implements Handler<LimitRequest, List<LimitResponse>> {
|
||||
|
||||
private final OperationDao operationDao;
|
||||
private final Converter<List<LimitValue>, List<LimitResponse>> currentLimitValuesToLimitResponseConverter;
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public List<LimitResponse> handle(LimitRequest request) throws TException {
|
||||
List<LimitValue> limitValues =
|
||||
operationDao.getCurrentLimitValue(request.getLimitNames(), request.getOperationId());
|
||||
return currentLimitValuesToLimitResponseConverter.convert(limitValues);
|
||||
}
|
||||
}
|
@ -1,11 +1,12 @@
|
||||
package com.empayre.liminator.handler;
|
||||
package com.empayre.liminator.handler.impl;
|
||||
|
||||
import com.empayre.liminator.converter.OperationConverter;
|
||||
import com.empayre.liminator.dao.OperationDao;
|
||||
import com.empayre.liminator.domain.tables.pojos.LimitData;
|
||||
import com.empayre.liminator.domain.tables.pojos.Operation;
|
||||
import com.empayre.liminator.handler.Handler;
|
||||
import com.empayre.liminator.model.LimitValue;
|
||||
import com.empayre.liminator.service.LimitsGettingService;
|
||||
import com.empayre.liminator.service.LimitDataGettingService;
|
||||
import com.empayre.liminator.util.LimitDataUtils;
|
||||
import dev.vality.liminator.LimitRequest;
|
||||
import dev.vality.liminator.LimitResponse;
|
||||
@ -24,10 +25,10 @@ import java.util.Map;
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class HoldLimitAmountHandler implements Handler<List<LimitRequest>, List<LimitResponse>> {
|
||||
public class HoldLimitValueHandler implements Handler<LimitRequest, List<LimitResponse>> {
|
||||
|
||||
private final OperationDao operationDao;
|
||||
private final LimitsGettingService limitsGettingService;
|
||||
private final LimitDataGettingService limitDataGettingService;
|
||||
private final Converter<List<LimitValue>, List<LimitResponse>> currentLimitValuesToLimitResponseConverter;
|
||||
private final OperationConverter operationConverter;
|
||||
|
||||
@ -35,21 +36,23 @@ public class HoldLimitAmountHandler implements Handler<List<LimitRequest>, List<
|
||||
|
||||
@Transactional
|
||||
@Override
|
||||
public List<LimitResponse> handle(List<LimitRequest> requestList) throws TException {
|
||||
if (CollectionUtils.isEmpty(requestList)) {
|
||||
public List<LimitResponse> handle(LimitRequest request) throws TException {
|
||||
if (request == null || CollectionUtils.isEmpty(request.getLimitNames())) {
|
||||
log.warn("LimitRequest or LimitNames is empty. Request: {}", request);
|
||||
return new ArrayList<>();
|
||||
}
|
||||
List<LimitData> limitData = limitsGettingService.get(requestList, LOG_PREFIX);
|
||||
List<LimitData> limitData = limitDataGettingService.get(request, LOG_PREFIX);
|
||||
Map<String, Long> limitNamesMap = LimitDataUtils.createLimitNamesMap(limitData);
|
||||
List<Operation> operations = convertToOperation(requestList, limitNamesMap);
|
||||
List<Operation> operations = convertToOperation(request, limitNamesMap);
|
||||
operationDao.saveBatch(operations);
|
||||
List<String> limitNames = LimitDataUtils.getLimitNames(requestList);
|
||||
|
||||
List<String> limitNames = request.getLimitNames();
|
||||
return currentLimitValuesToLimitResponseConverter.convert(operationDao.getCurrentLimitValue(limitNames));
|
||||
}
|
||||
|
||||
private List<Operation> convertToOperation(List<LimitRequest> requestList, Map<String, Long> limitNamesMap) {
|
||||
return requestList.stream()
|
||||
.map(request -> operationConverter.convert(request, limitNamesMap.get(request.getLimitName())))
|
||||
private List<Operation> convertToOperation(LimitRequest request, Map<String, Long> limitNamesMap) {
|
||||
return request.getLimitNames().stream()
|
||||
.map(limitName -> operationConverter.convert(request, limitNamesMap.get(limitName)))
|
||||
.toList();
|
||||
}
|
||||
}
|
@ -0,0 +1,41 @@
|
||||
package com.empayre.liminator.handler.impl;
|
||||
|
||||
import com.empayre.liminator.dao.OperationDao;
|
||||
import com.empayre.liminator.handler.FinalizeOperationHandler;
|
||||
import com.empayre.liminator.service.LimitDataGettingService;
|
||||
import dev.vality.liminator.LimitRequest;
|
||||
import dev.vality.liminator.OperationNotFound;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.thrift.TException;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class RollbackLimitValueHandler implements FinalizeOperationHandler<LimitRequest> {
|
||||
|
||||
private final OperationDao operationDao;
|
||||
private final LimitDataGettingService limitDataGettingService;
|
||||
|
||||
private static final String LOG_PREFIX = "ROLLBACK";
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void handle(LimitRequest request) throws TException {
|
||||
limitDataGettingService.get(request, LOG_PREFIX);
|
||||
|
||||
List<String> limitNames = request.getLimitNames();
|
||||
String operationId = request.getOperationId();
|
||||
int updatedRowsCount = operationDao.rollback(limitNames, operationId);
|
||||
if (updatedRowsCount != limitNames.size()) {
|
||||
log.error("[{}] Count of updated rows ({}) is not equal to the count of source rollback operations " +
|
||||
"(operationId: {}, rollback size: {})",
|
||||
LOG_PREFIX, updatedRowsCount, operationId, limitNames.size());
|
||||
throw new OperationNotFound();
|
||||
}
|
||||
}
|
||||
}
|
@ -1,5 +1,6 @@
|
||||
package com.empayre.liminator.service;
|
||||
|
||||
import com.empayre.liminator.handler.FinalizeOperationHandler;
|
||||
import com.empayre.liminator.handler.Handler;
|
||||
import dev.vality.liminator.*;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
@ -15,10 +16,12 @@ import java.util.List;
|
||||
public class LiminatorService implements LiminatorServiceSrv.Iface {
|
||||
|
||||
private final Handler<CreateLimitRequest, LimitResponse> createLimitHandler;
|
||||
private final Handler<List<LimitRequest>, List<LimitResponse>> holdLimitAmountHandler;
|
||||
private final Handler<List<LimitRequest>, Boolean> commitLimitAmountHandler;
|
||||
private final Handler<List<LimitRequest>, Boolean> rollbackLimitAmountHandler;
|
||||
private final Handler<List<String>, List<LimitResponse>> getLimitAmountHandler;
|
||||
private final Handler<LimitRequest, List<LimitResponse>> holdLimitValueHandler;
|
||||
private final FinalizeOperationHandler<LimitRequest> commitLimitValueHandler;
|
||||
private final FinalizeOperationHandler<LimitRequest> rollbackLimitValueHandler;
|
||||
private final Handler<LimitRequest, List<LimitResponse>> getLimitsValuesHandler;
|
||||
private final Handler<List<String>, List<LimitResponse>> getLastLimitsValuesHandler;
|
||||
private final LimitOperationsLoggingService limitOperationsLoggingService;
|
||||
|
||||
@Override
|
||||
public LimitResponse create(CreateLimitRequest createLimitRequest) throws DuplicateLimitName, TException {
|
||||
@ -26,34 +29,40 @@ public class LiminatorService implements LiminatorServiceSrv.Iface {
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<LimitResponse> hold(List<LimitRequest> list) throws LimitNotFound, TException {
|
||||
return holdLimitAmountHandler.handle(list);
|
||||
public List<LimitResponse> hold(LimitRequest limitRequest)
|
||||
throws LimitNotFound, DuplicateOperation, OperationAlreadyInFinalState, TException {
|
||||
List<LimitResponse> responses = holdLimitValueHandler.handle(limitRequest);
|
||||
limitOperationsLoggingService.writeHoldOperations(limitRequest);
|
||||
return responses;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean commit(List<LimitRequest> list) throws LimitNotFound, TException {
|
||||
public void commit(LimitRequest limitRequest) throws LimitNotFound, OperationNotFound, TException {
|
||||
try {
|
||||
commitLimitAmountHandler.handle(list);
|
||||
return true;
|
||||
commitLimitValueHandler.handle(limitRequest);
|
||||
limitOperationsLoggingService.writeCommitOperations(limitRequest);
|
||||
} catch (Exception ex) {
|
||||
log.error("Commit execution exception. Request list: {}", list, ex);
|
||||
return false;
|
||||
log.error("Commit execution exception. Request: {}", limitRequest, ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean rollback(List<LimitRequest> list) throws LimitNotFound, TException {
|
||||
public void rollback(LimitRequest limitRequest) throws LimitNotFound, OperationNotFound, TException {
|
||||
try {
|
||||
rollbackLimitAmountHandler.handle(list);
|
||||
return true;
|
||||
rollbackLimitValueHandler.handle(limitRequest);
|
||||
limitOperationsLoggingService.writeRollbackOperations(limitRequest);
|
||||
} catch (Exception ex) {
|
||||
log.error("Commit execution exception. Request list: {}", list, ex);
|
||||
return false;
|
||||
log.error("Commit execution exception. Request: {}", limitRequest, ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<LimitResponse> get(List<String> limitNames) throws LimitNotFound, TException {
|
||||
return getLimitAmountHandler.handle(limitNames);
|
||||
public List<LimitResponse> get(LimitRequest limitRequest) throws LimitNotFound, TException {
|
||||
return getLimitsValuesHandler.handle(limitRequest);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<LimitResponse> getLastLimitsValues(List<String> limitNames) throws LimitNotFound, TException {
|
||||
return getLastLimitsValuesHandler.handle(limitNames);
|
||||
}
|
||||
}
|
||||
|
@ -2,7 +2,6 @@ package com.empayre.liminator.service;
|
||||
|
||||
import com.empayre.liminator.dao.LimitDataDao;
|
||||
import com.empayre.liminator.domain.tables.pojos.LimitData;
|
||||
import com.empayre.liminator.util.LimitDataUtils;
|
||||
import dev.vality.liminator.LimitNotFound;
|
||||
import dev.vality.liminator.LimitRequest;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
@ -16,12 +15,12 @@ import java.util.List;
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class LimitsGettingService {
|
||||
public class LimitDataGettingService {
|
||||
|
||||
private final LimitDataDao limitDataDao;
|
||||
|
||||
public List<LimitData> get(List<LimitRequest> requestList, String source) throws TException {
|
||||
List<String> limitNames = LimitDataUtils.getLimitNames(requestList);
|
||||
public List<LimitData> get(LimitRequest request, String source) throws TException {
|
||||
List<String> limitNames = request.getLimitNames();
|
||||
List<LimitData> limitData = limitDataDao.get(limitNames);
|
||||
if (CollectionUtils.isEmpty(limitData)) {
|
||||
log.error("[{}] Limits not found: {}", source, limitNames);
|
@ -0,0 +1,38 @@
|
||||
package com.empayre.liminator.service;
|
||||
|
||||
import com.empayre.liminator.converter.OperationStateHistoryConverter;
|
||||
import com.empayre.liminator.dao.OperationStateHistoryDao;
|
||||
import com.empayre.liminator.domain.enums.OperationState;
|
||||
import dev.vality.liminator.LimitRequest;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class LimitOperationsLoggingService {
|
||||
|
||||
private final OperationStateHistoryDao operationStateHistoryDao;
|
||||
private final OperationStateHistoryConverter operationStateHistoryConverter;
|
||||
|
||||
@Value("${service.logging.enabled}")
|
||||
private boolean loggingEnabled;
|
||||
|
||||
public void writeHoldOperations(LimitRequest request) {
|
||||
writeOperations(request, OperationState.HOLD);
|
||||
}
|
||||
|
||||
public void writeCommitOperations(LimitRequest request) {
|
||||
writeOperations(request, OperationState.COMMIT);
|
||||
}
|
||||
|
||||
public void writeRollbackOperations(LimitRequest request) {
|
||||
writeOperations(request, OperationState.ROLLBACK);
|
||||
}
|
||||
|
||||
private void writeOperations(LimitRequest request, OperationState state) {
|
||||
if (loggingEnabled) {
|
||||
operationStateHistoryDao.saveBatch(operationStateHistoryConverter.convert(request, state));
|
||||
}
|
||||
}
|
||||
}
|
@ -1,7 +1,6 @@
|
||||
package com.empayre.liminator.util;
|
||||
|
||||
import com.empayre.liminator.domain.tables.pojos.LimitData;
|
||||
import dev.vality.liminator.LimitRequest;
|
||||
import lombok.AccessLevel;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@ -12,13 +11,6 @@ import java.util.Map;
|
||||
@NoArgsConstructor(access = AccessLevel.PRIVATE)
|
||||
public class LimitDataUtils {
|
||||
|
||||
public static List<String> getLimitNames(List<LimitRequest> requestList) {
|
||||
return requestList.stream()
|
||||
.map(LimitRequest::getLimitName)
|
||||
.distinct()
|
||||
.toList();
|
||||
}
|
||||
|
||||
public static Map<String, Long> createLimitNamesMap(List<LimitData> limitData) {
|
||||
Map<String, Long> map = new HashMap<>();
|
||||
for (LimitData data : limitData) {
|
||||
|
@ -33,6 +33,8 @@ spring:
|
||||
service:
|
||||
default:
|
||||
limit: 20
|
||||
logging:
|
||||
enabled: true
|
||||
|
||||
management:
|
||||
server:
|
||||
|
@ -35,10 +35,24 @@ CREATE TABLE lim.operation
|
||||
limit_id bigint NOT NULL,
|
||||
operation_id character varying NOT NULL,
|
||||
state lim.operation_state NOT NULL,
|
||||
amount bigint NOT NULL,
|
||||
operation_value bigint NOT NULL,
|
||||
created_at timestamp without time zone DEFAULT timezone('utc'::text, now()) NOT NULL,
|
||||
CONSTRAINT operation_pkey PRIMARY KEY (id)
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX operation_unq_idx ON lim.operation USING btree (limit_id, operation_id);
|
||||
CREATE INDEX operation_idx ON lim.operation USING btree (limit_id, state, created_at, operation_id);
|
||||
|
||||
|
||||
CREATE TABLE lim.operation_state_history
|
||||
(
|
||||
id bigserial NOT NULL,
|
||||
operation_id character varying NOT NULL,
|
||||
limit_name character varying NOT NULL,
|
||||
state lim.operation_state NOT NULL,
|
||||
operation_value bigint NOT NULL,
|
||||
created_at timestamp without time zone DEFAULT timezone('utc'::text, now()) NOT NULL,
|
||||
CONSTRAINT operation_state_history_pkey PRIMARY KEY (id)
|
||||
);
|
||||
|
||||
CREATE INDEX operation_state_history_unq_idx ON lim.operation USING btree (operation_id);
|
||||
|
@ -65,9 +65,10 @@ public class DaoTests {
|
||||
limitNamesList.add(limitName);
|
||||
}
|
||||
List<Operation> operations = new ArrayList<>();
|
||||
String operationNameTemplate = "Operation-odc-1-%s";
|
||||
for (Long limitId : limitIdsList) {
|
||||
for (int i = 0; i < 5; i++) {
|
||||
operations.add(createOperation(limitId, "Operation-odc-1-%s-%s".formatted(limitId, i)));
|
||||
operations.add(createOperation(limitId, operationNameTemplate.formatted(i)));
|
||||
}
|
||||
}
|
||||
operationDao.saveBatch(operations);
|
||||
@ -77,30 +78,29 @@ public class DaoTests {
|
||||
currentLimitValue.forEach(value -> assertEquals(0, value.getCommitValue()));
|
||||
currentLimitValue.forEach(value -> assertNotEquals(0, value.getHoldValue()));
|
||||
|
||||
List<String> commitOperations = operations.subList(0, 20).stream()
|
||||
.map(Operation::getOperationId)
|
||||
.toList();
|
||||
operationDao.commit(commitOperations);
|
||||
List<String> rollbackOperations = operations.subList(20, 35).stream()
|
||||
.map(Operation::getOperationId)
|
||||
.toList();
|
||||
operationDao.rollback(rollbackOperations);
|
||||
List<String> commitLimitNames = limitNamesList.subList(0, 3);
|
||||
String finalizeOperationName = operationNameTemplate.formatted(1);
|
||||
operationDao.commit(commitLimitNames, finalizeOperationName);
|
||||
|
||||
List<String> rollbackLimitNames = limitNamesList.subList(4, 9);
|
||||
operationDao.rollback(rollbackLimitNames, finalizeOperationName);
|
||||
|
||||
|
||||
List<LimitValue> limitValuesAfterChanges = operationDao.getCurrentLimitValue(limitNamesList);
|
||||
List<LimitValue> limitValuesWithCommitData = limitValuesAfterChanges.stream()
|
||||
.filter(value -> value.getCommitValue() > 0 && value.getHoldValue() == 0)
|
||||
.filter(value -> value.getCommitValue() == 100 && value.getHoldValue() == 400)
|
||||
.toList();
|
||||
assertEquals(4, limitValuesWithCommitData.size());
|
||||
assertEquals(3, limitValuesWithCommitData.size());
|
||||
|
||||
List<LimitValue> limitValuesWithHoldData = limitValuesAfterChanges.stream()
|
||||
.filter(value -> value.getCommitValue() == 0 && value.getHoldValue() > 0)
|
||||
List<LimitValue> limitValuesAfterRollback = limitValuesAfterChanges.stream()
|
||||
.filter(value -> value.getHoldValue() == 400 && value.getCommitValue() == 0)
|
||||
.toList();
|
||||
assertEquals(3, limitValuesWithHoldData.size());
|
||||
assertEquals(5, limitValuesAfterRollback.size());
|
||||
|
||||
List<LimitValue> limitValuesWithoutData = limitValuesAfterChanges.stream()
|
||||
.filter(value -> value.getCommitValue() == 0 && value.getHoldValue() == 0)
|
||||
List<LimitValue> limitValuesWithoutChanges = limitValuesAfterChanges.stream()
|
||||
.filter(value -> value.getCommitValue() == 0 && value.getHoldValue() == 500)
|
||||
.toList();
|
||||
assertEquals(3, limitValuesWithoutData.size());
|
||||
assertEquals(2, limitValuesWithoutChanges.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -136,7 +136,7 @@ public class DaoTests {
|
||||
operation.setLimitId(limitId);
|
||||
operation.setOperationId(operationId);
|
||||
operation.setState(OperationState.HOLD);
|
||||
operation.setAmount(100L);
|
||||
operation.setOperationValue(100L);
|
||||
operation.setCreatedAt(createdAt);
|
||||
return operation;
|
||||
}
|
||||
|
@ -11,7 +11,6 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@PostgresqlSpringBootITest
|
||||
public class LiminatorServiceTest {
|
||||
@ -40,10 +39,10 @@ public class LiminatorServiceTest {
|
||||
|
||||
String operationId = "OpHold";
|
||||
LimitRequest holdRequest = new LimitRequest()
|
||||
.setLimitName(limitName)
|
||||
.setOperationId(operationId)
|
||||
.setLimitNames(List.of(limitName))
|
||||
.setValue(500L);
|
||||
List<LimitResponse> holdResponse = liminatorService.hold(List.of(holdRequest));
|
||||
List<LimitResponse> holdResponse = liminatorService.hold(holdRequest);
|
||||
assertEquals(1, holdResponse.size());
|
||||
LimitResponse response = holdResponse.get(0);
|
||||
assertEquals(500, response.getHoldValue());
|
||||
@ -60,13 +59,13 @@ public class LiminatorServiceTest {
|
||||
|
||||
String operationId = "OpComit";
|
||||
LimitRequest holdRequest = new LimitRequest()
|
||||
.setLimitName(limitName)
|
||||
.setOperationId(operationId)
|
||||
.setLimitNames(List.of(limitName))
|
||||
.setValue(500L);
|
||||
liminatorService.hold(List.of(holdRequest));
|
||||
assertTrue(liminatorService.commit(List.of(holdRequest)));
|
||||
liminatorService.hold(holdRequest);
|
||||
liminatorService.commit(holdRequest);
|
||||
|
||||
List<LimitResponse> limitResponses = liminatorService.get(List.of(limitName));
|
||||
List<LimitResponse> limitResponses = liminatorService.getLastLimitsValues(List.of(limitName));
|
||||
assertEquals(1, limitResponses.size());
|
||||
assertEquals(0, limitResponses.get(0).getHoldValue());
|
||||
assertEquals(500, limitResponses.get(0).getCommitValue());
|
||||
@ -82,13 +81,13 @@ public class LiminatorServiceTest {
|
||||
|
||||
String operationId = "Op-112";
|
||||
LimitRequest holdRequest = new LimitRequest()
|
||||
.setLimitName(limitName)
|
||||
.setOperationId(operationId)
|
||||
.setLimitNames(List.of(limitName))
|
||||
.setValue(500L);
|
||||
liminatorService.hold(List.of(holdRequest));
|
||||
assertTrue(liminatorService.rollback(List.of(holdRequest)));
|
||||
liminatorService.hold(holdRequest);
|
||||
liminatorService.rollback(holdRequest);
|
||||
|
||||
List<LimitResponse> limitResponses = liminatorService.get(List.of(limitName));
|
||||
List<LimitResponse> limitResponses = liminatorService.getLastLimitsValues(List.of(limitName));
|
||||
assertEquals(1, limitResponses.size());
|
||||
assertEquals(0, limitResponses.get(0).getHoldValue());
|
||||
assertEquals(0, limitResponses.get(0).getCommitValue());
|
||||
|
Loading…
Reference in New Issue
Block a user