P2C-4: cascade routing (#225)

* Add provider selection

* P2C-4: Add routing

* Fix test

* Withdrawal session is actually list

* Fixes

* Typo in spec

* Fix another typo in spec

* Hide route session from withdrawal

* Simplify session_processing_status/1

* Match sessions and transfers

In current design withdrawal has one session with corresponding transfer
per route. So there is no need to have explicit counter.

* Rework routing, move it to own module

* Use counter for sessions and transaction cound

* Recalculate provider on route_change event

* Add quote test for routing

* Fix

* Separate ff_withdrawal and ff_withdrawal_route_utils

* choose_provider -> filter_providers

* Rename routes() to more sutable attempts()

* Don not record transient error

* Remove unreachable error handling here

* Use route() as for attempts

* Review fixes

* Lazy init for attempts

* Fix crash on marshalling withdrawal without attempts
This commit is contained in:
Sergey Yelin 2020-06-10 13:31:53 +03:00 committed by GitHub
parent b5dbcabaf0
commit cba1c26a68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 723 additions and 68 deletions

View File

@ -93,6 +93,11 @@ start_processing_apps(Options) ->
<<"/quotebank">>,
{{dmsl_withdrawals_provider_adapter_thrift, 'Adapter'}, {ff_ct_provider_handler, []}}
},
{
<<"/downbank">>,
{{dmsl_withdrawals_provider_adapter_thrift, 'Adapter'},
{ff_ct_provider_handler, [{handler, ff_ct_fail_provider}]}}
},
{
P2PAdapterAdr,
{{dmsl_p2p_adapter_thrift, 'P2PAdapter'}, {p2p_ct_provider_handler, []}}
@ -368,6 +373,19 @@ domain_config(Options, C) ->
wallet_system_account_set = {value, ?sas(1)},
identity = payment_inst_identity_id(Options),
withdrawal_providers = {decisions, [
#domain_WithdrawalProviderDecision{
if_ = {condition, {cost_in, #domain_CashRange{
upper = {inclusive, #domain_Cash{
amount = 100500,
currency = #domain_CurrencyRef{symbolic_code = <<"RUB">>}
}},
lower = {inclusive, #domain_Cash{
amount = 100500,
currency = #domain_CurrencyRef{symbolic_code = <<"RUB">>}
}}
}}},
then_ = {value, [?wthdr_prv(4), ?wthdr_prv(5)]}
},
#domain_WithdrawalProviderDecision{
if_ = {
condition,
@ -463,10 +481,13 @@ domain_config(Options, C) ->
ct_domain:proxy(?prx(3), <<"Quote proxy">>, <<"http://localhost:8222/quotebank">>),
ct_domain:proxy(?prx(4), <<"P2P inspector proxy">>, <<"http://localhost:8222/p2p_inspector">>),
ct_domain:proxy(?prx(5), <<"P2P adapter">>, <<"http://localhost:8222", P2PAdapterAdr/binary>>),
ct_domain:proxy(?prx(6), <<"Down proxy">>, <<"http://localhost:8222/downbank">>),
ct_domain:withdrawal_provider(?wthdr_prv(1), ?prx(2), provider_identity_id(Options), C),
ct_domain:withdrawal_provider(?wthdr_prv(2), ?prx(2), provider_identity_id(Options), C),
ct_domain:withdrawal_provider(?wthdr_prv(3), ?prx(3), dummy_provider_identity_id(Options), C),
ct_domain:withdrawal_provider(?wthdr_prv(4), ?prx(6), provider_identity_id(Options), C),
ct_domain:withdrawal_provider(?wthdr_prv(5), ?prx(2), provider_identity_id(Options), C),
ct_domain:p2p_provider(?p2p_prv(1), ?prx(5), dummy_provider_identity_id(Options), C),
ct_domain:contract_template(?tmpl(1), ?trms(1)),

View File

@ -188,8 +188,12 @@ get_withdrawal_session_events_ok(C) ->
DestID = create_destination(IID, C),
WdrID = process_withdrawal(WalID, DestID),
{ok, St} = ff_withdrawal_machine:get(WdrID),
Withdrawal = ff_withdrawal_machine:withdrawal(St),
[#{id := SessID}] = ff_withdrawal:sessions(Withdrawal),
{ok, RawEvents} = ff_withdrawal_session_machine:events(
WdrID,
SessID,
{undefined, 1000, forward}
),
{_Events, MaxID} = ct_eventsink:events(LastEvent, 1000, Sink),

View File

@ -20,10 +20,8 @@
party_revision => party_revision(),
domain_revision => domain_revision(),
route => route(),
session => session(),
p_transfer => p_transfer(),
attempts => attempts(),
resource => destination_resource(),
limit_checks => [limit_check_details()],
adjustments => adjustments_index(),
status => status(),
metadata => metadata(),
@ -82,6 +80,8 @@
provider_id := provider_id()
}.
-type attempts() :: ff_withdrawal_route_attempt_utils:attempts().
-type prepared_route() :: #{
route := route(),
party_revision := party_revision(),
@ -172,6 +172,7 @@
-export_type([action/0]).
-export_type([adjustment_params/0]).
-export_type([start_adjustment_error/0]).
-export_type([limit_check_details/0]).
%% Transfer logic callbacks
@ -186,6 +187,7 @@
-export([body/1]).
-export([status/1]).
-export([route/1]).
-export([attempts/1]).
-export([external_id/1]).
-export([created_at/1]).
-export([party_revision/1]).
@ -332,6 +334,10 @@ status(T) ->
route(T) ->
maps:get(route, T, undefined).
-spec attempts(withdrawal_state()) -> attempts() | undefined.
attempts(T) ->
maps:get(attempts, T, undefined).
-spec external_id(withdrawal_state()) -> external_id() | undefined.
external_id(T) ->
maps:get(external_id, T, undefined).
@ -449,12 +455,7 @@ effective_final_cash_flow(Withdrawal) ->
-spec sessions(withdrawal_state()) -> [session()].
sessions(Withdrawal) ->
case session(Withdrawal) of
undefined ->
[];
Session ->
[Session]
end.
ff_withdrawal_route_attempt_utils:get_sessions(attempts(Withdrawal)).
%% Сущность в настоящий момент нуждается в передаче ей управления для совершения каких-то действий
-spec is_active(withdrawal_state()) -> boolean().
@ -499,16 +500,28 @@ do_start_adjustment(Params, Withdrawal) ->
%% Internal getters
-spec update_attempts(attempts(), withdrawal_state()) -> withdrawal_state().
update_attempts(Attempts, T) ->
maps:put(attempts, Attempts, T).
-spec params(withdrawal_state()) -> transfer_params().
params(#{params := V}) ->
V.
-spec p_transfer(withdrawal_state()) -> p_transfer() | undefined.
p_transfer(Withdrawal) ->
maps:get(p_transfer, Withdrawal, undefined).
ff_withdrawal_route_attempt_utils:get_current_p_transfer(attempts(Withdrawal)).
-spec p_transfer_status(withdrawal_state()) -> ff_postings_transfer:status() | undefined.
p_transfer_status(Withdrawal) ->
case attempts(Withdrawal) of
undefined ->
undefined;
_ ->
p_transfer_status_(Withdrawal)
end.
p_transfer_status_(Withdrawal) ->
case p_transfer(Withdrawal) of
undefined ->
undefined;
@ -632,14 +645,17 @@ do_process_transfer(routing, Withdrawal) ->
do_process_transfer(p_transfer_start, Withdrawal) ->
process_p_transfer_creation(Withdrawal);
do_process_transfer(p_transfer_prepare, Withdrawal) ->
{ok, Events} = ff_pipeline:with(p_transfer, Withdrawal, fun ff_postings_transfer:prepare/1),
{continue, Events};
Tr = ff_withdrawal_route_attempt_utils:get_current_p_transfer(attempts(Withdrawal)),
{ok, Events} = ff_postings_transfer:prepare(Tr),
{continue, [{p_transfer, Ev} || Ev <- Events]};
do_process_transfer(p_transfer_commit, Withdrawal) ->
{ok, Events} = ff_pipeline:with(p_transfer, Withdrawal, fun ff_postings_transfer:commit/1),
{continue, Events};
Tr = ff_withdrawal_route_attempt_utils:get_current_p_transfer(attempts(Withdrawal)),
{ok, Events} = ff_postings_transfer:commit(Tr),
{continue, [{p_transfer, Ev} || Ev <- Events]};
do_process_transfer(p_transfer_cancel, Withdrawal) ->
{ok, Events} = ff_pipeline:with(p_transfer, Withdrawal, fun ff_postings_transfer:cancel/1),
{continue, Events};
Tr = ff_withdrawal_route_attempt_utils:get_current_p_transfer(attempts(Withdrawal)),
{ok, Events} = ff_postings_transfer:cancel(Tr),
{continue, [{p_transfer, Ev} || Ev <- Events]};
do_process_transfer(limit_check, Withdrawal) ->
process_limit_check(Withdrawal);
do_process_transfer(session_starting, Withdrawal) ->
@ -647,7 +663,8 @@ do_process_transfer(session_starting, Withdrawal) ->
do_process_transfer(session_polling, Withdrawal) ->
process_session_poll(Withdrawal);
do_process_transfer({fail, Reason}, Withdrawal) ->
process_transfer_fail(Reason, Withdrawal);
{ok, Providers} = do_process_routing(Withdrawal),
process_route_change(Providers, Withdrawal, Reason);
do_process_transfer(finish, Withdrawal) ->
process_transfer_finish(Withdrawal);
do_process_transfer(adjustment, Withdrawal) ->
@ -659,7 +676,7 @@ do_process_transfer(stop, _Withdrawal) ->
process_result().
process_routing(Withdrawal) ->
case do_process_routing(Withdrawal) of
{ok, ProviderID} ->
{ok, [ProviderID | _]} ->
{continue, [
{route_changed, #{provider_id => ProviderID}}
]};
@ -669,7 +686,7 @@ process_routing(Withdrawal) ->
process_transfer_fail(Reason, Withdrawal)
end.
-spec do_process_routing(withdrawal_state()) -> {ok, provider_id()} | {error, Reason} when
-spec do_process_routing(withdrawal_state()) -> {ok, [provider_id()]} | {error, Reason} when
Reason :: route_not_found | {inconsistent_quote_route, provider_id()}.
do_process_routing(Withdrawal) ->
WalletID = wallet_id(Withdrawal),
@ -689,20 +706,26 @@ do_process_routing(Withdrawal) ->
}),
do(fun() ->
ProviderID = unwrap(prepare_route(build_party_varset(VarsetParams), Identity, DomainRevision)),
valid = unwrap(validate_quote_provider(ProviderID, quote(Withdrawal))),
ProviderID
Providers = unwrap(prepare_route(build_party_varset(VarsetParams), Identity, DomainRevision)),
case quote(Withdrawal) of
undefined ->
Providers;
Quote ->
ProviderID = hd(Providers),
valid = unwrap(validate_quote_provider(ProviderID, Quote)),
[ProviderID]
end
end).
-spec prepare_route(party_varset(), identity(), domain_revision()) ->
{ok, provider_id()} | {error, route_not_found}.
{ok, [provider_id()]} | {error, route_not_found}.
prepare_route(PartyVarset, Identity, DomainRevision) ->
{ok, PaymentInstitutionID} = ff_party:get_identity_payment_institution_id(Identity),
{ok, PaymentInstitution} = ff_payment_institution:get(PaymentInstitutionID, DomainRevision),
case ff_payment_institution:compute_withdrawal_providers(PaymentInstitution, PartyVarset) of
{ok, Providers} ->
choose_provider(Providers, PartyVarset);
filter_providers(Providers, PartyVarset);
{error, {misconfiguration, _Details} = Error} ->
%% TODO: Do not interpret such error as an empty route list.
%% The current implementation is made for compatibility reasons.
@ -713,21 +736,19 @@ prepare_route(PartyVarset, Identity, DomainRevision) ->
-spec validate_quote_provider(provider_id(), quote()) ->
{ok, valid} | {error, {inconsistent_quote_route, provider_id()}}.
validate_quote_provider(_ProviderID, undefined) ->
{ok, valid};
validate_quote_provider(ProviderID, #{quote_data := #{<<"provider_id">> := ProviderID}}) ->
{ok, valid};
validate_quote_provider(ProviderID, _) ->
{error, {inconsistent_quote_route, ProviderID}}.
-spec choose_provider([provider_id()], party_varset()) ->
{ok, provider_id()} | {error, route_not_found}.
choose_provider(Providers, VS) ->
-spec filter_providers([provider_id()], party_varset()) ->
{ok, [provider_id()]} | {error, route_not_found}.
filter_providers(Providers, VS) ->
case lists:filter(fun(P) -> validate_withdrawals_terms(P, VS) end, Providers) of
[ProviderID | _] ->
{ok, ProviderID};
[] ->
{error, route_not_found}
{error, route_not_found};
Providers ->
{ok, Providers}
end.
-spec validate_withdrawals_terms(provider_id(), party_varset()) ->
@ -783,14 +804,14 @@ process_limit_check(Withdrawal) ->
process_result().
process_p_transfer_creation(Withdrawal) ->
FinalCashFlow = make_final_cash_flow(Withdrawal),
PTransferID = construct_p_transfer_id(id(Withdrawal)),
PTransferID = construct_p_transfer_id(Withdrawal),
{ok, PostingsTransferEvents} = ff_postings_transfer:create(PTransferID, FinalCashFlow),
{continue, [{p_transfer, Ev} || Ev <- PostingsTransferEvents]}.
-spec process_session_creation(withdrawal_state()) ->
process_result().
process_session_creation(Withdrawal) ->
ID = construct_session_id(id(Withdrawal)),
ID = construct_session_id(Withdrawal),
#{
wallet_id := WalletID,
destination_id := DestinationID
@ -821,12 +842,19 @@ process_session_creation(Withdrawal) ->
ok = create_session(ID, TransferData, SessionParams),
{continue, [{session_started, ID}]}.
construct_session_id(ID) ->
ID.
-spec construct_session_id(withdrawal_state()) -> id().
construct_session_id(Withdrawal) ->
ID = id(Withdrawal),
Index = ff_withdrawal_route_attempt_utils:get_index(attempts(Withdrawal)),
SubID = integer_to_binary(Index),
<< ID/binary, "/", SubID/binary >>.
-spec construct_p_transfer_id(id()) -> id().
construct_p_transfer_id(ID) ->
<<"ff/withdrawal/", ID/binary>>.
-spec construct_p_transfer_id(withdrawal_state()) -> id().
construct_p_transfer_id(Withdrawal) ->
ID = id(Withdrawal),
Index = ff_withdrawal_route_attempt_utils:get_index(attempts(Withdrawal)),
SubID = integer_to_binary(Index),
<<"ff/withdrawal/", ID/binary, "/", SubID/binary >>.
create_session(ID, TransferData, SessionParams) ->
case ff_withdrawal_session_machine:create(ID, TransferData, SessionParams) of
@ -1055,7 +1083,7 @@ get_quote_(Params, Destination, Resource) ->
destination => Destination,
resource => Resource
}),
ProviderID = unwrap(route, prepare_route(build_party_varset(VarsetParams), Identity, DomainRevision)),
[ProviderID | _] = unwrap(route, prepare_route(build_party_varset(VarsetParams), Identity, DomainRevision)),
{Adapter, AdapterOpts} = ff_withdrawal_session:get_adapter_with_opts(ProviderID),
GetQuoteParams = #{
external_id => maps:get(external_id, Params, undefined),
@ -1124,7 +1152,7 @@ quote_domain_revision(#{quote_data := QuoteData}) ->
-spec session(withdrawal_state()) -> session() | undefined.
session(Withdrawal) ->
maps:get(session, Withdrawal, undefined).
ff_withdrawal_route_attempt_utils:get_current_session(attempts(Withdrawal)).
-spec session_id(withdrawal_state()) -> session_id() | undefined.
session_id(T) ->
@ -1149,15 +1177,24 @@ session_result(Withdrawal) ->
-spec session_processing_status(withdrawal_state()) ->
undefined | pending | succeeded | failed.
session_processing_status(Withdrawal) ->
case session_result(Withdrawal) of
case attempts(Withdrawal) of
undefined ->
undefined;
unknown ->
pending;
{success, _TrxInfo} ->
_ ->
session_processing_status_(Withdrawal)
end.
session_processing_status_(Withdrawal) ->
Session = session(Withdrawal),
case Session of
undefined ->
undefined;
#{result := {success, _}} ->
succeeded;
{failed, _Failure} ->
failed
#{result := {failed, _}} ->
failed;
#{} ->
pending
end.
%% Withdrawal validators
@ -1207,28 +1244,36 @@ validate_destination_status(Destination) ->
%% Limit helpers
-spec limit_checks(withdrawal_state()) ->
[limit_check_details()].
limit_checks(Withdrawal) ->
maps:get(limit_checks, Withdrawal, []).
-spec add_limit_check(limit_check_details(), withdrawal_state()) ->
withdrawal_state().
add_limit_check(Check, Withdrawal) ->
Checks = limit_checks(Withdrawal),
Withdrawal#{limit_checks => [Check | Checks]}.
Attempts = attempts(Withdrawal),
Checks =
case ff_withdrawal_route_attempt_utils:get_current_limit_checks(Attempts) of
undefined ->
[Check];
C ->
[Check | C]
end,
R = ff_withdrawal_route_attempt_utils:update_current_limit_checks(Checks, Attempts),
update_attempts(R, Withdrawal).
-spec limit_check_status(withdrawal_state()) ->
ok | {failed, limit_check_details()} | unknown.
limit_check_status(#{limit_checks := Checks}) ->
limit_check_status(Withdrawal) ->
Attempts = attempts(Withdrawal),
Checks = ff_withdrawal_route_attempt_utils:get_current_limit_checks(Attempts),
limit_check_status_(Checks).
limit_check_status_(undefined) ->
unknown;
limit_check_status_(Checks) ->
case lists:dropwhile(fun is_limit_check_ok/1, Checks) of
[] ->
ok;
[H | _Tail] ->
{failed, H}
end;
limit_check_status(Withdrawal) when not is_map_key(limit_checks, Withdrawal) ->
unknown.
end.
-spec limit_check_processing_status(withdrawal_state()) ->
ok | failed | unknown.
@ -1395,6 +1440,22 @@ process_adjustment(Withdrawal) ->
Events1 = Events0 ++ handle_adjustment_changes(Changes),
handle_child_result({Action, Events1}, Withdrawal).
-spec process_route_change([provider_id()], withdrawal_state(), fail_type()) ->
process_result().
process_route_change(Providers, Withdrawal, Reason) ->
Attempts = attempts(Withdrawal),
%% TODO Remove line below after switch to [route()] from [provider_id()]
Routes = [#{provider_id => ID} || ID <- Providers],
case ff_withdrawal_route_attempt_utils:next_route(Routes, Attempts) of
{ok, Route} ->
{continue, [
{route_changed, Route}
]};
{error, route_not_found} ->
%% No more routes, return last error
process_transfer_fail(Reason, Withdrawal)
end.
-spec handle_adjustment_changes(ff_adjustment:changes()) ->
[event()].
handle_adjustment_changes(Changes) ->
@ -1476,15 +1537,29 @@ apply_event_({resource_got, Resource}, T) ->
apply_event_({limit_check, Details}, T) ->
add_limit_check(Details, T);
apply_event_({p_transfer, Ev}, T) ->
T#{p_transfer => ff_postings_transfer:apply_event(Ev, p_transfer(T))};
Tr = ff_postings_transfer:apply_event(Ev, p_transfer(T)),
Attempts = attempts(T),
R = ff_withdrawal_route_attempt_utils:update_current_p_transfer(Tr, Attempts),
update_attempts(R, T);
apply_event_({session_started, SessionID}, T) ->
Session = #{id => SessionID},
maps:put(session, Session, T);
Attempts = attempts(T),
R = ff_withdrawal_route_attempt_utils:update_current_session(Session, Attempts),
update_attempts(R, T);
apply_event_({session_finished, {SessionID, Result}}, T) ->
#{id := SessionID} = Session = session(T),
maps:put(session, Session#{result => Result}, T);
Attempts = attempts(T),
Session = ff_withdrawal_route_attempt_utils:get_current_session(Attempts),
SessionID = maps:get(id, Session),
UpdSession = Session#{result => Result},
R = ff_withdrawal_route_attempt_utils:update_current_session(UpdSession, Attempts),
update_attempts(R, T);
apply_event_({route_changed, Route}, T) ->
maps:put(route, Route, T);
Attempts = attempts(T),
R = ff_withdrawal_route_attempt_utils:new_route(Route, Attempts),
T#{
route => Route,
attempts => R
};
apply_event_({adjustment, _Ev} = Event, T) ->
apply_adjustment_event(Event, T).

View File

@ -0,0 +1,169 @@
%%%
%%% Copyright 2020 RBKmoney
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
%%% You may obtain a copy of the License at
%%%
%%% http://www.apache.org/licenses/LICENSE-2.0
%%%
%%% Unless required by applicable law or agreed to in writing, software
%%% distributed under the License is distributed on an "AS IS" BASIS,
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% See the License for the specific language governing permissions and
%%% limitations under the License.
%%%
-module(ff_withdrawal_route_attempt_utils).
-type p_transfer() :: ff_postings_transfer:transfer().
-type limit_check_details() :: ff_withdrawal:limit_check_details().
-type account() :: ff_account:account().
-type route() :: ff_withdrawal:route().
-type session() :: ff_withdrawal:session().
-type attempt() :: #{
session => session(),
p_transfer => p_transfer(),
limit_checks => [limit_check_details()]
}.
-opaque attempts() :: #{
attempts := #{route() => attempt()},
inversed_routes := [route()],
index := non_neg_integer(),
current => route()
}.
-export_type([attempts/0]).
%% API
-export([new_route/2]).
-export([next_route/2]).
-export([get_current_session/1]).
-export([get_current_p_transfer/1]).
-export([get_current_limit_checks/1]).
-export([update_current_session/2]).
-export([update_current_p_transfer/2]).
-export([update_current_limit_checks/2]).
-export([get_sessions/1]).
-export([get_index/1]).
-spec new_route(route(), attempts()) ->
attempts().
new_route(Route, undefined) ->
new_route(Route, init());
new_route(Route, Existing) ->
#{
attempts := Attempts,
inversed_routes := InvRoutes,
index := Index
} = Existing,
Existing#{
current => Route,
index => Index + 1,
inversed_routes => [Route | InvRoutes],
attempts => Attempts#{Route => #{}}
}.
-spec next_route([route()], attempts()) -> {ok, route()} | {error, route_not_found}.
next_route(Routes, #{attempts := Existing}) ->
PendingRoutes =
lists:filter(
fun(R) ->
not maps:is_key(R, Existing)
end,
Routes
),
case PendingRoutes of
[Route | _] ->
{ok, Route};
[] ->
{error, route_not_found}
end.
-spec get_current_session(attempts()) -> undefined | session().
get_current_session(Attempts) ->
Attempt = current(Attempts),
maps:get(session, Attempt, undefined).
-spec get_current_p_transfer(attempts()) -> undefined | p_transfer().
get_current_p_transfer(Attempts) ->
Attempt = current(Attempts),
maps:get(p_transfer, Attempt, undefined).
-spec get_current_limit_checks(attempts()) -> undefined | [limit_check_details()].
get_current_limit_checks(Attempts) ->
Attempt = current(Attempts),
maps:get(limit_checks, Attempt, undefined).
-spec update_current_session(session(), attempts()) -> attempts().
update_current_session(Session, Attempts) ->
Attempt = current(Attempts),
Updated = Attempt#{
session => Session
},
update_current(Updated, Attempts).
-spec update_current_p_transfer(account(), attempts()) -> attempts().
update_current_p_transfer(Account, Attempts) ->
Attempt = current(Attempts),
Updated = Attempt#{
p_transfer => Account
},
update_current(Updated, Attempts).
-spec update_current_limit_checks([limit_check_details()], attempts()) -> attempts().
update_current_limit_checks(LimitChecks, Routes) ->
Attempt = current(Routes),
Updated = Attempt#{
limit_checks => LimitChecks
},
update_current(Updated, Routes).
-spec get_sessions(attempts()) -> [session()].
get_sessions(undefined) ->
[];
get_sessions(#{attempts := Attempts, inversed_routes := InvRoutes}) ->
lists:foldl(
fun(ID, Acc) ->
Route = maps:get(ID, Attempts),
case maps:get(session, Route, undefined) of
undefined ->
Acc;
Session ->
[Session | Acc]
end
end,
[],
InvRoutes
).
-spec get_index(attempts()) -> non_neg_integer().
get_index(#{index := Index}) ->
Index.
%% Internal
-spec init() -> attempts().
init() ->
#{
attempts => #{},
inversed_routes => [],
index => 0
}.
%% @private
current(#{current := Route, attempts := Attempts}) ->
maps:get(Route, Attempts);
current(_) ->
#{}.
%% @private
update_current(Attempt, #{current := Route, attempts := Attempts} = R) ->
R#{
attempts => Attempts#{
Route => Attempt
}
}.

View File

@ -0,0 +1,300 @@
%%%
%%% Copyright 2020 RBKmoney
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
%%% You may obtain a copy of the License at
%%%
%%% http://www.apache.org/licenses/LICENSE-2.0
%%%
%%% Unless required by applicable law or agreed to in writing, software
%%% distributed under the License is distributed on an "AS IS" BASIS,
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% See the License for the specific language governing permissions and
%%% limitations under the License.
%%%
-module(ff_withdrawal_routing_SUITE).
-include_lib("stdlib/include/assert.hrl").
-include_lib("damsel/include/dmsl_domain_thrift.hrl").
-include_lib("shumpune_proto/include/shumpune_shumpune_thrift.hrl").
%% Common test API
-export([all/0]).
-export([groups/0]).
-export([init_per_suite/1]).
-export([end_per_suite/1]).
-export([init_per_group/2]).
-export([end_per_group/2]).
-export([init_per_testcase/2]).
-export([end_per_testcase/2]).
%% Tests
-export([adapter_unreachable_route_test/1]).
-export([adapter_unreachable_quote_test/1]).
%% Internal types
-type config() :: ct_helper:config().
-type test_case_name() :: ct_helper:test_case_name().
-type group_name() :: ct_helper:group_name().
-type test_return() :: _ | no_return().
%% Macro helpers
-define(final_balance(Cash), {
element(1, Cash),
{
{inclusive, element(1, Cash)}, {inclusive, element(1, Cash)}
},
element(2, Cash)
}).
-define(final_balance(Amount, Currency), ?final_balance({Amount, Currency})).
%% Common test API implementation
-spec all() -> [test_case_name() | {group, group_name()}].
all() ->
[
{group, default}
].
-spec groups() -> [{group_name(), list(), [test_case_name()]}].
groups() ->
[
{default, [parallel], [
adapter_unreachable_route_test,
adapter_unreachable_quote_test
]}
].
-spec init_per_suite(config()) -> config().
init_per_suite(C) ->
ct_helper:makeup_cfg([
ct_helper:test_case_name(init),
ct_payment_system:setup()
], C).
-spec end_per_suite(config()) -> _.
end_per_suite(C) ->
ok = ct_payment_system:shutdown(C).
%%
-spec init_per_group(group_name(), config()) -> config().
init_per_group(_, C) ->
C.
-spec end_per_group(group_name(), config()) -> _.
end_per_group(_, _) ->
ok.
%%
-spec init_per_testcase(test_case_name(), config()) -> config().
init_per_testcase(Name, C) ->
C1 = ct_helper:makeup_cfg([ct_helper:test_case_name(Name), ct_helper:woody_ctx()], C),
ok = ct_helper:set_context(C1),
C1.
-spec end_per_testcase(test_case_name(), config()) -> _.
end_per_testcase(_Name, _C) ->
ok = ct_helper:unset_context().
%% Tests
-spec adapter_unreachable_route_test(config()) -> test_return().
adapter_unreachable_route_test(C) ->
Currency = <<"RUB">>,
Cash = {100500, Currency},
#{
wallet_id := WalletID,
destination_id := DestinationID
} = prepare_standard_environment(Cash, C),
WithdrawalID = generate_id(),
WithdrawalParams = #{
id => WithdrawalID,
destination_id => DestinationID,
wallet_id => WalletID,
body => Cash,
external_id => WithdrawalID
},
ok = ff_withdrawal_machine:create(WithdrawalParams, ff_entity_context:new()),
?assertEqual(succeeded, await_final_withdrawal_status(WithdrawalID)),
?assertEqual(?final_balance(0, Currency), get_wallet_balance(WalletID)),
Withdrawal = get_withdrawal(WithdrawalID),
?assertEqual(WalletID, ff_withdrawal:wallet_id(Withdrawal)),
?assertEqual(DestinationID, ff_withdrawal:destination_id(Withdrawal)),
?assertEqual(Cash, ff_withdrawal:body(Withdrawal)),
?assertEqual(WithdrawalID, ff_withdrawal:external_id(Withdrawal)).
-spec adapter_unreachable_quote_test(config()) -> test_return().
adapter_unreachable_quote_test(C) ->
Currency = <<"RUB">>,
Cash = {100500, Currency},
#{
wallet_id := WalletID,
destination_id := DestinationID
} = prepare_standard_environment(Cash, C),
WithdrawalID = generate_id(),
WithdrawalParams = #{
id => WithdrawalID,
destination_id => DestinationID,
wallet_id => WalletID,
body => Cash,
external_id => WithdrawalID,
quote => #{
cash_from => Cash,
cash_to => {2120, <<"USD">>},
created_at => <<"2020-03-22T06:12:27Z">>,
expires_on => <<"2020-03-22T06:12:27Z">>,
quote_data => #{
<<"version">> => 1,
<<"quote_data">> => #{<<"test">> => <<"test">>},
<<"provider_id">> => 4
}
}
},
ok = ff_withdrawal_machine:create(WithdrawalParams, ff_entity_context:new()),
?assertEqual(
{failed, #{code => <<"authorization_error">>}},
await_final_withdrawal_status(WithdrawalID)).
%% Utils
get_withdrawal(WithdrawalID) ->
{ok, Machine} = ff_withdrawal_machine:get(WithdrawalID),
ff_withdrawal_machine:withdrawal(Machine).
get_withdrawal_status(WithdrawalID) ->
ff_withdrawal:status(get_withdrawal(WithdrawalID)).
await_final_withdrawal_status(WithdrawalID) ->
finished = ct_helper:await(
finished,
fun () ->
{ok, Machine} = ff_withdrawal_machine:get(WithdrawalID),
Withdrawal = ff_withdrawal_machine:withdrawal(Machine),
case ff_withdrawal:is_finished(Withdrawal) of
false ->
{not_finished, Withdrawal};
true ->
finished
end
end,
genlib_retry:linear(10, 1000)
),
get_withdrawal_status(WithdrawalID).
prepare_standard_environment({_Amount, Currency} = WithdrawalCash, C) ->
Party = create_party(C),
IdentityID = create_person_identity(Party, C),
WalletID = create_wallet(IdentityID, <<"My wallet">>, Currency, C),
ok = await_wallet_balance({0, Currency}, WalletID),
DestinationID = create_destination(IdentityID, Currency, C),
ok = set_wallet_balance(WithdrawalCash, WalletID),
#{
identity_id => IdentityID,
party_id => Party,
wallet_id => WalletID,
destination_id => DestinationID
}.
create_party(_C) ->
ID = genlib:bsuuid(),
_ = ff_party:create(ID),
ID.
create_person_identity(Party, C) ->
create_person_identity(Party, C, <<"good-one">>).
create_person_identity(Party, C, ProviderID) ->
create_identity(Party, ProviderID, <<"person">>, C).
create_identity(Party, ProviderID, ClassID, _C) ->
ID = genlib:unique(),
ok = ff_identity_machine:create(
#{id => ID, party => Party, provider => ProviderID, class => ClassID},
ff_entity_context:new()
),
ID.
create_wallet(IdentityID, Name, Currency, _C) ->
ID = genlib:unique(),
ok = ff_wallet_machine:create(
#{id => ID, identity => IdentityID, name => Name, currency => Currency},
ff_entity_context:new()
),
ID.
await_wallet_balance({Amount, Currency}, ID) ->
Balance = {Amount, {{inclusive, Amount}, {inclusive, Amount}}, Currency},
Balance = ct_helper:await(
Balance,
fun () -> get_wallet_balance(ID) end,
genlib_retry:linear(3, 500)
),
ok.
get_wallet_balance(ID) ->
{ok, Machine} = ff_wallet_machine:get(ID),
get_account_balance(ff_wallet:account(ff_wallet_machine:wallet(Machine))).
get_account_balance(Account) ->
{ok, {Amounts, Currency}} = ff_transaction:balance(
Account,
ff_clock:latest_clock()
),
{ff_indef:current(Amounts), ff_indef:to_range(Amounts), Currency}.
create_destination(IID, Currency, C) ->
ID = generate_id(),
{{Y, _, _}, _} = genlib_time:unixtime_to_daytime(erlang:system_time(second)),
StoreSource = ct_cardstore:bank_card(<<"4150399999000900">>, {12, Y + 1}, C),
Resource = {bank_card, #{bank_card => StoreSource}},
Params = #{id => ID, identity => IID, name => <<"XDesination">>, currency => Currency, resource => Resource},
ok = ff_destination:create(Params, ff_entity_context:new()),
authorized = ct_helper:await(
authorized,
fun () ->
{ok, Machine} = ff_destination:get_machine(ID),
ff_destination:status(ff_destination:get(Machine))
end
),
ID.
generate_id() ->
ff_id:generate_snowflake_id().
set_wallet_balance({Amount, Currency}, ID) ->
TransactionID = generate_id(),
{ok, Machine} = ff_wallet_machine:get(ID),
Account = ff_wallet:account(ff_wallet_machine:wallet(Machine)),
AccounterID = ff_account:accounter_account_id(Account),
{CurrentAmount, _, Currency} = get_account_balance(Account),
{ok, AnotherAccounterID} = create_account(Currency),
Postings = [{AnotherAccounterID, AccounterID, {Amount - CurrentAmount, Currency}}],
{ok, _} = ff_transaction:prepare(TransactionID, Postings),
{ok, _} = ff_transaction:commit(TransactionID, Postings),
ok.
create_account(CurrencyCode) ->
Description = <<"ff_test">>,
case call_accounter('CreateAccount', [construct_account_prototype(CurrencyCode, Description)]) of
{ok, Result} ->
{ok, Result};
{exception, Exception} ->
{error, {exception, Exception}}
end.
construct_account_prototype(CurrencyCode, Description) ->
#shumpune_AccountPrototype{
currency_sym_code = CurrencyCode,
description = Description
}.
call_accounter(Function, Args) ->
Service = {shumpune_shumpune_thrift, 'Accounter'},
ff_woody_client:call(accounter, {Service, Function, Args}, woody_context:new()).

