From d7a19835cf5d9b47be1d1668f8bec27719fbb808 Mon Sep 17 00:00:00 2001 From: Anatolii Karlov Date: Fri, 30 Aug 2024 20:03:39 +0700 Subject: [PATCH] add CreateAdjustment schedulator (#7) --- README.md | 37 +++++++- pom.xml | 2 +- .../converter/Status200ResponseConverter.java | 2 +- .../api/service/ApiDisputesService.java | 12 ++- .../constant/TerminalOptionsField.java | 1 - .../dev/vality/disputes/dao/DisputeDao.java | 29 +++++- .../ManualParsingDisputesService.java | 42 +++++--- .../manualparsing/ManualParsingHandler.java | 20 ++-- .../manualparsing/ManualParsingTopic.java | 18 +--- .../TaskCreateAdjustmentsService.java | 52 ++++++++++ .../converter/DisputeContextConverter.java | 2 +- ...voicePaymentAdjustmentParamsConverter.java | 9 +- .../handler/CreateAdjustmentHandler.java | 23 +++++ .../schedule/service/AdjustmentExtractor.java | 5 +- .../service/CreateAdjustmentsService.java | 95 +++++++++++++++++++ .../service/CreatedDisputesService.java | 4 +- .../service/PendingDisputesService.java | 81 +--------------- .../servlet/ManualParsingServlet.java | 2 +- src/main/resources/application.yml | 2 + .../db/migration/V2__add_cancelled_type.sql | 3 - .../db/migration/V2__add_dispute_statuses.sql | 4 + .../db/migration/V3__add_skip_hg_flag.sql | 2 + 22 files changed, 305 insertions(+), 142 deletions(-) create mode 100644 src/main/java/dev/vality/disputes/schedule/TaskCreateAdjustmentsService.java create mode 100644 src/main/java/dev/vality/disputes/schedule/handler/CreateAdjustmentHandler.java create mode 100644 src/main/java/dev/vality/disputes/schedule/service/CreateAdjustmentsService.java delete mode 100644 src/main/resources/db/migration/V2__add_cancelled_type.sql create mode 100644 src/main/resources/db/migration/V2__add_dispute_statuses.sql create mode 100644 src/main/resources/db/migration/V3__add_skip_hg_flag.sql diff --git a/README.md b/README.md index 74ccf0b..f7177ed 100644 --- a/README.md +++ b/README.md @@ -63,9 +63,6 @@ Диспуты на проверку статуса упорядочиваются по последнему времени проверки поля `next_check_after`, берутся самые древние -Опция `DISPUTE_FLOW_HG_SKIP_CREATE_ADJUSTMENT` скажет сервису пропустить этап создания корректировки в хелгейте, в этом -случае должно произойти ручное внесенение корректировки - Ответственность за актуальный статус диспута несет конкретный адаптер. Решение остается за адаптером, тк детали реализации могут отличаться в зависимости от интеграции, `disputes-api` работает уже с результатом этого процесса и занимается обновлением статуса диспута в своей БД. При этом, при создании диспута необходимо в адаптере по возможности @@ -102,3 +99,37 @@ Вызов адаптеров — пулинг на падения не влияет, но при создании предполагается, что это ответственность адаптера удостовериться, что такого диспута еще не существует. + +# Модуль ручного разбора + +схема такая: +после создания диспута через нашу внешнюю апишку, будет происходить несколько кейсов: + +- если `CreatedDisputesService` при попытке создать диспут в провайдере понимает, что такой ручки в провайдере не + существут, отправляет на ручной разбор +- если не выставлен флаг в опциях `DISPUTE_FLOW_PROVIDERS_API_EXIST` , то тоже отправляет на ручной разбор +- если это captured платеж и выставлена опция `DISPUTE_FLOW_CAPTURED_BLOCKED` , то тоже отправляет на ручной разбор + +Далее, через внутрений трифт-интерфейс саппорт получает способ манипулировать диспутом для его +обработки (`ManualParsingDisputesService`) + +- Перед переводом диспута в финальный статус саппорт должен будет забиндить айди созданного диспута в провайдере через + ручку `BindCreated()`. Здесь особенность, что этот метод фильтрует возможность биндить диспуты только созданные + вручную (из `manual_parsing_created`) + +Далее, в режиме ручного разбора есть опция финализации диспута в фейл (`CancelPending()`) либо в +успех (`ApprovePending()`). Здесь особенность, что в фейл можно перевести любой диспут имеющий не финальный статус, а в +успех можно перевести, только если гарантировано создан внешний диспут у провайдера ( +из `pending`,`manual_parsing_binded_pending`) + +- Из за того, что для ручных диспутов добавлены отдельные + статусы `manual_parsing_binded_pending` ,`manual_parsing_created` не происходит ситуации, что такие диспуты попадут в + таску `PendingDisputesService` которая автоматически вызывает апи провайдера для проверки статуса + +# Схема аппрува корректировок + +Добавить в БД для диспута колонку "IS_AUTO", по дефолту проставляем туда FALSE. +Получаем успешный статус от провайдера, пишем в БД статус READY_FOR_ADJUSTMENT. +Корректировки проводим в отдельном потоке по расписанию, как сейчас создаем и проводим диспуты (просто последний вариант +разбиваем на два отдельных). По расписанию стартует поток и автоматически проводит только те проводки, где IS_AUTO = +TRUE. В идеале саппорт/фины будут скриптом/в админке сначала проставлять TRUE. diff --git a/pom.xml b/pom.xml index ed0307e..677b135 100644 --- a/pom.xml +++ b/pom.xml @@ -47,7 +47,7 @@ dev.vality provider-disputes-proto - 1.14-55b451d + 1.16-61eff7c dev.vality diff --git a/src/main/java/dev/vality/disputes/api/converter/Status200ResponseConverter.java b/src/main/java/dev/vality/disputes/api/converter/Status200ResponseConverter.java index deeb7a2..5eb0754 100644 --- a/src/main/java/dev/vality/disputes/api/converter/Status200ResponseConverter.java +++ b/src/main/java/dev/vality/disputes/api/converter/Status200ResponseConverter.java @@ -23,7 +23,7 @@ public class Status200ResponseConverter { private Status200Response.StatusEnum getStatus(Dispute dispute) { return switch (dispute.getStatus()) { - case created, pending, manual_parsing_created, manual_parsing_binded_pending -> + case created, pending, manual_created, manual_pending, create_adjustment -> Status200Response.StatusEnum.PENDING; case succeeded -> Status200Response.StatusEnum.SUCCEEDED; case cancelled, failed -> Status200Response.StatusEnum.FAILED; diff --git a/src/main/java/dev/vality/disputes/api/service/ApiDisputesService.java b/src/main/java/dev/vality/disputes/api/service/ApiDisputesService.java index 66722be..68c72a4 100644 --- a/src/main/java/dev/vality/disputes/api/service/ApiDisputesService.java +++ b/src/main/java/dev/vality/disputes/api/service/ApiDisputesService.java @@ -23,8 +23,7 @@ import java.util.Set; @SuppressWarnings({"ParameterName", "LineLength"}) public class ApiDisputesService { - public static final Set DISPUTE_PENDING = Set.of( - DisputeStatus.created, DisputeStatus.pending, DisputeStatus.manual_parsing_created, DisputeStatus.manual_parsing_binded_pending); + public static final Set DISPUTE_PENDING = pendings(); private final DisputeDao disputeDao; private final ApiAttachmentsService apiAttachmentsService; private final DisputeConverter disputeConverter; @@ -68,4 +67,13 @@ public class ApiDisputesService { log.debug("Dispute has been found, disputeId={}", disputeId); return dispute; } + + private static Set pendings() { + return Set.of( + DisputeStatus.created, + DisputeStatus.pending, + DisputeStatus.manual_created, + DisputeStatus.manual_pending, + DisputeStatus.create_adjustment); + } } diff --git a/src/main/java/dev/vality/disputes/constant/TerminalOptionsField.java b/src/main/java/dev/vality/disputes/constant/TerminalOptionsField.java index c71745c..680b435 100644 --- a/src/main/java/dev/vality/disputes/constant/TerminalOptionsField.java +++ b/src/main/java/dev/vality/disputes/constant/TerminalOptionsField.java @@ -8,6 +8,5 @@ public class TerminalOptionsField { public static final String DISPUTE_FLOW_MAX_TIME_POLLING_MIN = "DISPUTE_FLOW_MAX_TIME_POLLING_MIN"; public static final String DISPUTE_FLOW_CAPTURED_BLOCKED = "DISPUTE_FLOW_CAPTURED_BLOCKED"; public static final String DISPUTE_FLOW_PROVIDERS_API_EXIST = "DISPUTE_FLOW_PROVIDERS_API_EXIST"; - public static final String DISPUTE_FLOW_HG_SKIP_CREATE_ADJUSTMENT = "DISPUTE_FLOW_HG_SKIP_CREATE_ADJUSTMENT"; } diff --git a/src/main/java/dev/vality/disputes/dao/DisputeDao.java b/src/main/java/dev/vality/disputes/dao/DisputeDao.java index aac154d..fa1a39e 100644 --- a/src/main/java/dev/vality/disputes/dao/DisputeDao.java +++ b/src/main/java/dev/vality/disputes/dao/DisputeDao.java @@ -84,6 +84,18 @@ public class DisputeDao extends AbstractGenericDao { .orElse(List.of()); } + public List getDisputesForHgCall(int limit) { + var query = getDslContext().selectFrom(DISPUTE) + .where(DISPUTE.STATUS.eq(DisputeStatus.create_adjustment) + .and(DISPUTE.SKIP_CALL_HG_FOR_CREATE_ADJUSTMENT.eq(false))) + .orderBy(DISPUTE.NEXT_CHECK_AFTER) + .limit(limit) + .forUpdate() + .skipLocked(); + return Optional.ofNullable(fetch(query, disputeRowMapper)) + .orElse(List.of()); + } + @Nullable public Dispute getDisputeForUpdateSkipLocked(long disputeId) { var query = getDslContext().selectFrom(DISPUTE) @@ -94,22 +106,26 @@ public class DisputeDao extends AbstractGenericDao { } public long update(long disputeId, DisputeStatus status) { - return update(disputeId, status, null, null, null); + return update(disputeId, status, null, null, null, null); } public long update(long disputeId, DisputeStatus status, LocalDateTime nextCheckAfter) { - return update(disputeId, status, nextCheckAfter, null, null); + return update(disputeId, status, nextCheckAfter, null, null, null); } public long update(long disputeId, DisputeStatus status, String errorMessage) { - return update(disputeId, status, null, errorMessage, null); + return update(disputeId, status, null, errorMessage, null, null); } public long update(long disputeId, DisputeStatus status, Long changedAmount) { - return update(disputeId, status, null, null, changedAmount); + return update(disputeId, status, null, null, changedAmount, null); } - private long update(long disputeId, DisputeStatus status, LocalDateTime nextCheckAfter, String errorMessage, Long changedAmount) { + public long update(long disputeId, DisputeStatus status, Long changedAmount, Boolean skipCallHgForCreateAdjustment) { + return update(disputeId, status, null, null, changedAmount, skipCallHgForCreateAdjustment); + } + + private long update(long disputeId, DisputeStatus status, LocalDateTime nextCheckAfter, String errorMessage, Long changedAmount, Boolean skipCallHgForCreateAdjustment) { var set = getDslContext().update(DISPUTE) .set(DISPUTE.STATUS, status); if (nextCheckAfter != null) { @@ -121,6 +137,9 @@ public class DisputeDao extends AbstractGenericDao { if (changedAmount != null) { set = set.set(DISPUTE.CHANGED_AMOUNT, changedAmount); } + if (skipCallHgForCreateAdjustment != null) { + set = set.set(DISPUTE.SKIP_CALL_HG_FOR_CREATE_ADJUSTMENT, skipCallHgForCreateAdjustment); + } var query = set .where(DISPUTE.ID.eq(disputeId)); executeOne(query); diff --git a/src/main/java/dev/vality/disputes/manualparsing/ManualParsingDisputesService.java b/src/main/java/dev/vality/disputes/manualparsing/ManualParsingDisputesService.java index 96c97ef..7421072 100644 --- a/src/main/java/dev/vality/disputes/manualparsing/ManualParsingDisputesService.java +++ b/src/main/java/dev/vality/disputes/manualparsing/ManualParsingDisputesService.java @@ -1,5 +1,8 @@ package dev.vality.disputes.manualparsing; +import dev.vality.disputes.admin.ApproveParams; +import dev.vality.disputes.admin.BindParams; +import dev.vality.disputes.admin.CancelParams; import dev.vality.disputes.dao.DisputeDao; import dev.vality.disputes.dao.ProviderDisputeDao; import dev.vality.disputes.domain.enums.DisputeStatus; @@ -23,7 +26,9 @@ public class ManualParsingDisputesService { private final ProviderDisputeDao providerDisputeDao; @Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.REPEATABLE_READ) - public void cancelPendingDispute(String disputeId, String cancelReason) { + public void cancelPendingDispute(CancelParams cancelParams) { + var disputeId = cancelParams.getDisputeId(); + var cancelReason = cancelParams.getCancelReason().orElse(null); log.debug("Trying to getForUpdateSkipLocked {}", disputeId); var dispute = disputeDao.getForUpdateSkipLocked(Long.parseLong(disputeId)); log.debug("GetForUpdateSkipLocked has been found {}", dispute); @@ -38,32 +43,43 @@ public class ManualParsingDisputesService { } @Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.REPEATABLE_READ) - public void approvePendingDispute(String disputeId, long changedAmount) { + public void approvePendingDispute(ApproveParams approveParam) { + var disputeId = approveParam.getDisputeId(); log.debug("Trying to getForUpdateSkipLocked {}", disputeId); var dispute = disputeDao.getForUpdateSkipLocked(Long.parseLong(disputeId)); log.debug("GetForUpdateSkipLocked has been found {}", dispute); - if (dispute.getStatus() == DisputeStatus.pending - || dispute.getStatus() == DisputeStatus.manual_parsing_binded_pending) { - // переводим в успех только если диспут уже был создан на стороне провайдера - log.info("Trying to set succeeded Dispute status {}", dispute); - disputeDao.update(dispute.getId(), DisputeStatus.succeeded, changedAmount); - log.debug("Dispute status has been set to succeeded {}", dispute); - } else { - log.info("Request was skipped by inappropriate status {}", dispute); + var skipCallHg = approveParam.isSkipCallHgForCreateAdjustment(); + var targetStatus = skipCallHg ? DisputeStatus.succeeded : DisputeStatus.create_adjustment; + if (dispute.getStatus() == DisputeStatus.create_adjustment) { + log.info("Trying to set {} Dispute status {}", targetStatus, dispute); + // changedAmount не обновляем, тк уже заапрувлено на этапе чек статуса + disputeDao.update(dispute.getId(), targetStatus, null, skipCallHg); + log.debug("Dispute status has been set to {} {}", targetStatus, dispute); + return; } + if (dispute.getStatus() == DisputeStatus.pending + || dispute.getStatus() == DisputeStatus.manual_pending) { + log.info("Trying to set {} Dispute status {}", targetStatus, dispute); + disputeDao.update(dispute.getId(), targetStatus, approveParam.getChangedAmount().orElse(null), skipCallHg); + log.debug("Dispute status has been set to {} {}", targetStatus, dispute); + return; + } + log.info("Request was skipped by inappropriate status {}", dispute); } @Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.REPEATABLE_READ) - public void bindCreatedDispute(String disputeId, String providerDisputeId) { + public void bindCreatedDispute(BindParams bindParam) { + var disputeId = bindParam.getDisputeId(); + var providerDisputeId = bindParam.getProviderDisputeId(); log.debug("Trying to getForUpdateSkipLocked {}", disputeId); var dispute = disputeDao.getForUpdateSkipLocked(Long.parseLong(disputeId)); log.debug("GetForUpdateSkipLocked has been found {}", dispute); - if (dispute.getStatus() == DisputeStatus.manual_parsing_created) { + if (dispute.getStatus() == DisputeStatus.manual_created) { // обрабатываем здесь только вручную созданные диспуты, у остальных предполагается, // что providerDisputeId будет сохранен после создания диспута по API провайдера log.info("Trying to set manual_parsing_binded_pending Dispute status {}", dispute); providerDisputeDao.save(new ProviderDispute(providerDisputeId, dispute.getId())); - disputeDao.update(dispute.getId(), DisputeStatus.manual_parsing_binded_pending); + disputeDao.update(dispute.getId(), DisputeStatus.manual_pending); log.debug("Dispute status has been set to manual_parsing_binded_pending {}", dispute); } else { log.info("Request was skipped by inappropriate status {}", dispute); diff --git a/src/main/java/dev/vality/disputes/manualparsing/ManualParsingHandler.java b/src/main/java/dev/vality/disputes/manualparsing/ManualParsingHandler.java index 36ac436..c6d58a5 100644 --- a/src/main/java/dev/vality/disputes/manualparsing/ManualParsingHandler.java +++ b/src/main/java/dev/vality/disputes/manualparsing/ManualParsingHandler.java @@ -1,6 +1,6 @@ package dev.vality.disputes.manualparsing; -import dev.vality.disputes.ManualParsingServiceSrv; +import dev.vality.disputes.admin.*; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.thrift.TException; @@ -15,17 +15,23 @@ public class ManualParsingHandler implements ManualParsingServiceSrv.Iface { private final ManualParsingDisputesService manualParsingDisputesService; @Override - public void cancelPending(String disputeId, String cancelReason) throws TException { - manualParsingDisputesService.cancelPendingDispute(disputeId, cancelReason); + public void cancelPending(CancelParamsRequest cancelParamsRequest) throws TException { + for (var cancelParam : cancelParamsRequest.getCancelParams()) { + manualParsingDisputesService.cancelPendingDispute(cancelParam); + } } @Override - public void approvePending(String disputeId, long changedAmount) throws TException { - manualParsingDisputesService.approvePendingDispute(disputeId, changedAmount); + public void approvePending(ApproveParamsRequest approveParamsRequest) throws TException { + for (var approveParam : approveParamsRequest.getApproveParams()) { + manualParsingDisputesService.approvePendingDispute(approveParam); + } } @Override - public void bindCreated(String disputeId, String providerDisputeId) throws TException { - manualParsingDisputesService.bindCreatedDispute(disputeId, providerDisputeId); + public void bindCreated(BindParamsRequest bindParamsRequest) throws TException { + for (BindParams bindParam : bindParamsRequest.getBindParams()) { + manualParsingDisputesService.bindCreatedDispute(bindParam); + } } } diff --git a/src/main/java/dev/vality/disputes/manualparsing/ManualParsingTopic.java b/src/main/java/dev/vality/disputes/manualparsing/ManualParsingTopic.java index 911fd46..b17b313 100644 --- a/src/main/java/dev/vality/disputes/manualparsing/ManualParsingTopic.java +++ b/src/main/java/dev/vality/disputes/manualparsing/ManualParsingTopic.java @@ -32,23 +32,9 @@ public class ManualParsingTopic { contextMap.put("dispute_id", dispute.getId().toString()); var attachmentsCollect = attachments.stream().map(Attachment::toString).collect(Collectors.joining(", ")); contextMap.put("dispute_attachments", attachmentsCollect); - contextMap.put("dispute_status", DisputeStatus.manual_parsing_created.name()); + contextMap.put("dispute_status", DisputeStatus.manual_created.name()); MDC.setContextMap(contextMap); log.warn("Manual parsing case"); - MDC.clear(); //?? будет ли это работать и откатит ли лог при откате транзакции - } - - @Transactional(propagation = Propagation.REQUIRED) - public void sendSucceeded(Dispute dispute, Long changedAmount) { - if (!enabled) { - return; - } - var contextMap = MDC.getCopyOfContextMap(); - contextMap.put("dispute_id", dispute.getId().toString()); - contextMap.put("dispute_changed_amount", String.valueOf(changedAmount)); - contextMap.put("dispute_status", DisputeStatus.succeeded.name()); - MDC.setContextMap(contextMap); - log.warn("Manual parsing case"); - MDC.clear(); //?? будет ли это работать и откатит ли лог при откате транзакции + MDC.clear(); } } diff --git a/src/main/java/dev/vality/disputes/schedule/TaskCreateAdjustmentsService.java b/src/main/java/dev/vality/disputes/schedule/TaskCreateAdjustmentsService.java new file mode 100644 index 0000000..73de3f7 --- /dev/null +++ b/src/main/java/dev/vality/disputes/schedule/TaskCreateAdjustmentsService.java @@ -0,0 +1,52 @@ +package dev.vality.disputes.schedule; + +import dev.vality.disputes.domain.tables.pojos.Dispute; +import dev.vality.disputes.schedule.handler.CreateAdjustmentHandler; +import dev.vality.disputes.schedule.service.CreateAdjustmentsService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; + +@Slf4j +@Service +@RequiredArgsConstructor +public class TaskCreateAdjustmentsService { + + private final ExecutorService disputesThreadPool; + private final CreateAdjustmentsService createAdjustmentsService; + @Value("${dispute.batchSize}") + private int batchSize; + @Value("${dispute.isScheduleCreateAdjustmentsEnabled}") + private boolean isScheduleCreateAdjustmentsEnabled; + + @Scheduled(fixedDelayString = "${dispute.fixedDelayCreateAdjustments}") + public void processPending() { + if (!isScheduleCreateAdjustmentsEnabled) { + return; + } + log.info("Processing create adjustments get started"); + try { + var disputes = createAdjustmentsService.getDisputesForHgCall(batchSize); + var callables = disputes.stream() + .map(this::handleCreateAdjustment) + .collect(Collectors.toList()); + disputesThreadPool.invokeAll(callables); + } catch (InterruptedException ex) { + log.error("Received InterruptedException while thread executed report", ex); + Thread.currentThread().interrupt(); + } catch (Exception ex) { + log.error("Received exception while scheduler processed create adjustments", ex); + } + log.info("Create adjustments were processed"); + } + + private Callable handleCreateAdjustment(Dispute dispute) { + return () -> new CreateAdjustmentHandler(createAdjustmentsService).handle(dispute); + } +} diff --git a/src/main/java/dev/vality/disputes/schedule/converter/DisputeContextConverter.java b/src/main/java/dev/vality/disputes/schedule/converter/DisputeContextConverter.java index c52f98e..15d2740 100644 --- a/src/main/java/dev/vality/disputes/schedule/converter/DisputeContextConverter.java +++ b/src/main/java/dev/vality/disputes/schedule/converter/DisputeContextConverter.java @@ -13,7 +13,7 @@ public class DisputeContextConverter { public DisputeContext convert(Dispute dispute, ProviderDispute providerDispute, Map options) { var disputeContext = new DisputeContext(); - disputeContext.setDisputeId(providerDispute.getProviderDisputeId()); + disputeContext.setProviderDisputeId(providerDispute.getProviderDisputeId()); var currency = new Currency(); currency.setName(dispute.getCurrencyName()); currency.setSymbolicCode(dispute.getCurrencySymbolicCode()); diff --git a/src/main/java/dev/vality/disputes/schedule/converter/InvoicePaymentAdjustmentParamsConverter.java b/src/main/java/dev/vality/disputes/schedule/converter/InvoicePaymentAdjustmentParamsConverter.java index 51964e1..cdaed5e 100644 --- a/src/main/java/dev/vality/disputes/schedule/converter/InvoicePaymentAdjustmentParamsConverter.java +++ b/src/main/java/dev/vality/disputes/schedule/converter/InvoicePaymentAdjustmentParamsConverter.java @@ -3,7 +3,6 @@ package dev.vality.disputes.schedule.converter; import dev.vality.damsel.domain.*; import dev.vality.damsel.payment_processing.InvoicePaymentAdjustmentParams; import dev.vality.damsel.payment_processing.InvoicePaymentAdjustmentScenario; -import dev.vality.disputes.DisputeStatusResult; import dev.vality.disputes.domain.tables.pojos.Dispute; import org.springframework.stereotype.Component; @@ -14,13 +13,13 @@ public class InvoicePaymentAdjustmentParamsConverter { public static final String DISPUTE_MASK = "disputeId=%s"; - public InvoicePaymentAdjustmentParams convert(Dispute dispute, DisputeStatusResult result) { + public InvoicePaymentAdjustmentParams convert(Dispute dispute) { var captured = new InvoicePaymentCaptured(); var reason = getReason(dispute); captured.setReason(reason); - var changedAmount = result.getStatusSuccess().getChangedAmount(); - if (changedAmount.isPresent()) { - var cost = new Cash(changedAmount.get(), new CurrencyRef(dispute.getCurrencySymbolicCode())); + var changedAmount = dispute.getChangedAmount(); + if (changedAmount != null) { + var cost = new Cash(changedAmount, new CurrencyRef(dispute.getCurrencySymbolicCode())); captured.setCost(cost); } var params = new InvoicePaymentAdjustmentParams(); diff --git a/src/main/java/dev/vality/disputes/schedule/handler/CreateAdjustmentHandler.java b/src/main/java/dev/vality/disputes/schedule/handler/CreateAdjustmentHandler.java new file mode 100644 index 0000000..84842c2 --- /dev/null +++ b/src/main/java/dev/vality/disputes/schedule/handler/CreateAdjustmentHandler.java @@ -0,0 +1,23 @@ +package dev.vality.disputes.schedule.handler; + +import dev.vality.disputes.domain.tables.pojos.Dispute; +import dev.vality.disputes.schedule.service.CreateAdjustmentsService; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +public class CreateAdjustmentHandler { + + private final CreateAdjustmentsService createAdjustmentsService; + + public Long handle(Dispute dispute) { + final var currentThread = Thread.currentThread(); + final var oldName = currentThread.getName(); + currentThread.setName("dispute-create-adjustment-" + dispute.getId()); + try { + createAdjustmentsService.callHgForCreateAdjustment(dispute); + return dispute.getId(); + } finally { + currentThread.setName(oldName); + } + } +} diff --git a/src/main/java/dev/vality/disputes/schedule/service/AdjustmentExtractor.java b/src/main/java/dev/vality/disputes/schedule/service/AdjustmentExtractor.java index 46bfea5..bee9edd 100644 --- a/src/main/java/dev/vality/disputes/schedule/service/AdjustmentExtractor.java +++ b/src/main/java/dev/vality/disputes/schedule/service/AdjustmentExtractor.java @@ -4,7 +4,6 @@ import dev.vality.damsel.domain.Cash; import dev.vality.damsel.domain.InvoicePaymentAdjustment; import dev.vality.damsel.domain.InvoicePaymentStatus; import dev.vality.damsel.payment_processing.InvoicePayment; -import dev.vality.disputes.DisputeStatusResult; import dev.vality.disputes.domain.tables.pojos.Dispute; import jakarta.annotation.Nonnull; import org.springframework.stereotype.Component; @@ -33,11 +32,11 @@ public class AdjustmentExtractor { .findFirst()); } - public Long getChangedAmount(@Nonnull InvoicePaymentAdjustment invoicePaymentAdjustment, DisputeStatusResult result) { + public Long getChangedAmount(@Nonnull InvoicePaymentAdjustment invoicePaymentAdjustment, Long changedAmount) { return Optional.of(invoicePaymentAdjustment) .map(s -> getTargetStatus(s).getCaptured().getCost()) .map(Cash::getAmount) - .or(() -> result.getStatusSuccess().getChangedAmount()) + .or(() -> Optional.ofNullable(changedAmount)) .orElse(null); } diff --git a/src/main/java/dev/vality/disputes/schedule/service/CreateAdjustmentsService.java b/src/main/java/dev/vality/disputes/schedule/service/CreateAdjustmentsService.java new file mode 100644 index 0000000..910f534 --- /dev/null +++ b/src/main/java/dev/vality/disputes/schedule/service/CreateAdjustmentsService.java @@ -0,0 +1,95 @@ +package dev.vality.disputes.schedule.service; + +import dev.vality.damsel.domain.InvoicePaymentAdjustment; +import dev.vality.damsel.payment_processing.InvoicePayment; +import dev.vality.damsel.payment_processing.InvoicePaymentAdjustmentParams; +import dev.vality.disputes.constant.ErrorReason; +import dev.vality.disputes.dao.DisputeDao; +import dev.vality.disputes.domain.enums.DisputeStatus; +import dev.vality.disputes.domain.tables.pojos.Dispute; +import dev.vality.disputes.exception.InvoicingPaymentStatusPendingException; +import dev.vality.disputes.schedule.converter.InvoicePaymentAdjustmentParamsConverter; +import dev.vality.disputes.service.external.InvoicingService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Isolation; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; + +import java.util.List; + +@Slf4j +@Service +@RequiredArgsConstructor +@SuppressWarnings({"ParameterName", "LineLength", "MissingSwitchDefault"}) +public class CreateAdjustmentsService { + + private final DisputeDao disputeDao; + private final InvoicingService invoicingService; + private final InvoicePaymentAdjustmentParamsConverter invoicePaymentAdjustmentParamsConverter; + private final AdjustmentExtractor adjustmentExtractor; + + @Transactional(propagation = Propagation.REQUIRED) + public List getDisputesForHgCall(int batchSize) { + log.debug("Trying to getDisputesForHgCall"); + var locked = disputeDao.getDisputesForHgCall(batchSize); + log.debug("getDisputesForHgCall has been found, size={}", locked.size()); + return locked; + } + + @Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.REPEATABLE_READ) + public void callHgForCreateAdjustment(Dispute dispute) { + log.debug("Trying to getDisputeForUpdateSkipLocked {}", dispute); + var forUpdate = disputeDao.getDisputeForUpdateSkipLocked(dispute.getId()); + if (forUpdate == null || forUpdate.getStatus() != DisputeStatus.create_adjustment) { + log.debug("Dispute locked or wrong status {}", forUpdate); + return; + } + log.debug("GetDisputeForUpdateSkipLocked has been found {}", dispute); + var invoicePayment = getInvoicePayment(dispute); + if (invoicePayment == null || !invoicePayment.isSetRoute()) { + log.error("Trying to set failed Dispute status with PAYMENT_NOT_FOUND error reason {}", dispute); + disputeDao.update(dispute.getId(), DisputeStatus.failed, ErrorReason.PAYMENT_NOT_FOUND); + log.debug("Dispute status has been set to failed {}", dispute); + return; + } + var invoicePaymentAdjustment = adjustmentExtractor.searchAdjustmentByDispute(invoicePayment, dispute); + if (invoicePaymentAdjustment.isPresent()) { + var changedAmount = adjustmentExtractor.getChangedAmount(invoicePaymentAdjustment.get(), dispute.getChangedAmount()); + log.info("Trying to set succeeded Dispute status {}", dispute); + disputeDao.update(dispute.getId(), DisputeStatus.succeeded, changedAmount); + log.debug("Dispute status has been set to succeeded {}", dispute); + return; + } + try { + var params = invoicePaymentAdjustmentParamsConverter.convert(dispute); + var paymentAdjustment = createAdjustment(dispute, params); + if (paymentAdjustment == null) { + log.error("Trying to set failed Dispute status with INVOICE_NOT_FOUND error reason {}", dispute); + disputeDao.update(dispute.getId(), DisputeStatus.failed, ErrorReason.INVOICE_NOT_FOUND); + log.debug("Dispute status has been set to failed {}", dispute); + return; + } + } catch (InvoicingPaymentStatusPendingException e) { + // в теории 0%, что сюда попдает выполнение кода, но если попадет, то: + // платеж с не финальным статусом будет заблочен для создания корректировок на стороне хелгейта + // и тогда диспут будет пулиться, пока платеж не зафиналится, + // и тк никакой записи в коде выше нет, то пуллинг не проблема + // а запрос в checkDisputeStatus по идемпотентности просто вернет тот же success + log.error("Error when hg.createPaymentAdjustment() got payments status pending {}", dispute, e); + return; + } + log.info("Trying to set succeeded Dispute status {}", dispute); + disputeDao.update(dispute.getId(), DisputeStatus.succeeded); + log.debug("Dispute status has been set to succeeded {}", dispute); + } + + private InvoicePaymentAdjustment createAdjustment(Dispute dispute, InvoicePaymentAdjustmentParams params) { + return invoicingService.createPaymentAdjustment(dispute.getInvoiceId(), dispute.getPaymentId(), params); + } + + private InvoicePayment getInvoicePayment(Dispute dispute) { + return invoicingService.getInvoicePayment(dispute.getInvoiceId(), dispute.getPaymentId()); + } +} diff --git a/src/main/java/dev/vality/disputes/schedule/service/CreatedDisputesService.java b/src/main/java/dev/vality/disputes/schedule/service/CreatedDisputesService.java index 05b8c99..cb3bf95 100644 --- a/src/main/java/dev/vality/disputes/schedule/service/CreatedDisputesService.java +++ b/src/main/java/dev/vality/disputes/schedule/service/CreatedDisputesService.java @@ -113,7 +113,7 @@ public class CreatedDisputesService { case SUCCESS_RESULT -> { var nextCheckAfter = exponentialBackOffPollingService.prepareNextPollingInterval(dispute); log.info("Trying to set pending Dispute status {}, {}", dispute, result); - providerDisputeDao.save(new ProviderDispute(result.getSuccessResult().getDisputeId(), dispute.getId())); + providerDisputeDao.save(new ProviderDispute(result.getSuccessResult().getProviderDisputeId(), dispute.getId())); disputeDao.update(dispute.getId(), DisputeStatus.pending, nextCheckAfter); log.debug("Dispute status has been set to pending {}", dispute); } @@ -130,7 +130,7 @@ public class CreatedDisputesService { void finishTaskWithManualParsingFlowActivation(Dispute dispute, List attachments) { manualParsingTopic.sendCreated(dispute, attachments); log.info("Trying to set manual_parsing_created Dispute status {}", dispute); - disputeDao.update(dispute.getId(), DisputeStatus.manual_parsing_created); + disputeDao.update(dispute.getId(), DisputeStatus.manual_created); log.debug("Dispute status has been set to manual_parsing_created {}", dispute); } diff --git a/src/main/java/dev/vality/disputes/schedule/service/PendingDisputesService.java b/src/main/java/dev/vality/disputes/schedule/service/PendingDisputesService.java index f0250a4..7d96c0c 100644 --- a/src/main/java/dev/vality/disputes/schedule/service/PendingDisputesService.java +++ b/src/main/java/dev/vality/disputes/schedule/service/PendingDisputesService.java @@ -1,27 +1,16 @@ package dev.vality.disputes.schedule.service; -import dev.vality.damsel.domain.InvoicePaymentAdjustment; -import dev.vality.damsel.domain.Terminal; -import dev.vality.damsel.domain.TerminalRef; -import dev.vality.damsel.payment_processing.InvoicePayment; -import dev.vality.damsel.payment_processing.InvoicePaymentAdjustmentParams; import dev.vality.disputes.DisputeStatusResult; import dev.vality.disputes.constant.ErrorReason; import dev.vality.disputes.dao.DisputeDao; import dev.vality.disputes.dao.ProviderDisputeDao; import dev.vality.disputes.domain.enums.DisputeStatus; import dev.vality.disputes.domain.tables.pojos.Dispute; -import dev.vality.disputes.exception.InvoicingPaymentStatusPendingException; -import dev.vality.disputes.manualparsing.ManualParsingTopic; import dev.vality.disputes.polling.ExponentialBackOffPollingServiceWrapper; import dev.vality.disputes.polling.PollingInfoService; import dev.vality.disputes.schedule.client.RemoteClient; -import dev.vality.disputes.schedule.converter.InvoicePaymentAdjustmentParamsConverter; -import dev.vality.disputes.service.external.DominantService; -import dev.vality.disputes.service.external.InvoicingService; import dev.vality.geck.serializer.kit.tbase.TErrorUtil; import lombok.RequiredArgsConstructor; -import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Isolation; @@ -29,9 +18,6 @@ import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; import java.util.List; -import java.util.concurrent.CompletableFuture; - -import static dev.vality.disputes.constant.TerminalOptionsField.DISPUTE_FLOW_HG_SKIP_CREATE_ADJUSTMENT; @Slf4j @Service @@ -42,13 +28,8 @@ public class PendingDisputesService { private final RemoteClient remoteClient; private final DisputeDao disputeDao; private final ProviderDisputeDao providerDisputeDao; - private final InvoicingService invoicingService; - private final DominantService dominantService; private final PollingInfoService pollingInfoService; - private final InvoicePaymentAdjustmentParamsConverter invoicePaymentAdjustmentParamsConverter; - private final AdjustmentExtractor adjustmentExtractor; private final ExponentialBackOffPollingServiceWrapper exponentialBackOffPollingService; - private final ManualParsingTopic manualParsingTopic; @Transactional(propagation = Propagation.REQUIRED) public List getPendingDisputesForUpdateSkipLocked(int batchSize) { @@ -92,48 +73,10 @@ public class PendingDisputesService { void finishTask(Dispute dispute, DisputeStatusResult result) { switch (result.getSetField()) { case STATUS_SUCCESS -> { - var invoicePayment = getInvoicePayment(dispute); - if (invoicePayment == null || !invoicePayment.isSetRoute()) { - log.error("Trying to set failed Dispute status with PAYMENT_NOT_FOUND error reason {}", dispute); - disputeDao.update(dispute.getId(), DisputeStatus.failed, ErrorReason.PAYMENT_NOT_FOUND); - log.debug("Dispute status has been set to failed {}", dispute); - return; - } - var invoicePaymentAdjustment = adjustmentExtractor.searchAdjustmentByDispute(invoicePayment, dispute); - if (invoicePaymentAdjustment.isPresent()) { - var changedAmount = adjustmentExtractor.getChangedAmount(invoicePaymentAdjustment.get(), result); - log.info("Trying to set succeeded Dispute status {}, {}", dispute, result); - disputeDao.update(dispute.getId(), DisputeStatus.succeeded, changedAmount); - log.debug("Dispute status has been set to succeeded {}", dispute); - return; - } var changedAmount = result.getStatusSuccess().getChangedAmount().orElse(null); - if (!isHgSkipCreateAdjustment(dispute)) { - try { - var params = invoicePaymentAdjustmentParamsConverter.convert(dispute, result); - var paymentAdjustment = createAdjustment(dispute, params); - if (paymentAdjustment == null) { - log.error("Trying to set failed Dispute status with INVOICE_NOT_FOUND error reason {}", dispute); - disputeDao.update(dispute.getId(), DisputeStatus.failed, ErrorReason.INVOICE_NOT_FOUND); - log.debug("Dispute status has been set to failed {}", dispute); - return; - } - } catch (InvoicingPaymentStatusPendingException e) { - // в теории 0%, что сюда попдает выполнение кода, но если попадет, то: - // платеж с не финальным статусом будет заблочен для создания корректировок на стороне хелгейта - // и тогда диспут будет пулиться, пока платеж не зафиналится, - // и тк никакой записи в коде выше нет, то пуллинг не проблема - // а запрос в checkDisputeStatus по идемпотентности просто вернет тот же success - log.error("Error when hg.createPaymentAdjustment() {}", dispute, e); - return; - } - } - log.info("Trying to set succeeded Dispute status {}, {}", dispute, result); - disputeDao.update(dispute.getId(), DisputeStatus.succeeded, changedAmount); - log.debug("Dispute status has been set to succeeded {}", dispute); - if (isHgSkipCreateAdjustment(dispute)) { - manualParsingTopic.sendSucceeded(dispute, changedAmount); - } + log.info("Trying to set create_adjustment Dispute status {}, {}", dispute, result); + disputeDao.update(dispute.getId(), DisputeStatus.create_adjustment, changedAmount); + log.debug("Dispute status has been set to create_adjustment {}", dispute); } case STATUS_FAIL -> { var errorMessage = TErrorUtil.toStringVal(result.getStatusFail().getFailure()); @@ -152,22 +95,4 @@ public class PendingDisputesService { } } } - - @SneakyThrows - private boolean isHgSkipCreateAdjustment(Dispute dispute) { - return getTerminal(dispute.getTerminalId()).get().getOptions() - .containsKey(DISPUTE_FLOW_HG_SKIP_CREATE_ADJUSTMENT); - } - - private CompletableFuture getTerminal(Integer terminalId) { - return dominantService.getTerminal(new TerminalRef(terminalId)); - } - - private InvoicePaymentAdjustment createAdjustment(Dispute dispute, InvoicePaymentAdjustmentParams params) { - return invoicingService.createPaymentAdjustment(dispute.getInvoiceId(), dispute.getPaymentId(), params); - } - - private InvoicePayment getInvoicePayment(Dispute dispute) { - return invoicingService.getInvoicePayment(dispute.getInvoiceId(), dispute.getPaymentId()); - } } diff --git a/src/main/java/dev/vality/disputes/servlet/ManualParsingServlet.java b/src/main/java/dev/vality/disputes/servlet/ManualParsingServlet.java index ab6e794..13a4683 100644 --- a/src/main/java/dev/vality/disputes/servlet/ManualParsingServlet.java +++ b/src/main/java/dev/vality/disputes/servlet/ManualParsingServlet.java @@ -1,6 +1,6 @@ package dev.vality.disputes.servlet; -import dev.vality.disputes.ManualParsingServiceSrv; +import dev.vality.disputes.admin.ManualParsingServiceSrv; import dev.vality.woody.thrift.impl.http.THServiceBuilder; import jakarta.servlet.*; import jakarta.servlet.annotation.WebServlet; diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 49ef459..5cd9555 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -93,8 +93,10 @@ dispute: batchSize: 1 fixedDelayCreated: 5000 fixedDelayPending: 5000 + fixedDelayCreateAdjustments: 5000 isScheduleCreatedEnabled: true isSchedulePendingEnabled: true + isScheduleCreateAdjustmentsEnabled: true time: config: diff --git a/src/main/resources/db/migration/V2__add_cancelled_type.sql b/src/main/resources/db/migration/V2__add_cancelled_type.sql deleted file mode 100644 index 43e36ba..0000000 --- a/src/main/resources/db/migration/V2__add_cancelled_type.sql +++ /dev/null @@ -1,3 +0,0 @@ -alter type dspt.dispute_status add value 'cancelled'; -alter type dspt.dispute_status add value 'manual_parsing_created'; -alter type dspt.dispute_status add value 'manual_parsing_binded_pending'; diff --git a/src/main/resources/db/migration/V2__add_dispute_statuses.sql b/src/main/resources/db/migration/V2__add_dispute_statuses.sql new file mode 100644 index 0000000..0cce795 --- /dev/null +++ b/src/main/resources/db/migration/V2__add_dispute_statuses.sql @@ -0,0 +1,4 @@ +ALTER TYPE dspt.dispute_status ADD VALUE 'cancelled'; +ALTER TYPE dspt.dispute_status ADD VALUE 'manual_created'; +ALTER TYPE dspt.dispute_status ADD VALUE 'manual_pending'; +ALTER TYPE dspt.dispute_status ADD VALUE 'create_adjustment'; diff --git a/src/main/resources/db/migration/V3__add_skip_hg_flag.sql b/src/main/resources/db/migration/V3__add_skip_hg_flag.sql new file mode 100644 index 0000000..0547583 --- /dev/null +++ b/src/main/resources/db/migration/V3__add_skip_hg_flag.sql @@ -0,0 +1,2 @@ +ALTER TABLE dspt.dispute + ADD COLUMN "skip_call_hg_for_create_adjustment" BOOLEAN NOT NULL DEFAULT TRUE;