add CreateAdjustment schedulator (#7)

This commit is contained in:
Anatolii Karlov 2024-08-30 20:03:39 +07:00 committed by GitHub
parent 7e6c5f1ba6
commit d7a19835cf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 305 additions and 142 deletions

View File

@ -63,9 +63,6 @@
Диспуты на проверку статуса упорядочиваются по последнему времени проверки поля `next_check_after`, берутся самые
древние
Опция `DISPUTE_FLOW_HG_SKIP_CREATE_ADJUSTMENT` скажет сервису пропустить этап создания корректировки в хелгейте, в этом
случае должно произойти ручное внесенение корректировки
Ответственность за актуальный статус диспута несет конкретный адаптер. Решение остается за адаптером, тк детали
реализации могут отличаться в зависимости от интеграции, `disputes-api` работает уже с результатом этого процесса и
занимается обновлением статуса диспута в своей БД. При этом, при создании диспута необходимо в адаптере по возможности
@ -102,3 +99,37 @@
Вызов адаптеров — пулинг на падения не влияет, но при создании предполагается, что это ответственность адаптера
удостовериться, что такого диспута еще не существует.
# Модуль ручного разбора
схема такая:
после создания диспута через нашу внешнюю апишку, будет происходить несколько кейсов:
- если `CreatedDisputesService` при попытке создать диспут в провайдере понимает, что такой ручки в провайдере не
существут, отправляет на ручной разбор
- если не выставлен флаг в опциях `DISPUTE_FLOW_PROVIDERS_API_EXIST` , то тоже отправляет на ручной разбор
- если это captured платеж и выставлена опция `DISPUTE_FLOW_CAPTURED_BLOCKED` , то тоже отправляет на ручной разбор
Далее, через внутрений трифт-интерфейс саппорт получает способ манипулировать диспутом для его
обработки (`ManualParsingDisputesService`)
- Перед переводом диспута в финальный статус саппорт должен будет забиндить айди созданного диспута в провайдере через
ручку `BindCreated()`. Здесь особенность, что этот метод фильтрует возможность биндить диспуты только созданные
вручную (из `manual_parsing_created`)
Далее, в режиме ручного разбора есть опция финализации диспута в фейл (`CancelPending()`) либо в
успех (`ApprovePending()`). Здесь особенность, что в фейл можно перевести любой диспут имеющий не финальный статус, а в
успех можно перевести, только если гарантировано создан внешний диспут у провайдера (
из `pending`,`manual_parsing_binded_pending`)
- Из за того, что для ручных диспутов добавлены отдельные
статусы `manual_parsing_binded_pending` ,`manual_parsing_created` не происходит ситуации, что такие диспуты попадут в
таску `PendingDisputesService` которая автоматически вызывает апи провайдера для проверки статуса
# Схема аппрува корректировок
Добавить в БД для диспута колонку "IS_AUTO", по дефолту проставляем туда FALSE.
Получаем успешный статус от провайдера, пишем в БД статус READY_FOR_ADJUSTMENT.
Корректировки проводим в отдельном потоке по расписанию, как сейчас создаем и проводим диспуты (просто последний вариант
разбиваем на два отдельных). По расписанию стартует поток и автоматически проводит только те проводки, где IS_AUTO =
TRUE. В идеале саппорт/фины будут скриптом/в админке сначала проставлять TRUE.

View File

@ -47,7 +47,7 @@
<dependency>
<groupId>dev.vality</groupId>
<artifactId>provider-disputes-proto</artifactId>
<version>1.14-55b451d</version>
<version>1.16-61eff7c</version>
</dependency>
<dependency>
<groupId>dev.vality</groupId>

View File

@ -23,7 +23,7 @@ public class Status200ResponseConverter {
private Status200Response.StatusEnum getStatus(Dispute dispute) {
return switch (dispute.getStatus()) {
case created, pending, manual_parsing_created, manual_parsing_binded_pending ->
case created, pending, manual_created, manual_pending, create_adjustment ->
Status200Response.StatusEnum.PENDING;
case succeeded -> Status200Response.StatusEnum.SUCCEEDED;
case cancelled, failed -> Status200Response.StatusEnum.FAILED;

View File

@ -23,8 +23,7 @@ import java.util.Set;
@SuppressWarnings({"ParameterName", "LineLength"})
public class ApiDisputesService {
public static final Set<DisputeStatus> DISPUTE_PENDING = Set.of(
DisputeStatus.created, DisputeStatus.pending, DisputeStatus.manual_parsing_created, DisputeStatus.manual_parsing_binded_pending);
public static final Set<DisputeStatus> DISPUTE_PENDING = pendings();
private final DisputeDao disputeDao;
private final ApiAttachmentsService apiAttachmentsService;
private final DisputeConverter disputeConverter;
@ -68,4 +67,13 @@ public class ApiDisputesService {
log.debug("Dispute has been found, disputeId={}", disputeId);
return dispute;
}
private static Set<DisputeStatus> pendings() {
return Set.of(
DisputeStatus.created,
DisputeStatus.pending,
DisputeStatus.manual_created,
DisputeStatus.manual_pending,
DisputeStatus.create_adjustment);
}
}

View File

@ -8,6 +8,5 @@ public class TerminalOptionsField {
public static final String DISPUTE_FLOW_MAX_TIME_POLLING_MIN = "DISPUTE_FLOW_MAX_TIME_POLLING_MIN";
public static final String DISPUTE_FLOW_CAPTURED_BLOCKED = "DISPUTE_FLOW_CAPTURED_BLOCKED";
public static final String DISPUTE_FLOW_PROVIDERS_API_EXIST = "DISPUTE_FLOW_PROVIDERS_API_EXIST";
public static final String DISPUTE_FLOW_HG_SKIP_CREATE_ADJUSTMENT = "DISPUTE_FLOW_HG_SKIP_CREATE_ADJUSTMENT";
}

View File

@ -84,6 +84,18 @@ public class DisputeDao extends AbstractGenericDao {
.orElse(List.of());
}
public List<Dispute> getDisputesForHgCall(int limit) {
var query = getDslContext().selectFrom(DISPUTE)
.where(DISPUTE.STATUS.eq(DisputeStatus.create_adjustment)
.and(DISPUTE.SKIP_CALL_HG_FOR_CREATE_ADJUSTMENT.eq(false)))
.orderBy(DISPUTE.NEXT_CHECK_AFTER)
.limit(limit)
.forUpdate()
.skipLocked();
return Optional.ofNullable(fetch(query, disputeRowMapper))
.orElse(List.of());
}
@Nullable
public Dispute getDisputeForUpdateSkipLocked(long disputeId) {
var query = getDslContext().selectFrom(DISPUTE)
@ -94,22 +106,26 @@ public class DisputeDao extends AbstractGenericDao {
}
public long update(long disputeId, DisputeStatus status) {
return update(disputeId, status, null, null, null);
return update(disputeId, status, null, null, null, null);
}
public long update(long disputeId, DisputeStatus status, LocalDateTime nextCheckAfter) {
return update(disputeId, status, nextCheckAfter, null, null);
return update(disputeId, status, nextCheckAfter, null, null, null);
}
public long update(long disputeId, DisputeStatus status, String errorMessage) {
return update(disputeId, status, null, errorMessage, null);
return update(disputeId, status, null, errorMessage, null, null);
}
public long update(long disputeId, DisputeStatus status, Long changedAmount) {
return update(disputeId, status, null, null, changedAmount);
return update(disputeId, status, null, null, changedAmount, null);
}
private long update(long disputeId, DisputeStatus status, LocalDateTime nextCheckAfter, String errorMessage, Long changedAmount) {
public long update(long disputeId, DisputeStatus status, Long changedAmount, Boolean skipCallHgForCreateAdjustment) {
return update(disputeId, status, null, null, changedAmount, skipCallHgForCreateAdjustment);
}
private long update(long disputeId, DisputeStatus status, LocalDateTime nextCheckAfter, String errorMessage, Long changedAmount, Boolean skipCallHgForCreateAdjustment) {
var set = getDslContext().update(DISPUTE)
.set(DISPUTE.STATUS, status);
if (nextCheckAfter != null) {
@ -121,6 +137,9 @@ public class DisputeDao extends AbstractGenericDao {
if (changedAmount != null) {
set = set.set(DISPUTE.CHANGED_AMOUNT, changedAmount);
}
if (skipCallHgForCreateAdjustment != null) {
set = set.set(DISPUTE.SKIP_CALL_HG_FOR_CREATE_ADJUSTMENT, skipCallHgForCreateAdjustment);
}
var query = set
.where(DISPUTE.ID.eq(disputeId));
executeOne(query);

View File

@ -1,5 +1,8 @@
package dev.vality.disputes.manualparsing;
import dev.vality.disputes.admin.ApproveParams;
import dev.vality.disputes.admin.BindParams;
import dev.vality.disputes.admin.CancelParams;
import dev.vality.disputes.dao.DisputeDao;
import dev.vality.disputes.dao.ProviderDisputeDao;
import dev.vality.disputes.domain.enums.DisputeStatus;
@ -23,7 +26,9 @@ public class ManualParsingDisputesService {
private final ProviderDisputeDao providerDisputeDao;
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.REPEATABLE_READ)
public void cancelPendingDispute(String disputeId, String cancelReason) {
public void cancelPendingDispute(CancelParams cancelParams) {
var disputeId = cancelParams.getDisputeId();
var cancelReason = cancelParams.getCancelReason().orElse(null);
log.debug("Trying to getForUpdateSkipLocked {}", disputeId);
var dispute = disputeDao.getForUpdateSkipLocked(Long.parseLong(disputeId));
log.debug("GetForUpdateSkipLocked has been found {}", dispute);
@ -38,32 +43,43 @@ public class ManualParsingDisputesService {
}
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.REPEATABLE_READ)
public void approvePendingDispute(String disputeId, long changedAmount) {
public void approvePendingDispute(ApproveParams approveParam) {
var disputeId = approveParam.getDisputeId();
log.debug("Trying to getForUpdateSkipLocked {}", disputeId);
var dispute = disputeDao.getForUpdateSkipLocked(Long.parseLong(disputeId));
log.debug("GetForUpdateSkipLocked has been found {}", dispute);
if (dispute.getStatus() == DisputeStatus.pending
|| dispute.getStatus() == DisputeStatus.manual_parsing_binded_pending) {
// переводим в успех только если диспут уже был создан на стороне провайдера
log.info("Trying to set succeeded Dispute status {}", dispute);
disputeDao.update(dispute.getId(), DisputeStatus.succeeded, changedAmount);
log.debug("Dispute status has been set to succeeded {}", dispute);
} else {
log.info("Request was skipped by inappropriate status {}", dispute);
var skipCallHg = approveParam.isSkipCallHgForCreateAdjustment();
var targetStatus = skipCallHg ? DisputeStatus.succeeded : DisputeStatus.create_adjustment;
if (dispute.getStatus() == DisputeStatus.create_adjustment) {
log.info("Trying to set {} Dispute status {}", targetStatus, dispute);
// changedAmount не обновляем, тк уже заапрувлено на этапе чек статуса
disputeDao.update(dispute.getId(), targetStatus, null, skipCallHg);
log.debug("Dispute status has been set to {} {}", targetStatus, dispute);
return;
}
if (dispute.getStatus() == DisputeStatus.pending
|| dispute.getStatus() == DisputeStatus.manual_pending) {
log.info("Trying to set {} Dispute status {}", targetStatus, dispute);
disputeDao.update(dispute.getId(), targetStatus, approveParam.getChangedAmount().orElse(null), skipCallHg);
log.debug("Dispute status has been set to {} {}", targetStatus, dispute);
return;
}
log.info("Request was skipped by inappropriate status {}", dispute);
}
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.REPEATABLE_READ)
public void bindCreatedDispute(String disputeId, String providerDisputeId) {
public void bindCreatedDispute(BindParams bindParam) {
var disputeId = bindParam.getDisputeId();
var providerDisputeId = bindParam.getProviderDisputeId();
log.debug("Trying to getForUpdateSkipLocked {}", disputeId);
var dispute = disputeDao.getForUpdateSkipLocked(Long.parseLong(disputeId));
log.debug("GetForUpdateSkipLocked has been found {}", dispute);
if (dispute.getStatus() == DisputeStatus.manual_parsing_created) {
if (dispute.getStatus() == DisputeStatus.manual_created) {
// обрабатываем здесь только вручную созданные диспуты, у остальных предполагается,
// что providerDisputeId будет сохранен после создания диспута по API провайдера
log.info("Trying to set manual_parsing_binded_pending Dispute status {}", dispute);
providerDisputeDao.save(new ProviderDispute(providerDisputeId, dispute.getId()));
disputeDao.update(dispute.getId(), DisputeStatus.manual_parsing_binded_pending);
disputeDao.update(dispute.getId(), DisputeStatus.manual_pending);
log.debug("Dispute status has been set to manual_parsing_binded_pending {}", dispute);
} else {
log.info("Request was skipped by inappropriate status {}", dispute);

View File

@ -1,6 +1,6 @@
package dev.vality.disputes.manualparsing;
import dev.vality.disputes.ManualParsingServiceSrv;
import dev.vality.disputes.admin.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.thrift.TException;
@ -15,17 +15,23 @@ public class ManualParsingHandler implements ManualParsingServiceSrv.Iface {
private final ManualParsingDisputesService manualParsingDisputesService;
@Override
public void cancelPending(String disputeId, String cancelReason) throws TException {
manualParsingDisputesService.cancelPendingDispute(disputeId, cancelReason);
public void cancelPending(CancelParamsRequest cancelParamsRequest) throws TException {
for (var cancelParam : cancelParamsRequest.getCancelParams()) {
manualParsingDisputesService.cancelPendingDispute(cancelParam);
}
}
@Override
public void approvePending(String disputeId, long changedAmount) throws TException {
manualParsingDisputesService.approvePendingDispute(disputeId, changedAmount);
public void approvePending(ApproveParamsRequest approveParamsRequest) throws TException {
for (var approveParam : approveParamsRequest.getApproveParams()) {
manualParsingDisputesService.approvePendingDispute(approveParam);
}
}
@Override
public void bindCreated(String disputeId, String providerDisputeId) throws TException {
manualParsingDisputesService.bindCreatedDispute(disputeId, providerDisputeId);
public void bindCreated(BindParamsRequest bindParamsRequest) throws TException {
for (BindParams bindParam : bindParamsRequest.getBindParams()) {
manualParsingDisputesService.bindCreatedDispute(bindParam);
}
}
}

View File

@ -32,23 +32,9 @@ public class ManualParsingTopic {
contextMap.put("dispute_id", dispute.getId().toString());
var attachmentsCollect = attachments.stream().map(Attachment::toString).collect(Collectors.joining(", "));
contextMap.put("dispute_attachments", attachmentsCollect);
contextMap.put("dispute_status", DisputeStatus.manual_parsing_created.name());
contextMap.put("dispute_status", DisputeStatus.manual_created.name());
MDC.setContextMap(contextMap);
log.warn("Manual parsing case");
MDC.clear(); //?? будет ли это работать и откатит ли лог при откате транзакции
}
@Transactional(propagation = Propagation.REQUIRED)
public void sendSucceeded(Dispute dispute, Long changedAmount) {
if (!enabled) {
return;
}
var contextMap = MDC.getCopyOfContextMap();
contextMap.put("dispute_id", dispute.getId().toString());
contextMap.put("dispute_changed_amount", String.valueOf(changedAmount));
contextMap.put("dispute_status", DisputeStatus.succeeded.name());
MDC.setContextMap(contextMap);
log.warn("Manual parsing case");
MDC.clear(); //?? будет ли это работать и откатит ли лог при откате транзакции
MDC.clear();
}
}

View File

@ -0,0 +1,52 @@
package dev.vality.disputes.schedule;
import dev.vality.disputes.domain.tables.pojos.Dispute;
import dev.vality.disputes.schedule.handler.CreateAdjustmentHandler;
import dev.vality.disputes.schedule.service.CreateAdjustmentsService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
@Slf4j
@Service
@RequiredArgsConstructor
public class TaskCreateAdjustmentsService {
private final ExecutorService disputesThreadPool;
private final CreateAdjustmentsService createAdjustmentsService;
@Value("${dispute.batchSize}")
private int batchSize;
@Value("${dispute.isScheduleCreateAdjustmentsEnabled}")
private boolean isScheduleCreateAdjustmentsEnabled;
@Scheduled(fixedDelayString = "${dispute.fixedDelayCreateAdjustments}")
public void processPending() {
if (!isScheduleCreateAdjustmentsEnabled) {
return;
}
log.info("Processing create adjustments get started");
try {
var disputes = createAdjustmentsService.getDisputesForHgCall(batchSize);
var callables = disputes.stream()
.map(this::handleCreateAdjustment)
.collect(Collectors.toList());
disputesThreadPool.invokeAll(callables);
} catch (InterruptedException ex) {
log.error("Received InterruptedException while thread executed report", ex);
Thread.currentThread().interrupt();
} catch (Exception ex) {
log.error("Received exception while scheduler processed create adjustments", ex);
}
log.info("Create adjustments were processed");
}
private Callable<Long> handleCreateAdjustment(Dispute dispute) {
return () -> new CreateAdjustmentHandler(createAdjustmentsService).handle(dispute);
}
}

View File

@ -13,7 +13,7 @@ public class DisputeContextConverter {
public DisputeContext convert(Dispute dispute, ProviderDispute providerDispute, Map<String, String> options) {
var disputeContext = new DisputeContext();
disputeContext.setDisputeId(providerDispute.getProviderDisputeId());
disputeContext.setProviderDisputeId(providerDispute.getProviderDisputeId());
var currency = new Currency();
currency.setName(dispute.getCurrencyName());
currency.setSymbolicCode(dispute.getCurrencySymbolicCode());

View File

@ -3,7 +3,6 @@ package dev.vality.disputes.schedule.converter;
import dev.vality.damsel.domain.*;
import dev.vality.damsel.payment_processing.InvoicePaymentAdjustmentParams;
import dev.vality.damsel.payment_processing.InvoicePaymentAdjustmentScenario;
import dev.vality.disputes.DisputeStatusResult;
import dev.vality.disputes.domain.tables.pojos.Dispute;
import org.springframework.stereotype.Component;
@ -14,13 +13,13 @@ public class InvoicePaymentAdjustmentParamsConverter {
public static final String DISPUTE_MASK = "disputeId=%s";
public InvoicePaymentAdjustmentParams convert(Dispute dispute, DisputeStatusResult result) {
public InvoicePaymentAdjustmentParams convert(Dispute dispute) {
var captured = new InvoicePaymentCaptured();
var reason = getReason(dispute);
captured.setReason(reason);
var changedAmount = result.getStatusSuccess().getChangedAmount();
if (changedAmount.isPresent()) {
var cost = new Cash(changedAmount.get(), new CurrencyRef(dispute.getCurrencySymbolicCode()));
var changedAmount = dispute.getChangedAmount();
if (changedAmount != null) {
var cost = new Cash(changedAmount, new CurrencyRef(dispute.getCurrencySymbolicCode()));
captured.setCost(cost);
}
var params = new InvoicePaymentAdjustmentParams();

View File

@ -0,0 +1,23 @@
package dev.vality.disputes.schedule.handler;
import dev.vality.disputes.domain.tables.pojos.Dispute;
import dev.vality.disputes.schedule.service.CreateAdjustmentsService;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public class CreateAdjustmentHandler {
private final CreateAdjustmentsService createAdjustmentsService;
public Long handle(Dispute dispute) {
final var currentThread = Thread.currentThread();
final var oldName = currentThread.getName();
currentThread.setName("dispute-create-adjustment-" + dispute.getId());
try {
createAdjustmentsService.callHgForCreateAdjustment(dispute);
return dispute.getId();
} finally {
currentThread.setName(oldName);
}
}
}

View File

@ -4,7 +4,6 @@ import dev.vality.damsel.domain.Cash;
import dev.vality.damsel.domain.InvoicePaymentAdjustment;
import dev.vality.damsel.domain.InvoicePaymentStatus;
import dev.vality.damsel.payment_processing.InvoicePayment;
import dev.vality.disputes.DisputeStatusResult;
import dev.vality.disputes.domain.tables.pojos.Dispute;
import jakarta.annotation.Nonnull;
import org.springframework.stereotype.Component;
@ -33,11 +32,11 @@ public class AdjustmentExtractor {
.findFirst());
}
public Long getChangedAmount(@Nonnull InvoicePaymentAdjustment invoicePaymentAdjustment, DisputeStatusResult result) {
public Long getChangedAmount(@Nonnull InvoicePaymentAdjustment invoicePaymentAdjustment, Long changedAmount) {
return Optional.of(invoicePaymentAdjustment)
.map(s -> getTargetStatus(s).getCaptured().getCost())
.map(Cash::getAmount)
.or(() -> result.getStatusSuccess().getChangedAmount())
.or(() -> Optional.ofNullable(changedAmount))
.orElse(null);
}

View File

@ -0,0 +1,95 @@
package dev.vality.disputes.schedule.service;
import dev.vality.damsel.domain.InvoicePaymentAdjustment;
import dev.vality.damsel.payment_processing.InvoicePayment;
import dev.vality.damsel.payment_processing.InvoicePaymentAdjustmentParams;
import dev.vality.disputes.constant.ErrorReason;
import dev.vality.disputes.dao.DisputeDao;
import dev.vality.disputes.domain.enums.DisputeStatus;
import dev.vality.disputes.domain.tables.pojos.Dispute;
import dev.vality.disputes.exception.InvoicingPaymentStatusPendingException;
import dev.vality.disputes.schedule.converter.InvoicePaymentAdjustmentParamsConverter;
import dev.vality.disputes.service.external.InvoicingService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
@Slf4j
@Service
@RequiredArgsConstructor
@SuppressWarnings({"ParameterName", "LineLength", "MissingSwitchDefault"})
public class CreateAdjustmentsService {
private final DisputeDao disputeDao;
private final InvoicingService invoicingService;
private final InvoicePaymentAdjustmentParamsConverter invoicePaymentAdjustmentParamsConverter;
private final AdjustmentExtractor adjustmentExtractor;
@Transactional(propagation = Propagation.REQUIRED)
public List<Dispute> getDisputesForHgCall(int batchSize) {
log.debug("Trying to getDisputesForHgCall");
var locked = disputeDao.getDisputesForHgCall(batchSize);
log.debug("getDisputesForHgCall has been found, size={}", locked.size());
return locked;
}
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.REPEATABLE_READ)
public void callHgForCreateAdjustment(Dispute dispute) {
log.debug("Trying to getDisputeForUpdateSkipLocked {}", dispute);
var forUpdate = disputeDao.getDisputeForUpdateSkipLocked(dispute.getId());
if (forUpdate == null || forUpdate.getStatus() != DisputeStatus.create_adjustment) {
log.debug("Dispute locked or wrong status {}", forUpdate);
return;
}
log.debug("GetDisputeForUpdateSkipLocked has been found {}", dispute);
var invoicePayment = getInvoicePayment(dispute);
if (invoicePayment == null || !invoicePayment.isSetRoute()) {
log.error("Trying to set failed Dispute status with PAYMENT_NOT_FOUND error reason {}", dispute);
disputeDao.update(dispute.getId(), DisputeStatus.failed, ErrorReason.PAYMENT_NOT_FOUND);
log.debug("Dispute status has been set to failed {}", dispute);
return;
}
var invoicePaymentAdjustment = adjustmentExtractor.searchAdjustmentByDispute(invoicePayment, dispute);
if (invoicePaymentAdjustment.isPresent()) {
var changedAmount = adjustmentExtractor.getChangedAmount(invoicePaymentAdjustment.get(), dispute.getChangedAmount());
log.info("Trying to set succeeded Dispute status {}", dispute);
disputeDao.update(dispute.getId(), DisputeStatus.succeeded, changedAmount);
log.debug("Dispute status has been set to succeeded {}", dispute);
return;
}
try {
var params = invoicePaymentAdjustmentParamsConverter.convert(dispute);
var paymentAdjustment = createAdjustment(dispute, params);
if (paymentAdjustment == null) {
log.error("Trying to set failed Dispute status with INVOICE_NOT_FOUND error reason {}", dispute);
disputeDao.update(dispute.getId(), DisputeStatus.failed, ErrorReason.INVOICE_NOT_FOUND);
log.debug("Dispute status has been set to failed {}", dispute);
return;
}
} catch (InvoicingPaymentStatusPendingException e) {
// в теории 0%, что сюда попдает выполнение кода, но если попадет, то:
// платеж с не финальным статусом будет заблочен для создания корректировок на стороне хелгейта
// и тогда диспут будет пулиться, пока платеж не зафиналится,
// и тк никакой записи в коде выше нет, то пуллинг не проблема
// а запрос в checkDisputeStatus по идемпотентности просто вернет тот же success
log.error("Error when hg.createPaymentAdjustment() got payments status pending {}", dispute, e);
return;
}
log.info("Trying to set succeeded Dispute status {}", dispute);
disputeDao.update(dispute.getId(), DisputeStatus.succeeded);
log.debug("Dispute status has been set to succeeded {}", dispute);
}
private InvoicePaymentAdjustment createAdjustment(Dispute dispute, InvoicePaymentAdjustmentParams params) {
return invoicingService.createPaymentAdjustment(dispute.getInvoiceId(), dispute.getPaymentId(), params);
}
private InvoicePayment getInvoicePayment(Dispute dispute) {
return invoicingService.getInvoicePayment(dispute.getInvoiceId(), dispute.getPaymentId());
}
}

View File

@ -113,7 +113,7 @@ public class CreatedDisputesService {
case SUCCESS_RESULT -> {
var nextCheckAfter = exponentialBackOffPollingService.prepareNextPollingInterval(dispute);
log.info("Trying to set pending Dispute status {}, {}", dispute, result);
providerDisputeDao.save(new ProviderDispute(result.getSuccessResult().getDisputeId(), dispute.getId()));
providerDisputeDao.save(new ProviderDispute(result.getSuccessResult().getProviderDisputeId(), dispute.getId()));
disputeDao.update(dispute.getId(), DisputeStatus.pending, nextCheckAfter);
log.debug("Dispute status has been set to pending {}", dispute);
}
@ -130,7 +130,7 @@ public class CreatedDisputesService {
void finishTaskWithManualParsingFlowActivation(Dispute dispute, List<Attachment> attachments) {
manualParsingTopic.sendCreated(dispute, attachments);
log.info("Trying to set manual_parsing_created Dispute status {}", dispute);
disputeDao.update(dispute.getId(), DisputeStatus.manual_parsing_created);
disputeDao.update(dispute.getId(), DisputeStatus.manual_created);
log.debug("Dispute status has been set to manual_parsing_created {}", dispute);
}

View File

@ -1,27 +1,16 @@
package dev.vality.disputes.schedule.service;
import dev.vality.damsel.domain.InvoicePaymentAdjustment;
import dev.vality.damsel.domain.Terminal;
import dev.vality.damsel.domain.TerminalRef;
import dev.vality.damsel.payment_processing.InvoicePayment;
import dev.vality.damsel.payment_processing.InvoicePaymentAdjustmentParams;
import dev.vality.disputes.DisputeStatusResult;
import dev.vality.disputes.constant.ErrorReason;
import dev.vality.disputes.dao.DisputeDao;
import dev.vality.disputes.dao.ProviderDisputeDao;
import dev.vality.disputes.domain.enums.DisputeStatus;
import dev.vality.disputes.domain.tables.pojos.Dispute;
import dev.vality.disputes.exception.InvoicingPaymentStatusPendingException;
import dev.vality.disputes.manualparsing.ManualParsingTopic;
import dev.vality.disputes.polling.ExponentialBackOffPollingServiceWrapper;
import dev.vality.disputes.polling.PollingInfoService;
import dev.vality.disputes.schedule.client.RemoteClient;
import dev.vality.disputes.schedule.converter.InvoicePaymentAdjustmentParamsConverter;
import dev.vality.disputes.service.external.DominantService;
import dev.vality.disputes.service.external.InvoicingService;
import dev.vality.geck.serializer.kit.tbase.TErrorUtil;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
@ -29,9 +18,6 @@ import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import static dev.vality.disputes.constant.TerminalOptionsField.DISPUTE_FLOW_HG_SKIP_CREATE_ADJUSTMENT;
@Slf4j
@Service
@ -42,13 +28,8 @@ public class PendingDisputesService {
private final RemoteClient remoteClient;
private final DisputeDao disputeDao;
private final ProviderDisputeDao providerDisputeDao;
private final InvoicingService invoicingService;
private final DominantService dominantService;
private final PollingInfoService pollingInfoService;
private final InvoicePaymentAdjustmentParamsConverter invoicePaymentAdjustmentParamsConverter;
private final AdjustmentExtractor adjustmentExtractor;
private final ExponentialBackOffPollingServiceWrapper exponentialBackOffPollingService;
private final ManualParsingTopic manualParsingTopic;
@Transactional(propagation = Propagation.REQUIRED)
public List<Dispute> getPendingDisputesForUpdateSkipLocked(int batchSize) {
@ -92,48 +73,10 @@ public class PendingDisputesService {
void finishTask(Dispute dispute, DisputeStatusResult result) {
switch (result.getSetField()) {
case STATUS_SUCCESS -> {
var invoicePayment = getInvoicePayment(dispute);
if (invoicePayment == null || !invoicePayment.isSetRoute()) {
log.error("Trying to set failed Dispute status with PAYMENT_NOT_FOUND error reason {}", dispute);
disputeDao.update(dispute.getId(), DisputeStatus.failed, ErrorReason.PAYMENT_NOT_FOUND);
log.debug("Dispute status has been set to failed {}", dispute);
return;
}
var invoicePaymentAdjustment = adjustmentExtractor.searchAdjustmentByDispute(invoicePayment, dispute);
if (invoicePaymentAdjustment.isPresent()) {
var changedAmount = adjustmentExtractor.getChangedAmount(invoicePaymentAdjustment.get(), result);
log.info("Trying to set succeeded Dispute status {}, {}", dispute, result);
disputeDao.update(dispute.getId(), DisputeStatus.succeeded, changedAmount);
log.debug("Dispute status has been set to succeeded {}", dispute);
return;
}
var changedAmount = result.getStatusSuccess().getChangedAmount().orElse(null);
if (!isHgSkipCreateAdjustment(dispute)) {
try {
var params = invoicePaymentAdjustmentParamsConverter.convert(dispute, result);
var paymentAdjustment = createAdjustment(dispute, params);
if (paymentAdjustment == null) {
log.error("Trying to set failed Dispute status with INVOICE_NOT_FOUND error reason {}", dispute);
disputeDao.update(dispute.getId(), DisputeStatus.failed, ErrorReason.INVOICE_NOT_FOUND);
log.debug("Dispute status has been set to failed {}", dispute);
return;
}
} catch (InvoicingPaymentStatusPendingException e) {
// в теории 0%, что сюда попдает выполнение кода, но если попадет, то:
// платеж с не финальным статусом будет заблочен для создания корректировок на стороне хелгейта
// и тогда диспут будет пулиться, пока платеж не зафиналится,
// и тк никакой записи в коде выше нет, то пуллинг не проблема
// а запрос в checkDisputeStatus по идемпотентности просто вернет тот же success
log.error("Error when hg.createPaymentAdjustment() {}", dispute, e);
return;
}
}
log.info("Trying to set succeeded Dispute status {}, {}", dispute, result);
disputeDao.update(dispute.getId(), DisputeStatus.succeeded, changedAmount);
log.debug("Dispute status has been set to succeeded {}", dispute);
if (isHgSkipCreateAdjustment(dispute)) {
manualParsingTopic.sendSucceeded(dispute, changedAmount);
}
log.info("Trying to set create_adjustment Dispute status {}, {}", dispute, result);
disputeDao.update(dispute.getId(), DisputeStatus.create_adjustment, changedAmount);
log.debug("Dispute status has been set to create_adjustment {}", dispute);
}
case STATUS_FAIL -> {
var errorMessage = TErrorUtil.toStringVal(result.getStatusFail().getFailure());
@ -152,22 +95,4 @@ public class PendingDisputesService {
}
}
}
@SneakyThrows
private boolean isHgSkipCreateAdjustment(Dispute dispute) {
return getTerminal(dispute.getTerminalId()).get().getOptions()
.containsKey(DISPUTE_FLOW_HG_SKIP_CREATE_ADJUSTMENT);
}
private CompletableFuture<Terminal> getTerminal(Integer terminalId) {
return dominantService.getTerminal(new TerminalRef(terminalId));
}
private InvoicePaymentAdjustment createAdjustment(Dispute dispute, InvoicePaymentAdjustmentParams params) {
return invoicingService.createPaymentAdjustment(dispute.getInvoiceId(), dispute.getPaymentId(), params);
}
private InvoicePayment getInvoicePayment(Dispute dispute) {
return invoicingService.getInvoicePayment(dispute.getInvoiceId(), dispute.getPaymentId());
}
}

View File

@ -1,6 +1,6 @@
package dev.vality.disputes.servlet;
import dev.vality.disputes.ManualParsingServiceSrv;
import dev.vality.disputes.admin.ManualParsingServiceSrv;
import dev.vality.woody.thrift.impl.http.THServiceBuilder;
import jakarta.servlet.*;
import jakarta.servlet.annotation.WebServlet;

View File

@ -93,8 +93,10 @@ dispute:
batchSize: 1
fixedDelayCreated: 5000
fixedDelayPending: 5000
fixedDelayCreateAdjustments: 5000
isScheduleCreatedEnabled: true
isSchedulePendingEnabled: true
isScheduleCreateAdjustmentsEnabled: true
time:
config:

View File

@ -1,3 +0,0 @@
alter type dspt.dispute_status add value 'cancelled';
alter type dspt.dispute_status add value 'manual_parsing_created';
alter type dspt.dispute_status add value 'manual_parsing_binded_pending';

View File

@ -0,0 +1,4 @@
ALTER TYPE dspt.dispute_status ADD VALUE 'cancelled';
ALTER TYPE dspt.dispute_status ADD VALUE 'manual_created';
ALTER TYPE dspt.dispute_status ADD VALUE 'manual_pending';
ALTER TYPE dspt.dispute_status ADD VALUE 'create_adjustment';

View File

@ -0,0 +1,2 @@
ALTER TABLE dspt.dispute
ADD COLUMN "skip_call_hg_for_create_adjustment" BOOLEAN NOT NULL DEFAULT TRUE;