FF-226: Withdrawal session finish notification (1 part) (#337)

This commit is contained in:
Alexey 2020-11-16 12:36:00 +03:00 committed by GitHub
parent 01fb6d846d
commit f7c9e65c5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 332 additions and 77 deletions

View File

@ -4,6 +4,10 @@
-include_lib("damsel/include/dmsl_domain_thrift.hrl").
-include_lib("damsel/include/dmsl_withdrawals_provider_adapter_thrift.hrl").
%% Accessors
-export([id/1]).
%% API
-export([process_withdrawal/4]).
@ -113,6 +117,15 @@
-export_type([quote_data/0]).
-export_type([identity/0]).
%%
%% Accessors
%%
-spec id(withdrawal()) ->
binary().
id(Withdrawal) ->
maps:get(id, Withdrawal).
%%
%% API
%%

View File

@ -176,7 +176,7 @@
-type invalid_withdrawal_status_error() ::
{invalid_withdrawal_status, status()}.
-type action() :: poll | continue | undefined.
-type action() :: sleep | continue | undefined.
-export_type([withdrawal/0]).
-export_type([withdrawal_state/0]).
@ -199,6 +199,10 @@
-export([process_transfer/1]).
%%
-export([process_session_finished/3]).
%% Accessors
-export([wallet_id/1]).
@ -295,7 +299,7 @@
p_transfer_start |
p_transfer_prepare |
session_starting |
session_polling |
session_sleeping |
p_transfer_commit |
p_transfer_cancel |
limit_check |
@ -507,6 +511,51 @@ process_transfer(Withdrawal) ->
Activity = deduce_activity(Withdrawal),
do_process_transfer(Activity, Withdrawal).
%%
-spec process_session_finished(session_id(), session_result(), withdrawal_state()) ->
{ok, process_result()} | {error, session_not_found | old_session | result_mismatch}.
process_session_finished(SessionID, SessionResult, Withdrawal) ->
case get_session_by_id(SessionID, Withdrawal) of
#{id := SessionID, result := SessionResult} ->
{ok, {undefined, []}};
#{id := SessionID, result := _OtherSessionResult} ->
{error, result_mismatch};
#{id := SessionID} ->
try_finish_session(SessionID, SessionResult, Withdrawal);
undefined ->
{error, session_not_found}
end.
-spec get_session_by_id(session_id(), withdrawal_state()) ->
session() | undefined.
get_session_by_id(SessionID, Withdrawal) ->
Sessions = ff_withdrawal_route_attempt_utils:get_sessions(attempts(Withdrawal)),
case lists:filter(fun(#{id := SessionID0}) -> SessionID0 =:= SessionID end, Sessions) of
[Session] -> Session;
[] -> undefined
end.
-spec try_finish_session(session_id(), session_result(), withdrawal_state()) ->
{ok, process_result()} | {error, old_session}.
try_finish_session(SessionID, SessionResult, Withdrawal) ->
case is_current_session(SessionID, Withdrawal) of
true ->
{ok, {continue, [{session_finished, {SessionID, SessionResult}}]}};
false ->
{error, old_session}
end.
-spec is_current_session(session_id(), withdrawal_state()) ->
boolean().
is_current_session(SessionID, Withdrawal) ->
case session_id(Withdrawal) of
SessionID ->
true;
_ ->
false
end.
%% Internals
-spec do_start_adjustment(adjustment_params(), withdrawal_state()) ->
@ -639,7 +688,7 @@ do_pending_activity(#{p_transfer := prepared, limit_check := failed}) ->
do_pending_activity(#{p_transfer := cancelled, limit_check := failed}) ->
{fail, limit_check};
do_pending_activity(#{p_transfer := prepared, session := pending}) ->
session_polling;
session_sleeping;
do_pending_activity(#{p_transfer := prepared, session := succeeded}) ->
p_transfer_commit;
do_pending_activity(#{p_transfer := committed, session := succeeded}) ->
@ -683,8 +732,8 @@ do_process_transfer(limit_check, Withdrawal) ->
process_limit_check(Withdrawal);
do_process_transfer(session_starting, Withdrawal) ->
process_session_creation(Withdrawal);
do_process_transfer(session_polling, Withdrawal) ->
process_session_poll(Withdrawal);
do_process_transfer(session_sleeping, Withdrawal) ->
process_session_sleep(Withdrawal);
do_process_transfer({fail, Reason}, Withdrawal) ->
{ok, Providers} = do_process_routing(Withdrawal),
process_route_change(Providers, Withdrawal, Reason);
@ -869,15 +918,15 @@ create_session(ID, TransferData, SessionParams) ->
ok
end.
-spec process_session_poll(withdrawal_state()) ->
-spec process_session_sleep(withdrawal_state()) ->
process_result().
process_session_poll(Withdrawal) ->
process_session_sleep(Withdrawal) ->
SessionID = session_id(Withdrawal),
{ok, SessionMachine} = ff_withdrawal_session_machine:get(SessionID),
Session = ff_withdrawal_session_machine:session(SessionMachine),
case ff_withdrawal_session:status(Session) of
active ->
{poll, []};
{sleep, []};
{finished, _} ->
Result = ff_withdrawal_session:result(Session),
{continue, [{session_finished, {SessionID, Result}}]}

View File

@ -55,6 +55,7 @@
-export([repair/2]).
-export([start_adjustment/2]).
-export([notify_session_finished/3]).
%% Accessors
@ -78,8 +79,12 @@
-type adjustment_params() :: ff_withdrawal:adjustment_params().
-type session_id() :: ff_withdrawal_session:id().
-type session_result() :: ff_withdrawal_session:session_result().
-type call() ::
{start_adjustment, adjustment_params()}.
{start_adjustment, adjustment_params()} |
{session_finished, session_id(), session_result()}.
-define(NS, 'ff/withdrawal_v2').
@ -139,6 +144,11 @@ repair(ID, Scenario) ->
start_adjustment(WithdrawalID, Params) ->
call(WithdrawalID, {start_adjustment, Params}).
-spec notify_session_finished(id(), session_id(), session_result()) ->
ok | {error, session_not_found | old_session | result_mismatch}.
notify_session_finished(WithdrawalID, SessionID, SessionResult) ->
call(WithdrawalID, {session_finished, SessionID, SessionResult}).
%% Accessors
-spec withdrawal(st()) ->
@ -160,8 +170,6 @@ ctx(St) ->
-type handler_opts() :: machinery:handler_opts(_).
-type handler_args() :: machinery:handler_args(_).
-define(MAX_SESSION_POLL_TIMEOUT, 4 * 60 * 60).
backend() ->
fistful:backend(?NS).
@ -188,6 +196,8 @@ process_timeout(Machine, _, _Opts) ->
process_call({start_adjustment, Params}, Machine, _, _Opts) ->
do_start_adjustment(Params, Machine);
process_call({session_finished, SessionID, SessionResult}, Machine, _, _Opts) ->
do_process_session_finished(SessionID, SessionResult, Machine);
process_call(CallArgs, _Machine, _, _Opts) ->
erlang:error({unexpected_call, CallArgs}).
@ -209,6 +219,17 @@ do_start_adjustment(Params, Machine) ->
{Error, #{}}
end.
-spec do_process_session_finished(session_id(), session_result(), machine()) -> {Response, result()} when
Response :: ok | {error, session_not_found | old_session | result_mismatch}.
do_process_session_finished(SessionID, SessionResult, Machine) ->
St = ff_machine:collapse(ff_withdrawal, Machine),
case ff_withdrawal:process_session_finished(SessionID, SessionResult, withdrawal(St)) of
{ok, Result} ->
{ok, process_result(Result, St)};
{error, _Reason} = Error ->
{Error, #{}}
end.
process_result({Action, Events}, St) ->
genlib_map:compact(#{
events => set_events(Events),
@ -224,10 +245,12 @@ set_action(continue, _St) ->
continue;
set_action(undefined, _St) ->
undefined;
set_action(poll, St) ->
set_action(sleep, St) ->
% @TODO remove polling from here after deployment of FF-226 (part 2) and replace with unset_timer
Now = machinery_time:now(),
{set_timer, {timeout, compute_poll_timeout(Now, St)}}.
-define(MAX_SESSION_POLL_TIMEOUT, 4 * 60 * 60).
compute_poll_timeout(Now, St) ->
MaxTimeout = genlib_app:env(ff_transfer, max_session_poll_timeout, ?MAX_SESSION_POLL_TIMEOUT),
Timeout0 = machinery_time:interval(Now, ff_machine:updated(St)) div 1000,

View File

@ -102,6 +102,18 @@
opts := ff_withdrawal_provider:adapter_opts()
}.
-type id() :: machinery:id().
-type action() ::
undefined |
continue |
{setup_callback, machinery:tag(), machinery:timer()} |
{setup_timer, machinery:timer()} |
retry |
finish.
-type process_result() :: {action(), [event()]}.
-export_type([id/0]).
-export_type([data/0]).
-export_type([event/0]).
@ -114,15 +126,12 @@
-export_type([callback_params/0]).
-export_type([process_callback_response/0]).
-export_type([process_callback_error/0]).
-export_type([process_result/0]).
-export_type([action/0]).
%%
%% Internal types
%%
-type id() :: machinery:id().
-type auxst() :: undefined.
-type result() :: machinery:result(event(), auxst()).
-type withdrawal() :: ff_adapter_withdrawal:withdrawal().
-type callbacks_index() :: ff_withdrawal_callback_utils:index().
-type adapter_with_opts() :: {ff_withdrawal_provider:adapter(), ff_withdrawal_provider:adapter_opts()}.
@ -215,7 +224,20 @@ apply_event({callback, _Ev} = WrappedEvent, Session) ->
Callbacks1 = ff_withdrawal_callback_utils:apply_event(WrappedEvent, Callbacks0),
set_callbacks_index(Callbacks1, Session).
-spec process_session(session_state()) -> result().
-spec process_session(session_state()) -> process_result().
process_session(#{status := {finished, _}, id := _ID, result := _Result, withdrawal := _Withdrawal}) ->
{finish, []}; % @TODO remove after deployment of FF-226 (part 1)
% @TODO uncomment after deployment of FF-226 (part 1)
% Session has finished, it should notify the withdrawal machine about the fact
%WithdrawalID = ff_adapter_withdrawal:id(Withdrawal),
%case ff_withdrawal_machine:notify_session_finished(WithdrawalID, ID, Result) of
% ok ->
% {finish, []};
% {error, session_not_found} ->
% {retry, []};
% {error, _} = Error ->
% erlang:error({unable_to_finish_session, Error})
%end;
process_session(#{status := active, withdrawal := Withdrawal, route := Route} = SessionState) ->
{Adapter, AdapterOpts} = get_adapter_with_opts(Route),
ASt = maps:get(adapter_state, SessionState, undefined),
@ -223,7 +245,7 @@ process_session(#{status := active, withdrawal := Withdrawal, route := Route} =
#{intent := Intent} = ProcessResult,
Events0 = process_next_state(ProcessResult, []),
Events1 = process_transaction_info(ProcessResult, Events0, SessionState),
process_intent(Intent, SessionState, Events1).
process_adapter_intent(Intent, SessionState, Events1).
process_transaction_info(#{transaction_info := TrxInfo}, Events, SessionState) ->
ok = assert_transaction_info(TrxInfo, transaction_info(SessionState)),
@ -241,27 +263,24 @@ assert_transaction_info(NewTrxInfo, _TrxInfo) ->
erlang:error({transaction_info_is_different, NewTrxInfo}).
-spec set_session_result(session_result(), session_state()) ->
result().
set_session_result(Result, #{status := active}) ->
#{
events => [{finished, Result}],
action => unset_timer
}.
process_result().
set_session_result(Result, Session = #{status := active}) ->
process_adapter_intent({finish, Result}, Session).
-spec process_callback(callback_params(), session_state()) ->
{ok, {process_callback_response(), result()}} |
{error, {process_callback_error(), result()}}.
{ok, {process_callback_response(), process_result()}} |
{error, {process_callback_error(), process_result()}}.
process_callback(#{tag := CallbackTag} = Params, Session) ->
{ok, Callback} = find_callback(CallbackTag, Session),
case ff_withdrawal_callback:status(Callback) of
succeeded ->
{ok, {ff_withdrawal_callback:response(Callback), #{}}};
{ok, {ff_withdrawal_callback:response(Callback), {undefined, []}}};
pending ->
case status(Session) of
active ->
do_process_callback(Params, Callback, Session);
{finished, _} ->
{error, {{session_already_finished, make_session_finish_params(Session)}, #{}}}
{error, {{session_already_finished, make_session_finish_params(Session)}, {undefined, []}}}
end
end.
@ -283,7 +302,7 @@ do_process_callback(CallbackParams, Callback, Session) ->
Events0 = ff_withdrawal_callback_utils:process_response(Response, Callback),
Events1 = process_next_state(HandleCallbackResult, Events0),
Events2 = process_transaction_info(HandleCallbackResult, Events1, Session),
{ok, {Response, process_intent(Intent, Session, Events2)}}.
{ok, {Response, process_adapter_intent(Intent, Session, Events2)}}.
make_session_finish_params(Session) ->
{_Adapter, AdapterOpts} = get_adapter_with_opts(route(Session)),
@ -298,28 +317,21 @@ process_next_state(#{next_state := NextState}, Events) ->
process_next_state(_, Events) ->
Events.
process_intent(Intent, Session, Events) ->
#{events := Events0} = Result = process_intent(Intent, Session),
Result#{events => Events ++ Events0}.
process_adapter_intent(Intent, Session, Events0) ->
{Action, Events1} = process_adapter_intent(Intent, Session),
{Action, Events0 ++ Events1}.
process_intent({finish, {success, _TransactionInfo}}, _Session) ->
process_adapter_intent({finish, {success, _TransactionInfo}}, _Session) ->
%% we ignore TransactionInfo here
%% @see ff_adapter_withdrawal:rebind_transaction_info/1
#{
events => [{finished, success}],
action => unset_timer
};
process_intent({finish, Result}, _Session) ->
#{
events => [{finished, Result}],
action => unset_timer
};
process_intent({sleep, #{timer := Timer} = Params}, Session) ->
CallbackEvents = create_callback(Params, Session),
#{
events => CallbackEvents,
action => maybe_add_tag_action(Params, [timer_action(Timer)])
}.
{continue, [{finished, success}]};
process_adapter_intent({finish, Result}, _Session) ->
{continue, [{finished, Result}]};
process_adapter_intent({sleep, #{timer := Timer, tag := Tag}}, Session) ->
Events = create_callback(Tag, Session),
{{setup_callback, Tag, Timer}, Events};
process_adapter_intent({sleep, #{timer := Timer}}, _Session) ->
{{setup_timer, Timer}, []}.
%%
@ -334,16 +346,14 @@ create_session(ID, Data, #{withdrawal_id := WdthID, resource := Res, route := Ro
status => active
}.
create_callback(#{tag := Tag}, Session) ->
create_callback(Tag, Session) ->
case ff_withdrawal_callback_utils:get_by_tag(Tag, callbacks_index(Session)) of
{error, {unknown_callback, Tag}} ->
{ok, CallbackEvents} = ff_withdrawal_callback:create(#{tag => Tag}),
ff_withdrawal_callback_utils:wrap_events(Tag, CallbackEvents);
{ok, Callback} ->
erlang:error({callback_already_exists, Callback})
end;
create_callback(_, _) ->
[].
end.
-spec convert_identity_state_to_adapter_identity(ff_identity:identity_state()) ->
ff_adapter_withdrawal:identity().
@ -401,13 +411,3 @@ create_adapter_withdrawal(#{id := SesID, sender := Sender, receiver := Receiver}
-spec set_callbacks_index(callbacks_index(), session_state()) -> session_state().
set_callbacks_index(Callbacks, Session) ->
Session#{callbacks => Callbacks}.
-spec timer_action({deadline, binary()} | {timeout, non_neg_integer()}) -> machinery:action().
timer_action(Timer) ->
{set_timer, Timer}.
-spec maybe_add_tag_action(SleepIntentParams :: map(), [machinery:action()]) -> [machinery:action()].
maybe_add_tag_action(#{tag := Tag}, Actions) ->
[{tag, Tag} | Actions];
maybe_add_tag_action(_, Actions) ->
Actions.

View File

@ -58,6 +58,7 @@
-type st() :: ff_machine:st(session()).
-type session() :: ff_withdrawal_session:session_state().
-type event() :: ff_withdrawal_session:event().
-type action() :: ff_withdrawal_session:action().
-type event_range() :: {After :: non_neg_integer() | undefined, Limit :: non_neg_integer() | undefined}.
-type callback_params() :: ff_withdrawal_session:callback_params().
@ -66,6 +67,8 @@
{unknown_session, {tag, id()}} |
ff_withdrawal_session:process_callback_error().
-type process_result() :: ff_withdrawal_session:process_result().
-type ctx() :: ff_entity_context:context().
%% Pipeline
@ -76,6 +79,9 @@
%% API
%%
-define(SESSION_RETRY_TIME_LIMIT, 24 * 60 * 60).
-define(MAX_SESSION_RETRY_TIMEOUT, 4 * 60 * 60).
-spec session(st()) -> session().
session(St) ->
@ -147,14 +153,11 @@ init(Events, #{}, _, _Opts) ->
result().
process_timeout(Machine, _, _Opts) ->
State = ff_machine:collapse(ff_withdrawal_session, Machine),
#{events := Events} = Result = ff_withdrawal_session:process_session(session(State)),
Result#{
events => ff_machine:emit_events(Events)
}.
Session = session(State),
process_result(ff_withdrawal_session:process_session(Session), State).
-spec process_call(any(), machine(), handler_args(), handler_opts()) ->
{ok, result()}.
process_call({process_callback, Params}, Machine, _, _Opts) ->
do_process_callback(Params, Machine);
process_call(_CallArgs, #{}, _, _Opts) ->
@ -166,7 +169,8 @@ process_repair(Scenario, Machine, _Args, _Opts) ->
ScenarioProcessors = #{
set_session_result => fun(Args, RMachine) ->
State = ff_machine:collapse(ff_withdrawal_session, RMachine),
{ok, {ok, ff_withdrawal_session:set_session_result(Args, session(State))}}
{Action, Events} = ff_withdrawal_session:set_session_result(Args, session(State)),
{ok, {ok, #{action => set_action(Action, State), events => Events}}}
end
},
ff_repair:apply_scenario(ff_withdrawal_session, Machine, Scenario, ScenarioProcessors).
@ -175,6 +179,104 @@ process_repair(Scenario, Machine, _Args, _Opts) ->
%% Internals
%%
-spec process_result(process_result(), st()) ->
result().
process_result({Action, Events}, St) ->
genlib_map:compact(#{
events => set_events(Events),
action => set_action(Action, St)
}).
-spec set_events([event()]) ->
undefined | ff_machine:timestamped_event(event()).
set_events([]) ->
undefined;
set_events(Events) ->
ff_machine:emit_events(Events).
-spec set_action(action(), st()) ->
undefined | machinery:action() | [machinery:action()].
set_action(continue, _St) ->
continue;
set_action(undefined, _St) ->
undefined;
set_action({setup_callback, Tag, Timer}, _St) ->
[tag_action(Tag), timer_action(Timer)];
set_action({setup_timer, Timer}, _St) ->
timer_action(Timer);
% @TODO uncomment after deployment of FF-226 (part 1)
%set_action(retry, St) ->
% case compute_retry_timer(St) of
% {ok, Timer} ->
% timer_action(Timer);
% {error, deadline_reached} = Error ->
% erlang:error(Error)
% end;
set_action(finish, _St) ->
unset_timer.
%%
% @TODO uncomment after deployment of FF-226 (part 1)
% -spec compute_retry_timer(st()) ->
% {ok, machinery:timer()} | {error, deadline_reached}.
% compute_retry_timer(St) ->
% Now = machinery_time:now(),
% Updated = ff_machine:updated(St),
% Deadline = compute_retry_deadline(Updated),
% Timeout = compute_next_timeout(Now, Updated),
% check_next_timeout(Timeout, Now, Deadline).
% -spec compute_retry_deadline(machinery:timestamp()) ->
% machinery:timestamp().
% compute_retry_deadline(Updated) ->
% RetryTimeLimit = genlib_app:env(ff_transfer, session_retry_time_limit, ?SESSION_RETRY_TIME_LIMIT),
% machinery_time:add_seconds(RetryTimeLimit, Updated).
% -spec compute_next_timeout(machinery:timestamp(), machinery:timestamp()) ->
% timeout().
% compute_next_timeout(Now, Updated) ->
% MaxTimeout = genlib_app:env(ff_transfer, max_session_retry_timeout, ?MAX_SESSION_RETRY_TIMEOUT),
% Timeout0 = machinery_time:interval(Now, Updated) div 1000,
% erlang:min(MaxTimeout, erlang:max(1, Timeout0)).
% -spec check_next_timeout(timeout(), machinery:timestamp(), machinery:timestamp()) ->
% {ok, machinery:timer()} | {error, deadline_reached}.
% check_next_timeout(Timeout, Now, Deadline) ->
% case check_deadline(machinery_time:add_seconds(Timeout, Now), Deadline) of
% ok ->
% {ok, {timeout, Timeout}};
% {error, _} = Error ->
% Error
% end.
% -spec check_deadline(machinery:timestamp(), machinery:timestamp()) ->
% ok | {error, deadline_reached}.
% check_deadline({Now, _}, {Deadline, _}) ->
% check_deadline_(
% calendar:datetime_to_gregorian_seconds(Now),
% calendar:datetime_to_gregorian_seconds(Deadline)
% ).
% -spec check_deadline_(integer(), integer()) ->
% ok | {error, deadline_reached}.
% check_deadline_(Now, Deadline) when Now < Deadline ->
% ok;
% check_deadline_(Now, Deadline) when Now >= Deadline ->
% {error, deadline_reached}.
%%
-spec timer_action(machinery:timer()) ->
machinery:action().
timer_action(Timer) ->
{set_timer, Timer}.
-spec tag_action(machinery:tag()) ->
machinery:action().
tag_action(Tag) ->
{tag, Tag}.
backend() ->
fistful:backend(?NS).
@ -192,12 +294,10 @@ call(Ref, Call) ->
{error, ff_withdrawal_session:process_callback_error()}.
do_process_callback(Params, Machine) ->
St = ff_machine:collapse(ff_withdrawal_session, Machine),
St= ff_machine:collapse(ff_withdrawal_session, Machine),
case ff_withdrawal_session:process_callback(Params, session(St)) of
{ok, {Response, #{events := Events} = Result}} ->
{{ok, Response}, Result#{events => ff_machine:emit_events(Events)}};
{ok, {Response, Result}} ->
{{ok, Response}, Result};
{error, {Reason, Result}} ->
{{error, Reason}, Result}
{{ok, Response}, process_result(Result, St)};
{error, {Reason, _Result}} ->
{{error, Reason}, #{}}
end.

View File

@ -2,6 +2,7 @@
-include_lib("stdlib/include/assert.hrl").
-include_lib("damsel/include/dmsl_domain_thrift.hrl").
-include_lib("fistful_proto/include/ff_proto_withdrawal_session_thrift.hrl").
-include_lib("shumpune_proto/include/shumpune_shumpune_thrift.hrl").
%% Common test API
@ -17,6 +18,7 @@
%% Tests
-export([session_fail_test/1]).
-export([session_repair_test/1]).
-export([quote_fail_test/1]).
-export([route_not_found_fail_test/1]).
-export([provider_operations_forbidden_fail_test/1]).
@ -70,6 +72,7 @@ groups() ->
[
{default, [parallel], [
session_fail_test,
session_repair_test,
quote_fail_test,
route_not_found_fail_test,
provider_operations_forbidden_fail_test,
@ -582,6 +585,43 @@ provider_callback_test(C) ->
% Check that session is still alive
?assertEqual({ok, #{payload => CallbackPayload}}, call_process_callback(Callback)).
-spec session_repair_test(config()) -> test_return().
session_repair_test(C) ->
Currency = <<"RUB">>,
Cash = {700700, Currency},
#{
wallet_id := WalletID,
destination_id := DestinationID
} = prepare_standard_environment(Cash, C),
WithdrawalID = generate_id(),
WithdrawalParams = #{
id => WithdrawalID,
destination_id => DestinationID,
wallet_id => WalletID,
body => Cash,
quote => #{
cash_from => {700700, <<"RUB">>},
cash_to => {700700, <<"RUB">>},
created_at => <<"2016-03-22T06:12:27Z">>,
expires_on => <<"2016-03-22T06:12:27Z">>,
route => ff_withdrawal_routing:make_route(11, 1),
quote_data => #{<<"test">> => <<"fatal">>}
}
},
Callback = #{
tag => <<"cb_", WithdrawalID/binary>>,
payload => <<"super_secret">>
},
ok = ff_withdrawal_machine:create(WithdrawalParams, ff_entity_context:new()),
?assertEqual(pending, await_session_processing_status(WithdrawalID, pending)),
SessionID = get_session_id(WithdrawalID),
?assertEqual(<<"callback_processing">>, await_session_adapter_state(SessionID, <<"callback_processing">>)),
?assertError({failed, _, _}, call_process_callback(Callback)),
timer:sleep(3000),
?assertEqual(pending, await_session_processing_status(WithdrawalID, pending)),
ok = repair_withdrawal_session(WithdrawalID),
?assertEqual(succeeded, await_final_withdrawal_status(WithdrawalID)).
%% Utils
prepare_standard_environment(WithdrawalCash, C) ->
@ -795,3 +835,24 @@ make_dummy_party_change(PartyID) ->
call_process_callback(Callback) ->
ff_withdrawal_session_machine:process_callback(Callback).
repair_withdrawal_session(WithdrawalID) ->
SessionID = get_session_id(WithdrawalID),
{ok, ok} = call_session_repair(SessionID, {set_session_result, #wthd_session_SetResultRepair{
result = {success, #wthd_session_SessionResultSuccess{
trx_info = #'TransactionInfo'{
id = SessionID,
extra = #{}
}
}}
}}),
ok.
call_session_repair(SessionID, Scenario) ->
Service = {ff_proto_withdrawal_session_thrift, 'Repairer'},
Request = {Service, 'Repair', [SessionID, Scenario]},
Client = ff_woody_client:new(#{
url => <<"http://localhost:8022/v1/repair/withdrawal/session">>,
event_handler => scoper_woody_event_handler
}),
ff_woody_client:call(Client, Request).

View File

@ -129,9 +129,10 @@ validate_result(Mod, #{history := History} = Machine, #{events := NewEvents}) ->
{ok, valid}
catch
error:Error:Stack ->
logger:warning("Invalid repair result: ~p", [Error], #{
Stacktrace = genlib_format:format_stacktrace(Stack),
logger:warning("Invalid repair result: ~p, Stack: ~p", [Error, Stacktrace], #{
error => genlib:format(Error),
stacktrace => genlib_format:format_stacktrace(Stack)
stacktrace => Stacktrace
}),
{error, unexpected_failure}
end.

View File

@ -55,6 +55,10 @@
-type status() :: {success, transaction_info()} | {failure, failure()}.
-type timer() :: {deadline, binary()} | {timeout, integer()}.
%%
-define(DUMMY_QUOTE_ERROR_FATAL, {obj, #{{str, <<"test">>} => {str, <<"fatal">>}}}).
%%
%% API
%%
@ -102,6 +106,10 @@ get_quote(_Quote, _Options) ->
transaction_info => transaction_info()
}} when
CallbackTag :: binary().
handle_callback(_Callback, #{quote := #wthadpt_Quote{quote_data = QuoteData}}, _State, _Options) when
QuoteData =:= ?DUMMY_QUOTE_ERROR_FATAL
->
erlang:error(spanish_inquisition);
handle_callback(#{payload := Payload}, _Withdrawal, _State, _Options) ->
{ok, #{
intent => {finish, success},