HG-466: Use fault detector in hellgate routing (#302)

* add fault-detector dep

* fix typo

* update rebar.lock

* update fault detector version

* init fault detector client

* add fault detector base

* add fault-detector url placeholder

* get fault-detector url from env

* fix woody client call

* fix formatting, update gitignore

* update fd client, add fd init placeholder after routing

* add fault_detector_proto to app.src

* fix register_operation in fault detector

* update register operation template after choosing route

* update fd client

* add fault detector to hg_proto

* add placeholder for fd scoring in routing

* add fault detector template to sys.config

* update fault detector client

* add GetStatistics call to routing

* update fault detector client

* update get statistics call in routing

* doc tweak

* fix typing

* fix type

* add init service after routing

* update fd client

* add fd notifiers on success and failure

* clean comments

* add fd dummy to tests

* update routing

* temporarily disable sending operations to fd

* add fd routing test case

* remove retry strategy from fd client

* minor tweak

* add fd routing test

* update tests

* update fd client

* update fd client

* refactoring, update test

* formatting fix

* minor refactoring

* more refactoring

* more refactoring

* fix formatting errors

* minor

* space fix

* fix typo

* remove whitespace

* fix whitespace

* Update hg_direct_recurrent_tests_SUITE.erl

* Update hg_invoice_tests_SUITE.erl

* formatting fixes

* separate routing tests

* update config

* update routing

* update fd client

* clean up comments, update fault detector calls

* fix errors

* fix line length

* fix commas

* fd refactoring

* improve fd formatting

* formatting

* refactoring

* add fail rate scoring test base

* split route selection into multiple functions, refactoring

* fix line length

* update fd config, move fd notification to hg_proxy_provider

* config update

* fd client refactoring

* new route selection + tests

* fix trailing comma

* update config

* update fd client with new config

* rework route selection algorithm

* fix error handling in hg_proxy_provider

* merge, update tests, fix types

* move to logger in fd client

* build_utils update

* fix build utils

* remove lager, update fd child spec

* minor refactoring

* revert to lager in fd

* revert to lager in routing tests

* add match on terminate child in tests

* remove unnecessary include
This commit is contained in:
Roman Pushkov 2019-05-27 11:06:38 +03:00 committed by GitHub
parent 4b9804fade
commit 9d51ea2f46
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1799 additions and 170 deletions

1
.gitignore vendored
View File

@ -11,3 +11,4 @@ Dockerfile
docker-compose.yml
/.idea/
*.beam
tags

View File

@ -7,6 +7,7 @@
{mod, {hellgate, []}},
{applications, [
kernel,
fault_detector_proto,
stdlib,
lager,
lager_logstash_formatter,

View File

@ -0,0 +1,266 @@
%%% Fault detector interaction
-module(hg_fault_detector_client).
-include_lib("fault_detector_proto/include/fd_proto_fault_detector_thrift.hrl").
-define(service_config(SW, OTL, PAS), #fault_detector_ServiceConfig{
sliding_window = SW,
operation_time_limit = OTL,
pre_aggregation_size = PAS
}).
-define(operation(OpId, State), #fault_detector_Operation{
operation_id = OpId,
state = State
}).
-define(state_start(TimeStamp), #fault_detector_Start{ time_start = TimeStamp }).
-define(state_error(TimeStamp), #fault_detector_Error{ time_end = TimeStamp }).
-define(state_finish(TimeStamp), #fault_detector_Finish{ time_end = TimeStamp }).
-export([build_config/0]).
-export([build_config/2]).
-export([build_config/3]).
-export([build_service_id/2]).
-export([build_operation_id/1]).
-export([init_service/1]).
-export([init_service/2]).
-export([get_statistics/1]).
-export([register_operation/3]).
-export([register_operation/4]).
-type operation_status() :: start | finish | error.
-type service_stats() :: fd_proto_fault_detector_thrift:'ServiceStatistics'().
-type service_id() :: fd_proto_fault_detector_thrift:'ServiceId'().
-type operation_id() :: fd_proto_fault_detector_thrift:'OperationId'().
-type service_config() :: fd_proto_fault_detector_thrift:'ServiceConfig'().
-type sliding_window() :: fd_proto_fault_detector_thrift:'Milliseconds'().
-type operation_time_limit() :: fd_proto_fault_detector_thrift:'Milliseconds'().
-type pre_aggregation_size() :: fd_proto_fault_detector_thrift:'Seconds'() | undefined.
-type fd_service_type() :: adapter_availability.
%% API
%%------------------------------------------------------------------------------
%% @doc
%% `build_config/0` creates a default config that can be used with
%% `init_service/2` and `register_operation/4`.
%%
%% The default values can be adjusted via sys.config.
%%
%% Config
%% `SlidingWindow`: pick operations from SlidingWindow milliseconds.
%% Default: 60000
%% `OpTimeLimit`: expected operation execution time, in milliseconds.
%% Default: 10000
%% `PreAggrSize`: time interval for data preaggregation, in seconds.
%% Default: 2
%% @end
%%------------------------------------------------------------------------------
-spec build_config() ->
service_config().
build_config() ->
EnvFDConfig = genlib_app:env(hellgate, fault_detector, #{}),
SlidingWindow = genlib_map:get(sliding_window, EnvFDConfig, 60000),
OpTimeLimit = genlib_map:get(operation_time_limit, EnvFDConfig, 10000),
PreAggrSize = genlib_map:get(pre_aggregation_size, EnvFDConfig, 2),
?service_config(SlidingWindow, OpTimeLimit, PreAggrSize).
%%------------------------------------------------------------------------------
%% @doc
%% `build_config/2` receives the length of the sliding windown and the operation
%% time limit as arguments. The config can then be used with `init_service/2`
%% and `register_operation/4`.
%% @end
%%------------------------------------------------------------------------------
-spec build_config(sliding_window(), operation_time_limit()) ->
service_config().
build_config(SlidingWindow, OpTimeLimit) ->
?service_config(SlidingWindow, OpTimeLimit, undefined).
%%------------------------------------------------------------------------------
%% @doc
%% `build_config/3` is analogous to `build_config/2` but also receives
%% the optional pre-aggregation size argument.
%% @end
%%------------------------------------------------------------------------------
-spec build_config(sliding_window(), operation_time_limit(), pre_aggregation_size()) ->
service_config().
build_config(SlidingWindow, OpTimeLimit, PreAggrSize) ->
?service_config(SlidingWindow, OpTimeLimit, PreAggrSize).
%%------------------------------------------------------------------------------
%% @doc
%% `build_service_id/2` is a helper function for building service IDs
%% @end
%%------------------------------------------------------------------------------
-spec build_service_id(fd_service_type(), binary()) ->
binary().
build_service_id(ServiceType, ID) ->
do_build_service_id(ServiceType, ID).
%%------------------------------------------------------------------------------
%% @doc
%% `build_operation_id_id/3` is a helper function for building operation IDs
%% @end
%%------------------------------------------------------------------------------
-spec build_operation_id(fd_service_type()) -> binary().
build_operation_id(ServiceType) ->
do_build_operation_id(ServiceType).
%%------------------------------------------------------------------------------
%% @doc
%% `init_service/1` receives a service id and initialises a fault detector
%% service for it, allowing you to aggregate availability statistics via
%% `register_operation/3` and `register_operation/4` and fetch it using the
%% `get_statistics/1` function.
%% @end
%%------------------------------------------------------------------------------
-spec init_service(service_id()) ->
{ok, initialised} | {error, any()}.
init_service(ServiceId) ->
ServiceConfig = build_config(),
call('InitService', [ServiceId, ServiceConfig]).
%%------------------------------------------------------------------------------
%% @doc
%% `init_service/2` is analogous to `init_service/1` but also receives
%% configuration for the fault detector service created by `build_config/3`.
%% @end
%%------------------------------------------------------------------------------
-spec init_service(service_id(), service_config()) ->
{ok, initialised} | {error, any()}.
init_service(ServiceId, ServiceConfig) ->
call('InitService', [ServiceId, ServiceConfig]).
%%------------------------------------------------------------------------------
%% @doc
%% `get_statistics/1` receives a list of service ids and returns a
%% list of statistics on the services' reliability.
%%
%% Returns an empty list if the fault detector itself is unavailable. Services
%% not initialised in the fault detector will not be in the list.
%% @end
%%------------------------------------------------------------------------------
-spec get_statistics([service_id()]) -> [service_stats()].
get_statistics(ServiceIds) when is_list(ServiceIds) ->
call('GetStatistics', [ServiceIds]).
%%------------------------------------------------------------------------------
%% @doc
%% `register_operation/3` receives a service id, an operation id and an
%% operation status which is one of the following atoms: `start`, `finish`, `error`,
%% respectively for registering a start and either a successful or an erroneous
%% end of an operation. The data is then used to aggregate statistics on a
%% service's availability that is accessible via `get_statistics/1`.
%% @end
%%------------------------------------------------------------------------------
-spec register_operation(operation_status(), service_id(), operation_id()) ->
{ok, registered} | {error, not_found} | {error, any()}.
register_operation(Status, ServiceId, OperationId) ->
ServiceConfig = build_config(),
register_operation(Status, ServiceId, OperationId, ServiceConfig).
%%------------------------------------------------------------------------------
%% @doc
%% `register_operation/4` is analogous to `register_operation/3` but also
%% receives configuration for the fault detector service created
%% by `build_config/3`.
%% @end
%%------------------------------------------------------------------------------
-spec register_operation(operation_status(), service_id(), operation_id(), service_config()) ->
{ok, registered} | {error, not_found} | {error, any()}.
register_operation(Status, ServiceId, OperationId, ServiceConfig) ->
OperationState = case Status of
start -> {Status, ?state_start(hg_datetime:format_now())};
error -> {Status, ?state_error(hg_datetime:format_now())};
finish -> {Status, ?state_finish(hg_datetime:format_now())}
end,
Operation = ?operation(OperationId, OperationState),
call('RegisterOperation', [ServiceId, Operation, ServiceConfig]).
%% PRIVATE
call(Function, Args) ->
ServiceUrls = genlib_app:env(hellgate, services),
Url = genlib:to_binary(maps:get(fault_detector, ServiceUrls)),
Opts = #{url => Url},
EnvFDConfig = genlib_app:env(hellgate, fault_detector, #{}),
Timeout = genlib_map:get(timeout, EnvFDConfig, infinity),
Deadline = woody_deadline:from_timeout(Timeout),
do_call(Function, Args, Opts, Deadline).
do_call('InitService', Args, Opts, Deadline) ->
try hg_woody_wrapper:call(fault_detector, 'InitService', Args, Opts, Deadline) of
{ok, _Result} -> {ok, initialised}
catch
error:{woody_error, {_Source, Class, _Details}} = Reason
when Class =:= resource_unavailable orelse
Class =:= result_unknown ->
[ServiceId | _] = Args,
ErrorText = "Unable to init service ~p in fault detector, ~p:~p",
_ = lager:warning(ErrorText, [ServiceId, error, Reason]),
{error, Reason};
error:{woody_error, {_Source, result_unexpected, _Details}} = Reason ->
[ServiceId | _] = Args,
ErrorText = "Unable to init service ~p in fault detector, ~p:~p",
_ = lager:error(ErrorText, [ServiceId, error, Reason]),
{error, Reason}
end;
do_call('GetStatistics', Args, Opts, Deadline) ->
try hg_woody_wrapper:call(fault_detector, 'GetStatistics', Args, Opts, Deadline) of
{ok, Stats} -> Stats
catch
error:{woody_error, {_Source, Class, _Details}} = Reason
when Class =:= resource_unavailable orelse
Class =:= result_unknown ->
[ServiceIds | _] = Args,
String = "Unable to get statistics for services ~p from fault detector, ~p:~p",
_ = lager:warning(String, [ServiceIds, error, Reason]),
[];
error:{woody_error, {_Source, result_unexpected, _Details}} = Reason ->
[ServiceIds | _] = Args,
String = "Unable to get statistics for services ~p from fault detector, ~p:~p",
_ = lager:error(String, [ServiceIds, error, Reason]),
[]
end;
do_call('RegisterOperation', Args, Opts, Deadline) ->
try hg_woody_wrapper:call(fault_detector, 'RegisterOperation', Args, Opts, Deadline) of
{ok, _Result} ->
{ok, registered};
{exception, #fault_detector_ServiceNotFoundException{}} ->
{error, not_found}
catch
error:{woody_error, {_Source, Class, _Details}} = Reason
when Class =:= resource_unavailable orelse
Class =:= result_unknown ->
[ServiceId, OperationId | _] = Args,
ErrorText = "Unable to register operation ~p for service ~p in fault detector, ~p:~p",
_ = lager:warning(ErrorText, [OperationId, ServiceId, error, Reason]),
{error, Reason};
error:{woody_error, {_Source, result_unexpected, _Details}} = Reason ->
[ServiceId, OperationId | _] = Args,
ErrorText = "Unable to register operation ~p for service ~p in fault detector, ~p:~p",
_ = lager:error(ErrorText, [OperationId, ServiceId, error, Reason]),
{error, Reason}
end.
do_build_service_id(adapter_availability, ID) ->
hg_utils:construct_complex_id([
<<"hellgate_service">>,
<<"adapter_availability">>,
ID
]).
do_build_operation_id(adapter_availability) ->
hg_utils:construct_complex_id([
<<"hellgate_operation">>,
<<"adapter_availability">>,
hg_utils:unique_id()
]).

View File

@ -20,6 +20,8 @@
-include_lib("dmsl/include/dmsl_payment_processing_errors_thrift.hrl").
-include_lib("dmsl/include/dmsl_msgpack_thrift.hrl").
-include_lib("fault_detector_proto/include/fd_proto_fault_detector_thrift.hrl").
%% API
%% St accessors
@ -597,9 +599,23 @@ choose_route(PaymentInstitution, VS, Revision, St) ->
{ok, _Route} = Result ->
Result;
undefined ->
Payment = get_payment(St),
Predestination = choose_routing_predestination(Payment),
case hg_routing:choose(Predestination, PaymentInstitution, VS, Revision) of
Payment = get_payment(St),
Predestination = choose_routing_predestination(Payment),
{Providers, RejectContext0} = hg_routing:gather_providers(
Predestination,
PaymentInstitution,
VS,
Revision
),
FailRatedProviders = hg_routing:gather_provider_fail_rates(Providers),
{FailRatedRoutes, RejectContext1} = hg_routing:gather_routes(
Predestination,
FailRatedProviders,
RejectContext0,
VS,
Revision
),
case hg_routing:choose_route(FailRatedRoutes, RejectContext1, VS) of
{ok, _Route} = Result ->
Result;
{error, {no_route_found, {RejectReason, RejectContext}}} = Error ->
@ -613,6 +629,7 @@ choose_routing_predestination(#domain_InvoicePayment{make_recurrent = true}) ->
recurrent_payment;
choose_routing_predestination(#domain_InvoicePayment{payer = ?payment_resource_payer()}) ->
payment.
% Other payers has predefined routes
log_reject_context(risk_score_is_too_high = RejectReason, RejectContext) ->
@ -1052,7 +1069,7 @@ construct_refund_id(St) ->
InvoiceID = get_invoice_id(get_invoice(get_opts(St))),
SequenceID = make_refund_squence_id(PaymentID, InvoiceID),
IntRefundID = hg_sequences:get_next(SequenceID),
integer_to_binary(IntRefundID).
erlang:integer_to_binary(IntRefundID).
make_refund_squence_id(PaymentID, InvoiceID) ->
<<InvoiceID/binary, <<"_">>/binary, PaymentID/binary>>.
@ -1274,7 +1291,7 @@ get_adjustment_revision(Params) ->
).
construct_adjustment_id(#st{adjustments = As}) ->
integer_to_binary(length(As) + 1).
erlang:integer_to_binary(length(As) + 1).
-spec assert_activity(activity(), st()) -> ok | no_return().
assert_activity(Activity, #st{activity = Activity}) ->
@ -1514,7 +1531,8 @@ repair_session(St = #st{repair_scenario = Scenario}) ->
{ok, Result};
call ->
ProxyContext = construct_proxy_context(St),
issue_process_call(ProxyContext, St)
Route = get_route(St),
hg_proxy_provider:process_payment(ProxyContext, Route)
end.
-spec finalize_payment(action(), st()) -> machine_result().
@ -1543,7 +1561,8 @@ process_callback_timeout(Action, Session, Events, St) ->
handle_callback(Payload, Action, St) ->
ProxyContext = construct_proxy_context(St),
{ok, CallbackResult} = issue_callback_call(Payload, ProxyContext, St),
Route = get_route(St),
{ok, CallbackResult} = hg_proxy_provider:handle_payment_callback(Payload, ProxyContext, Route),
{Response, Result} = handle_callback_result(CallbackResult, Action, get_activity_session(St)),
{Response, finish_session_processing(Result, St)}.
@ -2525,21 +2544,6 @@ get_customer(CustomerID) ->
error({<<"Can't get customer">>, Error})
end.
issue_process_call(ProxyContext, St) ->
issue_proxy_call('ProcessPayment', [ProxyContext], St).
issue_callback_call(Payload, ProxyContext, St) ->
issue_proxy_call('HandlePaymentCallback', [Payload, ProxyContext], St).
issue_proxy_call(Func, Args, St) ->
CallOpts = get_call_options(St),
hg_woody_wrapper:call(proxy_provider, Func, Args, CallOpts).
get_call_options(St) ->
Revision = hg_domain:head(),
Provider = hg_domain:get(Revision, {provider, get_route_provider_ref(get_route(St))}),
hg_proxy:get_call_options(Provider#domain_Provider.proxy, Revision).
get_route(#st{route = Route}) ->
Route.

View File

@ -76,7 +76,34 @@ handle_recurrent_token_callback(Payload, ProxyContext, St) ->
-spec issue_call(woody:func(), list(), route()) ->
term().
issue_call(Func, Args, Route) ->
hg_woody_wrapper:call(proxy_provider, Func, Args, get_call_options(Route)).
ServiceType = adapter_availability,
ProviderRef = get_route_provider(Route),
ProviderID = ProviderRef#domain_ProviderRef.id,
BinaryID = erlang:integer_to_binary(ProviderID),
ServiceID = hg_fault_detector_client:build_service_id(ServiceType, BinaryID),
OperationID = hg_fault_detector_client:build_operation_id(ServiceType),
_ = notify_fault_detector(start, ServiceID, OperationID),
try hg_woody_wrapper:call(proxy_provider, Func, Args, get_call_options(Route)) of
Result ->
_ = notify_fault_detector(finish, ServiceID, OperationID),
Result
catch
error:{woody_error, _ErrorType} = Reason ->
_ = notify_fault_detector(error, ServiceID, OperationID),
error(Reason)
end.
notify_fault_detector(start, ServiceID, OperationID) ->
case hg_fault_detector_client:register_operation(start, ServiceID, OperationID) of
{error, not_found} ->
_ = hg_fault_detector_client:init_service(ServiceID),
_ = hg_fault_detector_client:register_operation(start, ServiceID, OperationID);
Result ->
Result
end;
notify_fault_detector(Status, ServiceID, OperationID) ->
_ = hg_fault_detector_client:register_operation(Status, ServiceID, OperationID).
get_call_options(Route) ->
Revision = hg_domain:head(),

View File

@ -197,15 +197,31 @@ namespace() ->
-spec init([payment_tool() | rec_payment_tool_params()], hg_machine:machine()) ->
hg_machine:result().
init([PaymentTool, Params], #{id := RecPaymentToolID}) ->
Revision = hg_domain:head(),
CreatedAt = hg_datetime:format_now(),
{Party, Shop} = get_party_shop(Params),
Revision = hg_domain:head(),
CreatedAt = hg_datetime:format_now(),
{Party, Shop} = get_party_shop(Params),
PaymentInstitution = get_payment_institution(Shop, Party, Revision),
RecPaymentTool = create_rec_payment_tool(RecPaymentToolID, CreatedAt, Party, Params, Revision),
VS0 = collect_varset(Party, Shop, #{payment_tool => PaymentTool}),
{RiskScore , VS1} = validate_risk_score(inspect(RecPaymentTool, VS0), VS0),
RecPaymentTool = create_rec_payment_tool(RecPaymentToolID, CreatedAt, Party, Params, Revision),
VS0 = collect_varset(Party, Shop, #{payment_tool => PaymentTool}),
{RiskScore, VS1} = validate_risk_score(inspect(RecPaymentTool, VS0), VS0),
{Providers, RejectContext0} = hg_routing:gather_providers(
recurrent_paytool,
PaymentInstitution,
VS1,
Revision
),
FailRatedProviders = hg_routing:gather_provider_fail_rates(Providers),
{FailRatedRoutes, RejectContext1} = hg_routing:gather_routes(
recurrent_paytool,
FailRatedProviders,
RejectContext0,
VS1,
Revision
),
Route = validate_route(
hg_routing:choose(recurrent_paytool, PaymentInstitution, VS1, Revision),
hg_routing:choose_route(FailRatedRoutes, RejectContext1, VS1),
RecPaymentTool
),
{ok, {Changes, Action}} = start_session(),
@ -344,7 +360,6 @@ process_callback_timeout(Action, St) ->
get_route(#st{route = Route}) ->
Route.
%%
construct_proxy_context(St) ->
@ -567,7 +582,6 @@ dispatch_callback({provider, Payload}, St) ->
throw(invalid_callback)
end.
-type tag() :: dmsl_base_thrift:'Tag'().
-type callback() :: _. %% FIXME
-type callback_response() :: _. %% FIXME

View File

@ -2,8 +2,13 @@
-module(hg_routing).
-include_lib("dmsl/include/dmsl_domain_thrift.hrl").
-include_lib("fault_detector_proto/include/fd_proto_fault_detector_thrift.hrl").
-export([gather_providers/4]).
-export([gather_provider_fail_rates/1]).
-export([gather_routes/5]).
-export([choose_route/3]).
-export([choose/4]).
-export([get_payments_terms/2]).
-export([get_rec_paytools_terms/2]).
@ -18,69 +23,167 @@
| dmsl_domain_thrift:'RecurrentPaytoolsProvisionTerms'()
| undefined.
-type payment_institution() :: dmsl_domain_thrift:'PaymentInstitution'().
-type route() :: dmsl_domain_thrift:'PaymentRoute'().
-type route_predestination() :: payment | recurrent_paytool | recurrent_payment.
-define(rejected(Reason), {rejected, Reason}).
-type reject_context():: #{
-type reject_context() :: #{
varset := hg_selector:varset(),
rejected_providers := list(rejected_provider()),
rejected_terminals := list(rejected_terminal())
}.
-type rejected_provider() :: {provider_ref(), Reason :: term()}.
-type rejected_terminal() :: {terminal_ref(), Reason :: term()}.
-type provider() :: dmsl_domain_thrift:'Provider'().
-type provider_ref() :: dmsl_domain_thrift:'ProviderRef'().
-type terminal() :: dmsl_domain_thrift:'Terminal'().
-type terminal_ref() :: dmsl_domain_thrift:'TerminalRef'().
-type provider_status() :: {provider_condition(), fail_rate()}.
-type provider_condition() :: alive | dead.
-type fail_rate() :: float().
-type fail_rated_provider() :: {provider_ref(), provider(), provider_status()}.
-type fail_rated_route() :: {provider_ref(), {terminal_ref(), terminal()}, provider_status()}.
-export_type([route_predestination/0]).
-spec choose(
-spec gather_providers(
route_predestination(),
dmsl_domain_thrift:'PaymentInstitution'(),
payment_institution(),
hg_selector:varset(),
hg_domain:revision()
) ->
{ok, route()} | {error, {no_route_found, {unknown | risk_score_is_too_high, reject_context()}}}.
{[{provider_ref(), provider()}], reject_context()}.
choose(Predestination, PaymentInstitution, VS, Revision) ->
% TODO not the optimal strategy
RejectContext0 = #{
varset => VS
gather_providers(Predestination, PaymentInstitution, VS, Revision) ->
RejectContext = #{
varset => VS,
rejected_providers => [],
rejected_terminals => []
},
{Providers, RejectContext1} = collect_providers(Predestination, PaymentInstitution, VS, Revision, RejectContext0),
{Choices, RejectContext2} = collect_routes(Predestination, Providers, VS, Revision, RejectContext1),
choose_route(Choices, VS, RejectContext2).
select_providers(Predestination, PaymentInstitution, VS, Revision, RejectContext).
collect_routes(Predestination, Providers, VS, Revision, RejectContext) ->
-spec gather_provider_fail_rates([provider_ref()]) ->
[fail_rated_provider()].
gather_provider_fail_rates(Providers) ->
score_providers_with_fault_detector(Providers).
-spec gather_routes(
route_predestination(),
[fail_rated_provider()],
reject_context(),
hg_selector:varset(),
hg_domain:revision()
) ->
{[fail_rated_route()], reject_context()}.
gather_routes(Predestination, FailRatedProviders, RejectContext, VS, Revision) ->
select_routes(Predestination, FailRatedProviders, VS, Revision, RejectContext).
-spec choose_route([fail_rated_route()], reject_context(), hg_selector:varset()) ->
{ok, route()} | {error, {no_route_found, {risk_score_is_too_high | unknown, reject_context()}}}.
choose_route(FailRatedRoutes, RejectContext, VS) ->
do_choose_route(FailRatedRoutes, VS, RejectContext).
select_providers(Predestination, PaymentInstitution, VS, Revision, RejectContext) ->
ProviderSelector = PaymentInstitution#domain_PaymentInstitution.providers,
ProviderRefs0 = reduce(provider, ProviderSelector, VS, Revision),
ProviderRefs1 = ordsets:to_list(ProviderRefs0),
{Providers, RejectReasons} = lists:foldl(
fun (ProviderRef, {Prvs, Reasons}) ->
try
P = acceptable_provider(Predestination, ProviderRef, VS, Revision),
{[P | Prvs], Reasons}
catch
?rejected(Reason) ->
{Prvs, [{ProviderRef, Reason} | Reasons]}
end
end,
{[], []},
ProviderRefs1
),
{Providers, RejectContext#{rejected_providers => RejectReasons}}.
select_routes(Predestination, FailRatedProviders, VS, Revision, RejectContext) ->
{Accepted, Rejected} = lists:foldl(
fun (Provider, {AcceptedTerminals, RejectedTerminals}) ->
{Accepts, Rejects} = collect_routes_for_provider(Predestination, Provider, VS, Revision),
{Accepts ++ AcceptedTerminals, Rejects ++ RejectedTerminals}
end,
{[], []},
Providers
FailRatedProviders
),
{Accepted, RejectContext#{rejected_terminals => Rejected}}.
choose_route(Routes, VS = #{risk_score := RiskScore}, RejectContext) ->
case lists:reverse(lists:keysort(1, score_routes(Routes, VS))) of
[{_Score, Route} | _] ->
{ok, export_route(Route)};
[] when RiskScore =:= fatal ->
{error, {no_route_found, {risk_score_is_too_high, RejectContext}}};
[] ->
{error, {no_route_found, {unknown, RejectContext}}}
end.
do_choose_route(_FailRatedRoutes, #{risk_score := fatal}, RejectContext) ->
{error, {no_route_found, {risk_score_is_too_high, RejectContext}}};
do_choose_route([] = _FailRatedRoutes, _VS, RejectContext) ->
{error, {no_route_found, {unknown, RejectContext}}};
do_choose_route(FailRatedRoutes, VS, RejectContext) ->
ScoredRoutes = score_routes(FailRatedRoutes, VS),
choose_scored_route(ScoredRoutes, RejectContext).
choose_scored_route([{_Score, Route}], _RejectContext) ->
{ok, export_route(Route)};
choose_scored_route(ScoredRoutes, _RejectContext) ->
[{_Score, Route}|_Rest] = lists:reverse(lists:keysort(1, ScoredRoutes)),
{ok, export_route(Route)}.
score_routes(Routes, VS) ->
[{score_route(R, VS), R} || R <- Routes].
[{score_route(R, VS), {Provider, Terminal}} || {Provider, Terminal, _ProviderStatus} = R <- Routes].
export_route({ProviderRef, {TerminalRef, _Terminal}}) ->
% TODO shouldn't we provide something along the lines of `get_provider_ref/1`,
% `get_terminal_ref/1` instead?
?route(ProviderRef, TerminalRef).
score_providers_with_fault_detector([]) -> [];
score_providers_with_fault_detector(Providers) ->
ServiceIDs = [build_fd_service_id(PR) || {PR, _P} <- Providers],
FDStats = hg_fault_detector_client:get_statistics(ServiceIDs),
FailRatedProviders = [{PR, P, get_provider_status(PR, P, FDStats)} || {PR, P} <- Providers],
FailRatedProviders.
%% TODO: maybe use custom cutoffs per provider
get_provider_status(ProviderRef, _Provider, FDStats) ->
ProviderID = build_fd_service_id(ProviderRef),
FDConfig = genlib_app:env(hellgate, fault_detector, #{}),
CriticalFailRate = genlib_map:get(critical_fail_rate, FDConfig, 0.7),
case lists:keysearch(ProviderID, #fault_detector_ServiceStatistics.service_id, FDStats) of
{value, #fault_detector_ServiceStatistics{failure_rate = FailRate}}
when FailRate >= CriticalFailRate ->
{0, FailRate};
{value, #fault_detector_ServiceStatistics{failure_rate = FailRate}} ->
{1, FailRate};
false ->
{1, 0.0}
end.
score_route({_Provider, {_TerminalRef, Terminal}, ProviderStatus}, VS) ->
RiskCoverage = score_risk_coverage(Terminal, VS),
{ProviderCondition, FailRate} = ProviderStatus,
SuccessRate = 1.0 - FailRate,
{ProviderCondition, RiskCoverage, SuccessRate}.
%% NOTE
%% Score [0.0 .. 1.0]
%% Higher score is better, e.g. route is more likely to be chosen.
score_risk_coverage(Terminal, VS) ->
RiskScore = getv(risk_score, VS),
RiskCoverage = Terminal#domain_Terminal.risk_coverage,
math:exp(-hg_inspector:compare_risk_score(RiskCoverage, RiskScore)).
build_fd_service_id(#domain_ProviderRef{id = ID}) ->
BinaryID = erlang:integer_to_binary(ID),
hg_fault_detector_client:build_service_id(adapter_availability, BinaryID).
-spec get_payments_terms(route(), hg_domain:revision()) -> terms().
get_payments_terms(?route(ProviderRef, TerminalRef), Revision) ->
@ -94,40 +197,6 @@ get_rec_paytools_terms(?route(ProviderRef, _), Revision) ->
#domain_Provider{recurrent_paytool_terms = Terms} = hg_domain:get(Revision, {provider, ProviderRef}),
Terms.
%%
%% NOTE
%% Score [0.0 .. 1.0]
%% Higher score is better, e.g. route is more likely to be chosen.
score_route(Route, VS) ->
score_risk_coverage(Route, VS).
score_risk_coverage({_Provider, {_TerminalRef, Terminal}}, VS) ->
RiskScore = getv(risk_score, VS),
RiskCoverage = Terminal#domain_Terminal.risk_coverage,
math:exp(-hg_inspector:compare_risk_score(RiskCoverage, RiskScore)).
%%
collect_providers(Predestination, PaymentInstitution, VS, Revision, RejectContext) ->
ProviderSelector = PaymentInstitution#domain_PaymentInstitution.providers,
ProviderRefs = reduce(provider, ProviderSelector, VS, Revision),
{Providers, RejectReasons} = lists:foldl(
fun (ProviderRef, {Prvs, Reasons}) ->
try
P = acceptable_provider(Predestination, ProviderRef, VS, Revision),
{[P | Prvs], Reasons}
catch
?rejected(Reason) ->
{Prvs, [{ProviderRef, Reason} | Reasons]}
end
end,
{[], []},
ordsets:to_list(ProviderRefs)
),
{Providers, RejectContext#{rejected_providers => RejectReasons}}.
acceptable_provider(payment, ProviderRef, VS, Revision) ->
Provider = #domain_Provider{
payment_terms = Terms
@ -152,14 +221,14 @@ acceptable_provider(recurrent_payment, ProviderRef, VS, Revision) ->
%%
collect_routes_for_provider(Predestination, {ProviderRef, Provider}, VS, Revision) ->
collect_routes_for_provider(Predestination, {ProviderRef, Provider, FailRate}, VS, Revision) ->
TerminalSelector = Provider#domain_Provider.terminal,
TerminalRefs = reduce(terminal, TerminalSelector, VS, Revision),
lists:foldl(
fun (TerminalRef, {Accepted, Rejected}) ->
try
Terminal = acceptable_terminal(Predestination, TerminalRef, Provider, VS, Revision),
{[{ProviderRef, Terminal} | Accepted], Rejected}
{[{ProviderRef, Terminal, FailRate} | Accepted], Rejected}
catch
?rejected(Reason) ->
{Accepted, [{ProviderRef, TerminalRef, Reason} | Rejected]}

View File

@ -127,7 +127,8 @@ start_app(hellgate = AppName) ->
party_management => <<"http://hellgate:8022/v1/processing/partymgmt">>,
customer_management => <<"http://hellgate:8022/v1/processing/customer_management">>,
recurrent_paytool => <<"http://hellgate:8022/v1/processing/recpaytool">>,
sequences => <<"http://sequences:8022/v1/sequences">>
sequences => <<"http://sequences:8022/v1/sequences">>,
fault_detector => <<"http://127.0.0.1:20001/">>
}},
{proxy_opts, #{
transport_opts => #{
@ -135,10 +136,17 @@ start_app(hellgate = AppName) ->
}},
{payment_retry_policy, #{
processed => {intervals, [1, 1, 1]},
captured => {intervals, [1, 1, 1]},
refunded => {intervals, [1, 1, 1]}
captured => {intervals, [1, 1, 1]},
refunded => {intervals, [1, 1, 1]}
}},
{inspect_timeout, 1000}
{inspect_timeout, 1000},
{fault_detector, #{
critical_fail_rate => 0.7,
timeout => 2000,
sliding_window => 60000,
operation_time_limit => 10000,
pre_aggregation_size => 2
}}
]), #{
hellgate_root_url => get_hellgate_url()
}};

View File

@ -65,6 +65,7 @@ init_per_suite(C) ->
PartyClient = hg_client_party:start(PartyID, hg_ct_helper:create_client(RootUrl, PartyID)),
ShopID = hg_ct_helper:create_party_and_shop(?cat(1), <<"RUB">>, ?tmpl(1), ?pinst(1), PartyClient),
{ok, SupPid} = supervisor:start_link(?MODULE, []),
{ok, _} = supervisor:start_child(SupPid, hg_dummy_fault_detector:child_spec()),
_ = unlink(SupPid),
C1 = [
{apps, Apps},
@ -81,6 +82,8 @@ init_per_suite(C) ->
-spec end_per_suite(config()) -> _.
end_per_suite(C) ->
SupPid = cfg(test_sup, C),
ok = supervisor:terminate_child(SupPid, hg_dummy_fault_detector),
ok = hg_domain:cleanup(),
[application:stop(App) || App <- cfg(apps, C)].

View File

@ -104,6 +104,7 @@ init_per_suite(C) ->
Shop1ID = hg_ct_helper:create_party_and_shop(?cat(1), <<"RUB">>, ?tmpl(1), ?pinst(1), PartyClient),
Shop2ID = hg_ct_helper:create_party_and_shop(?cat(1), <<"RUB">>, ?tmpl(1), ?pinst(1), PartyClient),
{ok, SupPid} = supervisor:start_link(?MODULE, []),
{ok, _} = supervisor:start_child(SupPid, hg_dummy_fault_detector:child_spec()),
_ = unlink(SupPid),
C1 = [
{apps, Apps},
@ -121,7 +122,9 @@ init_per_suite(C) ->
-spec end_per_suite(config()) -> config().
end_per_suite(C) ->
SupPid = cfg(test_sup, C),
ok = hg_domain:cleanup(),
ok = supervisor:terminate_child(SupPid, hg_dummy_fault_detector),
[application:stop(App) || App <- cfg(apps, C)].
-spec init_per_group(group_name(), config()) -> config().

View File

@ -0,0 +1,53 @@
-module(hg_dummy_fault_detector).
-behaviour(woody_server_thrift_handler).
-export([child_spec/0]).
-export([handle_function/4]).
-include_lib("fault_detector_proto/include/fd_proto_fault_detector_thrift.hrl").
-spec child_spec() ->
term().
child_spec() ->
woody_server:child_spec(
?MODULE,
#{
handlers => [{"/", {{fd_proto_fault_detector_thrift, 'FaultDetector'}, ?MODULE}}],
event_handler => scoper_woody_event_handler,
ip => {127, 0, 0, 1},
port => 20001
}
).
-spec handle_function(woody:func(), woody:args(), _, hg_woody_wrapper:handler_opts()) ->
{ok, term()} | no_return().
handle_function(
'GetStatistics',
_Args,
_Context,
_Options
) ->
{ok, [
#fault_detector_ServiceStatistics{
service_id = <<"hellgate_service.adapter_availability.200">>,
failure_rate = 0.9,
operations_count = 10,
error_operations_count = 9,
overtime_operations_count = 0,
success_operations_count = 1
},
#fault_detector_ServiceStatistics{
service_id = <<"hellgate_service.adapter_availability.201">>,
failure_rate = 0.1,
operations_count = 10,
error_operations_count = 1,
overtime_operations_count = 0,
success_operations_count = 9
}
]};
handle_function(_Function, _Args, _Context, _Options) ->
{ok, undefined}.

View File

@ -33,8 +33,6 @@
-export([overdue_invoice_cancellation/1]).
-export([invoice_cancellation_after_payment_timeout/1]).
-export([invalid_payment_amount/1]).
-export([no_route_found_for_payment/1]).
-export([fatal_risk_score_for_route_found/1]).
-export([payment_start_idempotency/1]).
-export([payment_success/1]).
@ -190,8 +188,7 @@ groups() ->
overdue_invoice_cancellation,
invoice_cancellation_after_payment_timeout,
invalid_payment_amount,
no_route_found_for_payment,
fatal_risk_score_for_route_found,
payment_start_idempotency,
payment_success,
payment_success_empty_cvv,
@ -279,6 +276,7 @@ init_per_suite(C) ->
% _ = dbg:p(all, c),
% _ = dbg:tpl({'hg_invoice_payment', 'merge_change', '_'}, x),
CowboySpec = hg_dummy_provider:get_http_cowboy_spec(),
{Apps, Ret} = hg_ct_helper:start_apps([
lager, woody, scoper, dmt_client, party_client, hellgate, {cowboy, CowboySpec}
]),
@ -293,6 +291,7 @@ init_per_suite(C) ->
ShopID = hg_ct_helper:create_party_and_shop(?cat(1), <<"RUB">>, ?tmpl(1), ?pinst(1), PartyClient),
AnotherShopID = hg_ct_helper:create_party_and_shop(?cat(1), <<"RUB">>, ?tmpl(1), ?pinst(1), AnotherPartyClient),
{ok, SupPid} = supervisor:start_link(?MODULE, []),
{ok, _} = supervisor:start_child(SupPid, hg_dummy_fault_detector:child_spec()),
_ = unlink(SupPid),
ok = start_kv_store(SupPid),
NewC = [
@ -309,12 +308,15 @@ init_per_suite(C) ->
{test_sup, SupPid}
| C
],
ok = start_proxies([{hg_dummy_provider, 1, NewC}, {hg_dummy_inspector, 2, NewC}]),
NewC.
-spec end_per_suite(config()) -> _.
end_per_suite(C) ->
SupPid = cfg(test_sup, C),
ok = supervisor:terminate_child(SupPid, hg_dummy_fault_detector),
ok = hg_domain:cleanup(),
[application:stop(App) || App <- cfg(apps, C)],
exit(cfg(test_sup, C), shutdown).
@ -737,73 +739,6 @@ invalid_payment_amount(C) ->
errors = [<<"Invalid amount, more", _/binary>>]
}} = hg_client_invoicing:start_payment(InvoiceID2, PaymentParams, Client).
-spec no_route_found_for_payment(config()) -> test_return().
no_route_found_for_payment(_C) ->
Revision = hg_domain:head(),
PaymentInstitution = hg_domain:get(Revision, {payment_institution, ?pinst(1)}),
VS1 = #{
category => ?cat(1),
currency => ?cur(<<"RUB">>),
cost => ?cash(1000, <<"RUB">>),
payment_tool => {bank_card, #domain_BankCard{}},
party_id => <<"12345">>,
risk_score => low,
flow => instant
},
{error, {no_route_found, {unknown, #{
varset := VS1,
rejected_providers := [
{?prv(3), {'PaymentsProvisionTerms', payment_tool}},
{?prv(2), {'PaymentsProvisionTerms', category}},
{?prv(1), {'PaymentsProvisionTerms', payment_tool}}
],
rejected_terminals := []
}}}} = hg_routing:choose(payment, PaymentInstitution, VS1, Revision),
VS2 = VS1#{
payment_tool => {payment_terminal, #domain_PaymentTerminal{terminal_type = euroset}}
},
{ok, #domain_PaymentRoute{
provider = ?prv(3),
terminal = ?trm(10)
}} = hg_routing:choose(payment, PaymentInstitution, VS2, Revision).
-spec fatal_risk_score_for_route_found(config()) -> test_return().
fatal_risk_score_for_route_found(_C) ->
Revision = hg_domain:head(),
PaymentInstitution = hg_domain:get(Revision, {payment_institution, ?pinst(1)}),
VS1 = #{
category => ?cat(1),
currency => ?cur(<<"RUB">>),
cost => ?cash(1000, <<"RUB">>),
payment_tool => {bank_card, #domain_BankCard{}},
party_id => <<"12345">>,
risk_score => fatal,
flow => instant
},
{error, {no_route_found, {risk_score_is_too_high, #{
varset := VS1,
rejected_providers := [
{?prv(3), {'PaymentsProvisionTerms', payment_tool}},
{?prv(2), {'PaymentsProvisionTerms', category}},
{?prv(1), {'PaymentsProvisionTerms', payment_tool}}
],
rejected_terminals := []
}}}} = hg_routing:choose(payment, PaymentInstitution, VS1, Revision),
VS2 = VS1#{
payment_tool => {payment_terminal, #domain_PaymentTerminal{terminal_type = euroset}}
},
{error, {no_route_found, {risk_score_is_too_high, #{
varset := VS2,
rejected_providers := [
{?prv(2), {'PaymentsProvisionTerms', category}},
{?prv(1), {'PaymentsProvisionTerms', payment_tool}}
],
rejected_terminals := [{?prv(3), ?trm(10), {'Terminal', risk_coverage}}]}
}}} = hg_routing:choose(payment, PaymentInstitution, VS2, Revision).
-spec payment_start_idempotency(config()) -> test_return().
payment_start_idempotency(C) ->

View File

@ -68,6 +68,7 @@ init_per_suite(C) ->
PartyClient = hg_client_party:start(PartyID, hg_ct_helper:create_client(RootUrl, PartyID)),
ShopID = hg_ct_helper:create_party_and_shop(?cat(1), <<"RUB">>, ?tmpl(1), ?pinst(1), PartyClient),
{ok, SupPid} = supervisor:start_link(?MODULE, []),
{ok, _} = supervisor:start_child(SupPid, hg_dummy_fault_detector:child_spec()),
_ = unlink(SupPid),
C1 = [
{apps, Apps},
@ -84,6 +85,8 @@ init_per_suite(C) ->
-spec end_per_suite(config()) -> _.
end_per_suite(C) ->
SupPid = cfg(test_sup, C),
ok = supervisor:terminate_child(SupPid, hg_dummy_fault_detector),
ok = hg_domain:cleanup(),
[application:stop(App) || App <- cfg(apps, C)].

File diff suppressed because it is too large Load Diff

View File

@ -45,6 +45,8 @@ get_service(processor) ->
{mg_proto_state_processing_thrift, 'Processor'};
get_service(eventsink) ->
{mg_proto_state_processing_thrift, 'EventSink'};
get_service(fault_detector) ->
{fd_proto_fault_detector_thrift, 'FaultDetector'};
get_service(sequences) ->
{seq_proto_sequences_thrift, 'Sequences'}.

View File

@ -37,7 +37,8 @@
customer_management => "http://hellgate:8022/v1/processing/customer_management",
% TODO make more consistent
recurrent_paytool => "http://hellgate:8022/v1/processing/recpaytool",
sequences => "http://sequences:8022/v1/sequences"
sequences => "http://sequences:8022/v1/sequences",
fault_detector => "http://fault-detector:8022/v1/fault-detector"
}},
{proxy_opts, #{
transport_opts => #{
@ -53,7 +54,14 @@
captured => no_retry,
refunded => no_retry
}},
{inspect_timeout, 3000}
{inspect_timeout, 3000},
{fault_detector, #{
critical_fail_rate => 0.7,
timeout => 2000,
sliding_window => 60000,
operation_time_limit => 10000,
pre_aggregation_size => 2
}}
]},
{dmt_client, [

View File

@ -48,7 +48,8 @@
{scoper , {git, "git@github.com:rbkmoney/scoper.git" , {branch, "master"}}},
{party_client , {git, "git@github.com:rbkmoney/party_client_erlang.git" , {branch, "master"}}},
{how_are_you , {git, "https://github.com/rbkmoney/how_are_you.git" , {branch, "master"}}},
{erl_health , {git, "https://github.com/rbkmoney/erlang-health.git" , {branch, "master"}}}
{erl_health , {git, "https://github.com/rbkmoney/erlang-health.git" , {branch, "master"}}},
{fault_detector_proto, {git, "git@github.com:rbkmoney/fault-detector-proto.git", {branch, "master"}}}
]}.
{xref_checks, [

View File

@ -24,6 +24,10 @@
{git,"https://github.com/rbkmoney/erlang-health.git",
{ref,"2575c7b63d82a92de54d2d27e504413675e64811"}},
0},
{<<"fault_detector_proto">>,
{git,"git@github.com:rbkmoney/fault-detector-proto.git",
{ref,"41d05a35dd6b71485455ed6a40f5e1ee948724ad"}},
0},
{<<"folsom">>,
{git,"git@github.com:folsom-project/folsom.git",
{ref,"9309bad9ffadeebbefe97521577c7480c7cfcd8a"}},