View File

@ -0,0 +1,80 @@
-module(ff_ct_fail_provider).
-include_lib("damsel/include/dmsl_domain_thrift.hrl").
-include_lib("damsel/include/dmsl_withdrawals_provider_adapter_thrift.hrl").
%% API
-export([start/0]).
-export([start/1]).
%% Processing callbacks
-export([process_withdrawal/3]).
-export([get_quote/2]).
%%
%% Internal types
%%
-type destination() :: dmsl_withdrawals_domain_thrift:'Destination'().
-type identity() :: dmsl_withdrawals_domain_thrift:'Identity'().
-type cash() :: dmsl_domain_thrift:'Cash'().
-type currency() :: dmsl_domain_thrift:'Currency'().
-type failure() :: dmsl_domain_thrift:'Failure'().
-type domain_quote() :: dmsl_withdrawals_provider_adapter_thrift:'Quote'().
-type withdrawal() :: #{
id => binary(),
body => cash(),
destination => destination(),
sender => identity(),
receiver => identity(),
quote => domain_quote()
}.
-type quote_params() :: #{
idempotency_id => binary(),
currency_from := currency(),
currency_to := currency(),
exchange_cash := cash()
}.
-type quote() :: #{
cash_from := cash(),
cash_to := cash(),
created_at := binary(),
expires_on := binary(),
quote_data := any()
}.
-record(state, {}).
-type state() :: #state{}.
%%
%% API
%%
-spec start() -> {ok, pid()}.
start() ->
start([]).
-spec start(list()) -> {ok, pid()}.
start(Opts) ->
{ok, Pid} = supervisor:start_link(ff_ct_provider_sup, Opts),
_ = erlang:unlink(Pid),
{ok, Pid}.
%%
%% Processing callbacks
%%
-spec process_withdrawal(withdrawal(), state(), map()) -> {finish, Status} | {sleep, Timer} when
Status :: {success, TrxInfo} | {failure, failure()},
Timer :: {deadline, binary()} | {timeout, integer()},
TrxInfo :: #{id => binary()}.
process_withdrawal(_Withdrawal, State, _Options) ->
{ok, {finish, {failure, <<"authorization_error">>}}, State}.
-spec get_quote(quote_params(), map()) ->
{ok, quote()}.
get_quote(_Quote, _Options) ->
erlang:error(not_implemented).

