TD-955: ReadMe fixes. HOLD flow fixes. JOOQ refactoring (#14)

This commit is contained in:
Baikov Dmitrii 2024-09-13 16:20:34 +03:00 committed by GitHub
parent bbc187ca4f
commit c7d852d9dc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 82 additions and 156 deletions

View File

@ -18,6 +18,10 @@
БД уже имеется запись в значении HOLD
- _OperationAlreadyInFinalState_ - данная ошибка будет передана если для связки LimitName+OperationId в
БД уже имеется запись в значении COMMIT/ROLLBACK
- _LimitsValuesReadingException_ - данная ошибка будет передана если при подсчете лимитов произошла
какая-то ошибка (это значит, что холдирование уже было выполнено и нужно перезапросить значение
лимитов для данной операции; c установленным флагом skipExistedHoldOps можно запустить повторно
операцию холдирования, а найденные в БД строки по ключу limitId+operationId будут проигнорированы)
3. **Финализация операции (COMMIT/ROLLBACK)**. Применение или отмена внесенного значения. Могут быть
следующие ошибки:
- _LimitNotFound_ - данная ошибка будет даже если какого-то одного лимита из списка
@ -27,9 +31,5 @@
4. **Получение значений по лимитам.** Может быть получено как для определенной операции, так и
последнее актуальное значение. Могут быть следующие ошибки:
- _LimitNotFound_ - если переданный лимит не найден
***
Важно:
- операции производятся транзакционно, т.е. при ошибке во время получения списка лимитов для
операции холдирования значение в БД будет подвергнуто роллбэку.
- _LimitsValuesReadingException_ - данная ошибка будет передана если при подсчете лимитов произошла
какая-то ошибка

View File

@ -72,7 +72,10 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
<!-- <version>3.3.3</version>-->
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jooq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>

View File

@ -1,12 +1,8 @@
package com.empayre.liminator.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.jooq.impl.DataSourceConnectionProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy;
import javax.sql.DataSource;
@Configuration
public class ApplicationConfig {
@ -15,9 +11,4 @@ public class ApplicationConfig {
public ObjectMapper mapper() {
return new ObjectMapper();
}
@Bean
public DataSourceConnectionProvider dataSourceConnectionProviderTransactionAware(DataSource dataSource) {
return new DataSourceConnectionProvider(new TransactionAwareDataSourceProxy(dataSource));
}
}

View File

@ -1,22 +0,0 @@
package com.empayre.liminator.dao;
import lombok.Getter;
import org.jooq.Configuration;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;
import org.jooq.impl.DataSourceConnectionProvider;
import org.jooq.impl.DefaultConfiguration;
@Getter
public abstract class AbstractDao {
private final DSLContext dslContext;
public AbstractDao(DataSourceConnectionProvider dataSource) {
Configuration configuration = new DefaultConfiguration();
configuration.set(dataSource);
configuration.set(SQLDialect.POSTGRES);
this.dslContext = DSL.using(configuration);
}
}

View File

@ -1,8 +1,6 @@
package com.empayre.liminator.dao;
import com.empayre.liminator.exception.DaoException;
public interface CommonDao<T> {
Long save(T domainObject) throws DaoException;
Long save(T domainObject);
}

View File

@ -1,26 +1,24 @@
package com.empayre.liminator.dao.impl;
import com.empayre.liminator.dao.AbstractDao;
import com.empayre.liminator.dao.LimitContextDao;
import com.empayre.liminator.domain.tables.pojos.LimitContext;
import com.empayre.liminator.exception.DaoException;
import org.jooq.impl.DataSourceConnectionProvider;
import lombok.RequiredArgsConstructor;
import org.jooq.DSLContext;
import org.springframework.stereotype.Component;
import static com.empayre.liminator.domain.Tables.LIMIT_CONTEXT;
@Component
public class LimitContextDaoImpl extends AbstractDao implements LimitContextDao {
@RequiredArgsConstructor
public class LimitContextDaoImpl implements LimitContextDao {
public LimitContextDaoImpl(DataSourceConnectionProvider dataSource) {
super(dataSource);
}
private final DSLContext dslContext;
@Override
public Long save(LimitContext limitContext) throws DaoException {
return getDslContext()
public Long save(LimitContext limitContext) {
return dslContext
.insertInto(LIMIT_CONTEXT)
.set(getDslContext().newRecord(LIMIT_CONTEXT, limitContext))
.set(dslContext.newRecord(LIMIT_CONTEXT, limitContext))
.returning(LIMIT_CONTEXT.ID)
.fetchOne()
.getId();
@ -28,7 +26,7 @@ public class LimitContextDaoImpl extends AbstractDao implements LimitContextDao
@Override
public LimitContext getLimitContext(Long limitId) {
return getDslContext()
return dslContext
.selectFrom(LIMIT_CONTEXT)
.where(LIMIT_CONTEXT.LIMIT_ID.eq(limitId))
.fetchOneInto(LimitContext.class);

View File

@ -1,10 +1,9 @@
package com.empayre.liminator.dao.impl;
import com.empayre.liminator.dao.AbstractDao;
import com.empayre.liminator.dao.LimitDataDao;
import com.empayre.liminator.domain.tables.pojos.LimitData;
import com.empayre.liminator.exception.DaoException;
import org.jooq.impl.DataSourceConnectionProvider;
import lombok.RequiredArgsConstructor;
import org.jooq.DSLContext;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
@ -14,17 +13,16 @@ import java.util.List;
import static com.empayre.liminator.domain.Tables.LIMIT_DATA;
@Component
public class LimitDataDaoImpl extends AbstractDao implements LimitDataDao {
@RequiredArgsConstructor
public class LimitDataDaoImpl implements LimitDataDao {
public LimitDataDaoImpl(DataSourceConnectionProvider dataSource) {
super(dataSource);
}
private final DSLContext dslContext;
@Override
public Long save(LimitData limitData) throws DaoException {
return getDslContext()
public Long save(LimitData limitData) {
return dslContext
.insertInto(LIMIT_DATA)
.set(getDslContext().newRecord(LIMIT_DATA, limitData))
.set(dslContext.newRecord(LIMIT_DATA, limitData))
.onConflict(LIMIT_DATA.NAME)
.doUpdate()
.set(LIMIT_DATA.WTIME, LocalDateTime.now())
@ -35,7 +33,7 @@ public class LimitDataDaoImpl extends AbstractDao implements LimitDataDao {
@Override
public LimitData get(String limitName) {
return getDslContext()
return dslContext
.selectFrom(LIMIT_DATA)
.where(LIMIT_DATA.NAME.equal(limitName))
.fetchOneInto(LimitData.class);
@ -43,7 +41,7 @@ public class LimitDataDaoImpl extends AbstractDao implements LimitDataDao {
@Override
public List<LimitData> get(Collection<String> limitNames) {
return getDslContext()
return dslContext
.selectFrom(LIMIT_DATA)
.where(LIMIT_DATA.NAME.in(limitNames))
.fetchInto(LimitData.class);

View File

@ -1,12 +1,12 @@
package com.empayre.liminator.dao.impl;
import com.empayre.liminator.dao.AbstractDao;
import com.empayre.liminator.dao.OperationDao;
import com.empayre.liminator.domain.enums.OperationState;
import com.empayre.liminator.domain.tables.pojos.Operation;
import com.empayre.liminator.exception.DaoException;
import com.empayre.liminator.model.LimitValue;
import org.jooq.impl.DataSourceConnectionProvider;
import lombok.RequiredArgsConstructor;
import org.jooq.DSLContext;
import org.jooq.Query;
import org.springframework.stereotype.Component;
import java.util.Collection;
@ -19,19 +19,18 @@ import static org.jooq.impl.DSL.raw;
import static org.jooq.impl.DSL.val;
@Component
public class OperationDaoImpl extends AbstractDao implements OperationDao {
@RequiredArgsConstructor
public class OperationDaoImpl implements OperationDao {
public OperationDaoImpl(DataSourceConnectionProvider dataSource) {
super(dataSource);
}
private final DSLContext dslContext;
private static final String DELIMITER = " ,";
@Override
public Long save(Operation operation) throws DaoException {
return getDslContext()
public Long save(Operation operation) {
return dslContext
.insertInto(OPERATION)
.set(getDslContext().newRecord(OPERATION, operation))
.set(dslContext.newRecord(OPERATION, operation))
.returning(OPERATION.ID)
.fetchOne()
.getId();
@ -39,7 +38,7 @@ public class OperationDaoImpl extends AbstractDao implements OperationDao {
@Override
public Operation get(Long id) {
return getDslContext()
return dslContext
.selectFrom(OPERATION)
.where(OPERATION.ID.eq(id))
.fetchOneInto(Operation.class);
@ -47,7 +46,7 @@ public class OperationDaoImpl extends AbstractDao implements OperationDao {
@Override
public List<Operation> get(String operationId, List<OperationState> states) {
return getDslContext()
return dslContext
.selectFrom(OPERATION)
.where(OPERATION.OPERATION_ID.eq(operationId))
.and(OPERATION.STATE.in(states))
@ -56,7 +55,7 @@ public class OperationDaoImpl extends AbstractDao implements OperationDao {
@Override
public List<Operation> get(String operationId, Collection<Long> limitIds, List<OperationState> states) {
return getDslContext()
return dslContext
.selectFrom(OPERATION)
.where(OPERATION.OPERATION_ID.eq(operationId))
.and(OPERATION.LIMIT_ID.in(limitIds))
@ -66,11 +65,17 @@ public class OperationDaoImpl extends AbstractDao implements OperationDao {
@Override
public void saveBatch(List<Operation> operations) {
var records = operations.stream()
.map(operation -> getDslContext().newRecord(OPERATION, operation))
.toList();
getDslContext()
.batchInsert(records)
var inserts = operations.stream()
.map(operation ->
dslContext
.insertInto(OPERATION)
.set(dslContext.newRecord(OPERATION, operation))
.onConflict(OPERATION.LIMIT_ID, OPERATION.OPERATION_ID)
.doNothing()
)
.toArray(Query[]::new);
dslContext
.batch(inserts)
.execute();
}
@ -85,12 +90,12 @@ public class OperationDaoImpl extends AbstractDao implements OperationDao {
}
private int updateStateForHoldOperation(List<String> limitNames, String operationId, OperationState state) {
return getDslContext()
return dslContext
.update(OPERATION)
.set(OPERATION.STATE, state)
.where(OPERATION.OPERATION_ID.eq(operationId))
.and(OPERATION.LIMIT_ID.in(
getDslContext()
dslContext
.select(LIMIT_DATA.ID)
.from(LIMIT_DATA)
.where(LIMIT_DATA.NAME.in(limitNames))
@ -122,7 +127,7 @@ public class OperationDaoImpl extends AbstractDao implements OperationDao {
from commit_data as cd
join hold_data as hd on cd.id = hd.id;
""";
return getDslContext()
return dslContext
.resultQuery(sql, raw(arrayToString(limitNames)))
.fetchInto(LimitValue.class);
}
@ -158,7 +163,7 @@ public class OperationDaoImpl extends AbstractDao implements OperationDao {
from commit_data as cd
join hold_data as hd on cd.id = hd.id;
""";
return getDslContext()
return dslContext
.resultQuery(sql, val(operationId), raw(arrayToString(limitNames)))
.fetchInto(LimitValue.class);
}

View File

@ -1,10 +1,9 @@
package com.empayre.liminator.dao.impl;
import com.empayre.liminator.dao.AbstractDao;
import com.empayre.liminator.dao.OperationStateHistoryDao;
import com.empayre.liminator.domain.tables.pojos.OperationStateHistory;
import com.empayre.liminator.exception.DaoException;
import org.jooq.impl.DataSourceConnectionProvider;
import lombok.RequiredArgsConstructor;
import org.jooq.DSLContext;
import org.springframework.stereotype.Component;
import java.util.List;
@ -12,17 +11,16 @@ import java.util.List;
import static com.empayre.liminator.domain.Tables.OPERATION_STATE_HISTORY;
@Component
public class OperationStateHistoryDaoImpl extends AbstractDao implements OperationStateHistoryDao {
@RequiredArgsConstructor
public class OperationStateHistoryDaoImpl implements OperationStateHistoryDao {
public OperationStateHistoryDaoImpl(DataSourceConnectionProvider dataSource) {
super(dataSource);
}
private final DSLContext dslContext;
@Override
public Long save(OperationStateHistory history) throws DaoException {
return getDslContext()
public Long save(OperationStateHistory history) {
return dslContext
.insertInto(OPERATION_STATE_HISTORY)
.set(getDslContext().newRecord(OPERATION_STATE_HISTORY, history))
.set(dslContext.newRecord(OPERATION_STATE_HISTORY, history))
.returning(OPERATION_STATE_HISTORY.ID)
.fetchOne()
.getId();
@ -31,9 +29,9 @@ public class OperationStateHistoryDaoImpl extends AbstractDao implements Operati
@Override
public void saveBatch(List<OperationStateHistory> historyList) {
var records = historyList.stream()
.map(history -> getDslContext().newRecord(OPERATION_STATE_HISTORY, history))
.map(history -> dslContext.newRecord(OPERATION_STATE_HISTORY, history))
.toList();
getDslContext()
dslContext
.batchInsert(records)
.execute();
}

View File

@ -1,23 +0,0 @@
package com.empayre.liminator.exception;
public class DaoException extends RuntimeException {
public DaoException() {
super();
}
public DaoException(String message) {
super(message);
}
public DaoException(String message, Throwable cause) {
super(message, cause);
}
public DaoException(Throwable cause) {
super(cause);
}
protected DaoException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -15,6 +15,7 @@ import dev.vality.liminator.OperationAlreadyInFinalState;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.thrift.TException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
@ -33,6 +34,9 @@ public class HoldOperationHandlerImpl implements HoldOperationHandler {
private final OperationConverter operationConverter;
private final LimitOperationsLoggingService limitOperationsLoggingService;
@Value("${service.skipExistedHoldOps}")
private boolean skipExistedHoldOps;
private static final String LOG_PREFIX = "HOLD";
@Transactional
@ -41,8 +45,10 @@ public class HoldOperationHandlerImpl implements HoldOperationHandler {
List<LimitData> limitData = limitDataGettingService.get(request, LOG_PREFIX);
Map<String, Long> limitNamesMap = LimitDataUtils.createLimitNamesMap(limitData);
checkExistedHoldOperations(limitNamesMap, request.getOperationId());
checkExistedFinalizeOperations(limitNamesMap, request.getOperationId());
if (!skipExistedHoldOps) {
checkExistedHoldOperations(limitNamesMap, request.getOperationId());
}
operationDao.saveBatch(convertToOperation(request, limitNamesMap));
limitOperationsLoggingService.writeOperations(request, OperationState.HOLD);

View File

@ -1,7 +1,6 @@
package com.empayre.liminator.service;
import com.empayre.liminator.domain.enums.OperationState;
import com.empayre.liminator.exception.DaoException;
import com.empayre.liminator.handler.FinalizeOperationHandler;
import com.empayre.liminator.handler.Handler;
import com.empayre.liminator.handler.HoldOperationHandler;
@ -9,6 +8,7 @@ import dev.vality.liminator.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.thrift.TException;
import org.jooq.exception.DataAccessException;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
@ -65,7 +65,7 @@ public class LiminatorService implements LiminatorServiceSrv.Iface {
throws LimitNotFound, LimitsValuesReadingException, TException {
try {
return getLimitsValuesHandler.handle(request);
} catch (DaoException ex) {
} catch (DataAccessException ex) {
log.error("[GET] Received DaoException for getting limits operation (request: {})", request, ex);
throw new LimitsValuesReadingException();
}
@ -76,7 +76,7 @@ public class LiminatorService implements LiminatorServiceSrv.Iface {
throws LimitNotFound, LimitsValuesReadingException, TException {
try {
return getLastLimitsValuesHandler.handle(limitNames);
} catch (DaoException ex) {
} catch (DataAccessException ex) {
log.error("[GET] Received DaoException for getting last limits operation (limitNames: {})", limitNames, ex);
throw new LimitsValuesReadingException();
}

View File

@ -1,6 +1,3 @@
server:
port: "${server.port}"
spring:
application:
name: "${project.name}"
@ -20,11 +17,15 @@ spring:
jooq:
sql-dialect: Postgres
server:
port: "${server.port}"
service:
default:
limit: 20
logging:
enabled: true
skipExistedHoldOps: true
management:
server:
@ -49,33 +50,6 @@ management:
exposure:
include: health,info,prometheus
kafka:
consumer:
group-id: "Liminator-Listener"
party-management-concurrency: 7
wallet-concurrency: 7
identity-concurrency: 7
topics:
party-management:
id: mg-events-party
enabled: false
consumer.group-id: "LiminatorListenerPartyManagement"
identity:
id: mg-events-ff-identity
enabled: false
wallet:
id: mg-events-ff-wallet
enabled: false
cache:
party-shop:
size: 10000
expire:
after:
sec: 600
testcontainers:
postgresql:
tag: '11.4'
kafka:
tag: '6.2.0'