TD-366: Switch to using notification mechanism for withdrawal sessions (#41)

This commit is contained in:
Alexey S 2022-08-26 14:05:05 +03:00 committed by GitHub
parent c95f74952a
commit 0b993f44a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 112 additions and 78 deletions

View File

@ -46,6 +46,7 @@ marshal(T, V, C) when
T =:= {args, init} orelse
T =:= {args, call} orelse
T =:= {args, repair} orelse
T =:= {args, notification} orelse
T =:= {aux_state, undefined} orelse
T =:= {response, call} orelse
T =:= {response, {repair, success}} orelse
@ -63,6 +64,7 @@ unmarshal(T, V, C) when
T =:= {args, init} orelse
T =:= {args, call} orelse
T =:= {args, repair} orelse
T =:= {args, notification} orelse
T =:= {response, call} orelse
T =:= {response, {repair, success}} orelse
T =:= {response, {repair, failure}}

View File

@ -77,6 +77,7 @@
-export([process_timeout/3]).
-export([process_repair/4]).
-export([process_call/4]).
-export([process_notification/4]).
%% Pipeline
@ -208,6 +209,10 @@ process_call(CallArgs, _Machine, _, _Opts) ->
process_repair(Scenario, Machine, _Args, _Opts) ->
ff_repair:apply_scenario(ff_deposit, Machine, Scenario).
-spec process_notification(_, machine(), handler_args(), handler_opts()) -> result() | no_return().
process_notification(_Args, _Machine, _HandlerArgs, _Opts) ->
#{}.
%% Internals
backend() ->

View File

@ -48,6 +48,7 @@
-export([process_timeout/3]).
-export([process_repair/4]).
-export([process_call/4]).
-export([process_notification/4]).
%% Pipeline
@ -143,6 +144,10 @@ process_call(_CallArgs, #{}, _, _Opts) ->
process_repair(Scenario, Machine, _Args, _Opts) ->
ff_repair:apply_scenario(ff_destination, Machine, Scenario).
-spec process_notification(_, machine(), handler_args(), handler_opts()) -> result() | no_return().
process_notification(_Args, _Machine, _HandlerArgs, _Opts) ->
#{}.
%% Internals
backend() ->

View File

@ -48,6 +48,7 @@
-export([process_timeout/3]).
-export([process_repair/4]).
-export([process_call/4]).
-export([process_notification/4]).
%% Pipeline
@ -143,6 +144,10 @@ process_call(_CallArgs, #{}, _, _Opts) ->
process_repair(Scenario, Machine, _Args, _Opts) ->
ff_repair:apply_scenario(ff_source, Machine, Scenario).
-spec process_notification(_, machine(), handler_args(), handler_opts()) -> result() | no_return().
process_notification(_Args, _Machine, _HandlerArgs, _Opts) ->
#{}.
%% Internals
backend() ->

View File

@ -161,6 +161,9 @@
| {invalid_cash_flow_change, {already_has_domain_revision, domain_revision()}}
| ff_adjustment:create_error().
-type finalize_session_error() ::
{wrong_session_id, session_id()}.
-type unknown_adjustment_error() :: ff_adjustment_utils:unknown_adjustment_error().
-type invalid_status_change_error() ::
@ -221,6 +224,8 @@
-export([get_quote/1]).
-export([is_finished/1]).
-export([finalize_session/3]).
-export([start_adjustment/2]).
-export([find_adjustment/2]).
-export([adjustments/1]).
@ -964,19 +969,20 @@ create_session(ID, TransferData, SessionParams) ->
ok
end.
-spec process_session_sleep(withdrawal_state()) -> process_result().
process_session_sleep(Withdrawal) ->
SessionID = session_id(Withdrawal),
{ok, SessionMachine} = ff_withdrawal_session_machine:get(SessionID),
Session = ff_withdrawal_session_machine:session(SessionMachine),
case ff_withdrawal_session:status(Session) of
active ->
{sleep, []};
{finished, _} ->
Result = ff_withdrawal_session:result(Session),
{continue, [{session_finished, {SessionID, Result}}]}
-spec finalize_session(session_id(), session_result(), withdrawal_state()) ->
{ok, process_result()} | {error, finalize_session_error()}.
finalize_session(SessionID, Result, Withdrawal) ->
case session_id(Withdrawal) of
SessionID ->
{ok, {continue, [{session_finished, {SessionID, Result}}]}};
_OtherSessionID ->
{error, {wrong_session_id, SessionID}}
end.
-spec process_session_sleep(withdrawal_state()) -> process_result().
process_session_sleep(_Withdrawal) ->
{sleep, []}.
-spec process_transfer_finish(withdrawal_state()) -> process_result().
process_transfer_finish(_Withdrawal) ->
{undefined, [{status_changed, succeeded}]}.

View File

@ -32,6 +32,8 @@
-type repair_error() :: ff_repair:repair_error().
-type repair_response() :: ff_repair:repair_response().
-type notify_args() :: {session_finished, session_id(), session_result()}.
-export_type([id/0]).
-export_type([st/0]).
-export_type([action/0]).
@ -53,9 +55,9 @@
-export([get/2]).
-export([events/2]).
-export([repair/2]).
-export([notify/2]).
-export([start_adjustment/2]).
-export([notify_session_finished/3]).
%% Accessors
@ -68,6 +70,7 @@
-export([process_timeout/3]).
-export([process_repair/4]).
-export([process_call/4]).
-export([process_notification/4]).
%% Pipeline
@ -139,10 +142,10 @@ repair(ID, Scenario) ->
start_adjustment(WithdrawalID, Params) ->
call(WithdrawalID, {start_adjustment, Params}).
-spec notify_session_finished(id(), session_id(), session_result()) ->
ok | {error, session_not_found | old_session | result_mismatch}.
notify_session_finished(WithdrawalID, SessionID, SessionResult) ->
call(WithdrawalID, {session_finished, SessionID, SessionResult}).
-spec notify(id(), notify_args()) ->
ok | {error, notfound} | no_return().
notify(ID, Args) ->
machinery:notify(?NS, ID, Args, backend()).
%% Accessors
@ -191,6 +194,16 @@ process_call(CallArgs, _Machine, _, _Opts) ->
process_repair(Scenario, Machine, _Args, _Opts) ->
ff_repair:apply_scenario(ff_withdrawal, Machine, Scenario).
-spec process_notification(notify_args(), machine(), handler_args(), handler_opts()) -> result() | no_return().
process_notification({session_finished, SessionID, SessionResult}, Machine, _HandlerArgs, _Opts) ->
St = ff_machine:collapse(ff_withdrawal, Machine),
case ff_withdrawal:finalize_session(SessionID, SessionResult, withdrawal(St)) of
{ok, Result} ->
process_result(Result, St);
{error, Reason} ->
erlang:error({unable_to_finalize_session, Reason})
end.
-spec do_start_adjustment(adjustment_params(), machine()) -> {Response, result()} when
Response :: ok | {error, ff_withdrawal:start_adjustment_error()}.
do_start_adjustment(Params, Machine) ->

View File

@ -212,11 +212,9 @@ apply_event({callback, _Ev} = WrappedEvent, Session) ->
process_session(#{status := {finished, _}, id := ID, result := Result, withdrawal := Withdrawal}) ->
% Session has finished, it should notify the withdrawal machine about the fact
WithdrawalID = ff_adapter_withdrawal:id(Withdrawal),
case ff_withdrawal_machine:notify_session_finished(WithdrawalID, ID, Result) of
case ff_withdrawal_machine:notify(WithdrawalID, {session_finished, ID, Result}) of
ok ->
{finish, []};
{error, session_not_found} ->
{retry, []};
{error, _} = Error ->
erlang:error({unable_to_finish_session, Error})
end;

View File

@ -32,6 +32,7 @@
-export([process_timeout/3]).
-export([process_repair/4]).
-export([process_call/4]).
-export([process_notification/4]).
%%
%% Types
@ -171,6 +172,10 @@ process_repair(Scenario, Machine, _Args, _Opts) ->
},
ff_repair:apply_scenario(ff_withdrawal_session, Machine, Scenario, ScenarioProcessors).
-spec process_notification(_, machine(), handler_args(), handler_opts()) -> result() | no_return().
process_notification(_Args, _Machine, _HandlerArgs, _Opts) ->
#{}.
%%
%% Internals
%%
@ -198,62 +203,11 @@ set_action({setup_callback, Tag, Timer}, St) ->
timer_action(Timer);
set_action({setup_timer, Timer}, _St) ->
timer_action(Timer);
set_action(retry, St) ->
case compute_retry_timer(St) of
{ok, Timer} ->
timer_action(Timer);
{error, deadline_reached} = Error ->
erlang:error(Error)
end;
set_action(finish, _St) ->
unset_timer.
%%
-spec compute_retry_timer(st()) -> {ok, machinery:timer()} | {error, deadline_reached}.
compute_retry_timer(St) ->
Now = machinery_time:now(),
Updated = ff_machine:updated(St),
Deadline = compute_retry_deadline(Updated),
Timeout = compute_next_timeout(Now, Updated),
check_next_timeout(Timeout, Now, Deadline).
-spec compute_retry_deadline(machinery:timestamp()) -> machinery:timestamp().
compute_retry_deadline(Updated) ->
RetryTimeLimit = genlib_app:env(ff_transfer, session_retry_time_limit, ?SESSION_RETRY_TIME_LIMIT),
machinery_time:add_seconds(RetryTimeLimit, Updated).
-spec compute_next_timeout(machinery:timestamp(), machinery:timestamp()) -> timeout().
compute_next_timeout(Now, Updated) ->
MaxTimeout = genlib_app:env(ff_transfer, max_session_retry_timeout, ?MAX_SESSION_RETRY_TIMEOUT),
Timeout0 = machinery_time:interval(Now, Updated) div 1000,
erlang:min(MaxTimeout, erlang:max(1, Timeout0)).
-spec check_next_timeout(timeout(), machinery:timestamp(), machinery:timestamp()) ->
{ok, machinery:timer()} | {error, deadline_reached}.
check_next_timeout(Timeout, Now, Deadline) ->
case check_deadline(machinery_time:add_seconds(Timeout, Now), Deadline) of
ok ->
{ok, {timeout, Timeout}};
{error, _} = Error ->
Error
end.
-spec check_deadline(machinery:timestamp(), machinery:timestamp()) -> ok | {error, deadline_reached}.
check_deadline({Now, _}, {Deadline, _}) ->
check_deadline_(
calendar:datetime_to_gregorian_seconds(Now),
calendar:datetime_to_gregorian_seconds(Deadline)
).
-spec check_deadline_(integer(), integer()) -> ok | {error, deadline_reached}.
check_deadline_(Now, Deadline) when Now < Deadline ->
ok;
check_deadline_(Now, Deadline) when Now >= Deadline ->
{error, deadline_reached}.
%%
-spec timer_action(machinery:timer()) -> machinery:action().
timer_action(Timer) ->
{set_timer, Timer}.

View File

@ -51,6 +51,7 @@
-export([process_timeout/3]).
-export([process_repair/4]).
-export([process_call/4]).
-export([process_notification/4]).
%% Pipeline
@ -142,4 +143,8 @@ process_call(_Call, _Machine, _Args, _Opts) ->
process_repair(Scenario, Machine, _Args, _Opts) ->
ff_repair:apply_scenario(ff_identity, Machine, Scenario).
-spec process_notification(_, machine(), handler_args(), handler_opts()) -> result() | no_return().
process_notification(_Args, _Machine, _HandlerArgs, _Opts) ->
#{}.
%%

View File

@ -49,6 +49,7 @@
-export([process_timeout/3]).
-export([process_repair/4]).
-export([process_call/4]).
-export([process_notification/4]).
%% Types
@ -197,6 +198,10 @@ process_call({reject, Trx}, #{aux_state := St}, _, _Opts) ->
process_repair(_RepairArgs, _Machine, _Args, _Opts) ->
erlang:error({not_implemented, repair}).
-spec process_notification(_, machine(_), handler_args(), handler_opts()) -> result(_) | no_return().
process_notification(_Args, _Machine, _HandlerArgs, _Opts) ->
#{}.
process_account(Trx, Range, St0) ->
case lookup_trx(get_trx_id(Trx), St0) of
error ->

View File

@ -68,6 +68,7 @@
-export([process_timeout/3]).
-export([process_call/4]).
-export([process_repair/4]).
-export([process_notification/4]).
%% Model callbacks
@ -235,3 +236,7 @@ process_repair(Args, Machine, Mod, _) ->
{error, _Reason} = Error ->
Error
end.
-spec process_notification(_, machine(_), _, _) -> result(_) | no_return().
process_notification(_Args, _Machine, _HandlerArgs, _Opts) ->
#{}.

View File

@ -44,6 +44,7 @@
-export([process_timeout/3]).
-export([process_repair/4]).
-export([process_call/4]).
-export([process_notification/4]).
%% Pipeline
@ -123,3 +124,7 @@ process_call(_CallArgs, #{}, _, _Opts) ->
{ok, {repair_response(), result()}} | {error, repair_error()}.
process_repair(Scenario, Machine, _Args, _Opts) ->
ff_repair:apply_scenario(ff_wallet, Machine, Scenario).
-spec process_notification(_, machine(), handler_args(), handler_opts()) -> result().
process_notification(_Args, _Machine, _HandlerArgs, _Opts) ->
#{}.

View File

@ -21,11 +21,13 @@
-export([start/4]).
-export([call/5]).
-export([repair/5]).
-export([notify/5]).
-export([init/4]).
-export([process_timeout/3]).
-export([process_repair/4]).
-export([process_call/4]).
-export([process_notification/4]).
%%
@ -59,6 +61,10 @@ call(NS, ID, Range, Args, Backend) ->
repair(NS, ID, Range, Args, Backend) ->
machinery:repair(NS, ID, Range, Args, set_backend_context(Backend)).
-spec notify(namespace(), id(), range(), args(_), machinery:backend(_)) -> ok | {error, notfound}.
notify(NS, ID, Range, Args, Backend) ->
machinery:notify(NS, ID, Range, Args, set_backend_context(Backend)).
%%
-type handler_opts() :: _.
@ -104,6 +110,16 @@ process_repair(Args, Machine, Options, MachineryOptions) ->
ff_context:cleanup()
end.
-spec process_notification(args(_), machine(E, A), options(), handler_opts()) -> result(E, A).
process_notification(Args, Machine, Options, MachineryOptions) ->
#{handler := Handler} = Options,
ok = ff_context:save(create_context(Options, MachineryOptions)),
try
machinery:dispatch_signal({notification, Args}, Machine, machinery_utils:get_handler(Handler), #{})
after
ff_context:cleanup()
end.
%% Internals
-spec create_context(options(), handler_opts()) -> ff_context:context().

View File

@ -38,6 +38,7 @@
-export([call/5]).
-export([repair/5]).
-export([get/4]).
-export([notify/5]).
%% Gen Server
@ -124,6 +125,10 @@ report_notfound(NS, ID) ->
_ = _ = logger:debug("[machinery/gensrv][client][~s:~s] not found", [NS, ID]),
{error, notfound}.
-spec notify(namespace(), id(), range(), args(_), backend_opts()) -> no_return().
notify(_NS, _ID, _Range, _Args, _Opts) ->
erlang:error({not_implemented, notify}).
%% Gen Server + Supervisor
-spec start_machine_link(logic_handler(_), namespace(), id(), args(_)) -> {ok, pid()}.

View File

@ -65,6 +65,7 @@
-export([process_timeout/3]).
-export([process_repair/4]).
-export([process_call/4]).
-export([process_notification/4]).
%% Pipeline
@ -174,6 +175,10 @@ process_call(CallArgs, _Machine, _, _Opts) ->
process_repair(Scenario, Machine, _Args, _Opts) ->
ff_repair:apply_scenario(w2w_transfer, Machine, Scenario).
-spec process_notification(_, machine(), handler_args(), handler_opts()) -> result().
process_notification(_Args, _Machine, _HandlerArgs, _Opts) ->
#{}.
%% Internals
backend() ->

View File

@ -40,16 +40,16 @@ services:
retries: 10
machinegun:
image: docker.io/rbkmoney/machinegun:c05a8c18cd4f7966d70b6ad84cac9429cdfe37ae
image: ghcr.io/valitydev/machinegun:sha-00fe6d6
command: /opt/machinegun/bin/machinegun foreground
volumes:
- ./test/machinegun/config.yaml:/opt/machinegun/etc/config.yaml
- ./test/machinegun/cookie:/opt/machinegun/etc/cookie
healthcheck:
test: "curl http://localhost:8022/"
interval: 5s
timeout: 1s
retries: 20
test: "/opt/machinegun/bin/machinegun ping"
interval: 10s
timeout: 5s
retries: 10
limiter:
image: ghcr.io/valitydev/limiter:sha-40e4b22

View File

@ -54,12 +54,12 @@
0},
{<<"machinery">>,
{git,"https://github.com/valitydev/machinery-erlang.git",
{ref,"62c32434c80a462956ad9d50f9bce47836580d77"}},
{ref,"dc899e245be64551eebe2e42c9cf473f176fbf0e"}},
0},
{<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},2},
{<<"mg_proto">>,
{git,"https://github.com/valitydev/machinegun-proto.git",
{ref,"a411c7d5d779389c70d2594eb4a28a916dce1721"}},
{git,"https://github.com/valitydev/machinegun-proto",
{ref,"96f7f11b184c29d8b7e83cd7646f3f2c13662bda"}},
1},
{<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.2.0">>},2},
{<<"msgpack_proto">>,