View File

@ -12,19 +12,21 @@
-spec handle_function(woody:func(), woody:args(), woody_context:ctx(), woody:options()) ->
{ok, woody:result()} | no_return().
handle_function('ProcessWithdrawal', [Withdrawal, InternalState, Options], _Context, _Opts) ->
handle_function('ProcessWithdrawal', [Withdrawal, InternalState, Options], _Context, Opts) ->
Handler = get_handler(Opts),
DWithdrawal = decode_withdrawal(Withdrawal),
DState = decode_state(InternalState),
DOptions = decode_options(Options),
{ok, Intent, NewState} = ff_ct_provider:process_withdrawal(DWithdrawal, DState, DOptions),
{ok, Intent, NewState} = Handler:process_withdrawal(DWithdrawal, DState, DOptions),
{ok, #wthadpt_ProcessResult{
intent = encode_intent(Intent),
next_state = encode_state(NewState)
}};
handle_function('GetQuote', [QuoteParams, Options], _Context, _Opts) ->
handle_function('GetQuote', [QuoteParams, Options], _Context, Opts) ->
Handler = get_handler(Opts),
Params = decode_quote_params(QuoteParams),
DOptions = decode_options(Options),
{ok, Quote} = ff_ct_provider:get_quote(Params, DOptions),
{ok, Quote} = Handler:get_quote(Params, DOptions),
{ok, encode_quote(Quote)}.
%%
@ -104,3 +106,7 @@ encode_quote(#{
expires_on = ExpiresOn,
quote_data = QuoteData
}.
get_handler(Opts) ->
proplists:get_value(handler, Opts, ff_ct_provider).