ED-64: Gunner pool integration (#23)

This commit is contained in:
Alexey 2021-03-29 11:04:16 +03:00 committed by GitHub
parent 1f76fab18c
commit 133b0d5b49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 259 additions and 184 deletions

View File

@ -23,19 +23,30 @@
{shutdown_timeout, 1000},
{opa, #{
endpoint => {"opa", 8181},
% Which transport to use? (tcp | tls)
transport => tcp,
% Which `gen_tcp:connect_option()`s to use? Relevant only for `tcp` transport.
tcp_opts => [inet6],
% Which `ssl:tls_client_option()`s to use? Relevant only for `tls` transport.
tls_opts => [{verify, verify_peer}],
% Total timeout for estabilishing a connection. (ms)
connect_timeout => 1000,
% Timeout for domain lookup query. (ms)
domain_lookup_timeout => 1000,
% Timeout for making request and receiving response. (ms)
request_timeout => 1000
%% Endpoint of the OPA service
endpoint => {{resolve, dns, "opa", #{pick => random}}, 8181},
%% Pool options, see gunner_pool:pool_opts()
pool_opts => #{
cleanup_interval => 1000,
max_connection_load => 1,
max_connection_idle_age => 2000,
max_size => 200,
min_size => 5,
connection_opts => #{
% Which transport to use? (tcp | tls)
transport => tcp,
% Which `gen_tcp:connect_option()`s to use? Relevant only for `tcp` transport.
tcp_opts => [inet6],
% Which `ssl:tls_client_option()`s to use? Relevant only for `tls` transport.
tls_opts => [{verify, verify_peer}],
% Total timeout for estabilishing a connection. (ms)
connect_timeout => 1000,
% Timeout for domain lookup query. (ms)
domain_lookup_timeout => 1000,
% Timeout for making request and receiving response. (ms)
request_timeout => 1000
}
}
}},
{woody_event_handlers, [

View File

@ -46,7 +46,8 @@
{recon, "2.5.1"},
{logger_logstash_formatter,
{git, "https://github.com/rbkmoney/logger_logstash_formatter.git", {branch, "master"}}},
{how_are_you, {git, "https://github.com/rbkmoney/how_are_you.git", {branch, "master"}}}
{how_are_you, {git, "https://github.com/rbkmoney/how_are_you.git", {branch, "master"}}},
{gunner, {git, "git@github.com:rbkmoney/gunner.git", {branch, "master"}}}
]}.
%% Helpful plugins.

View File

@ -27,7 +27,11 @@
{<<"gproc">>,{pkg,<<"gproc">>,<<"0.8.0">>},1},
{<<"gun">>,
{git,"https://github.com/ninenines/gun.git",
{ref,"f2e8d103dd7827251fa726c42e307e42cef8a3dc"}},
{ref,"fe25965f3a2f1347529fec8c7afa981313378e31"}},
0},
{<<"gunner">>,
{git,"git@github.com:rbkmoney/gunner.git",
{ref,"fe2a29fba781be199aee9d4c408dd4662f52d474"}},
0},
{<<"hackney">>,{pkg,<<"hackney">>,<<"1.15.2">>},1},
{<<"how_are_you">>,

View File

@ -14,7 +14,8 @@
jesse,
bouncer_proto,
org_management_proto,
erl_health
erl_health,
gunner
]},
{env, []},
{licenses, ["Apache 2.0"]}

View File

@ -31,6 +31,7 @@ init([]) ->
ServiceOpts = genlib_app:env(?MODULE, services, #{}),
EventHandlers = genlib_app:env(?MODULE, woody_event_handlers, [woody_event_handler_default]),
Healthcheck = enable_health_logging(genlib_app:env(?MODULE, health_check, #{})),
{OpaClient, OpaClientSpec} = bouncer_opa_client:init(get_opa_opts()),
WoodySpec = woody_server:child_spec(
?MODULE,
#{
@ -41,13 +42,14 @@ init([]) ->
shutdown_timeout => get_shutdown_timeout(),
event_handler => EventHandlers,
handlers =>
get_handler_specs(ServiceOpts, AuditPulse) ++ get_stub_handler_specs(ServiceOpts),
get_handler_specs(ServiceOpts, AuditPulse, OpaClient) ++
get_stub_handler_specs(ServiceOpts),
additional_routes => [erl_health_handle:get_route(Healthcheck)]
}
),
{ok, {
#{strategy => one_for_one, intensity => 10, period => 10},
AuditSpecs ++ [WoodySpec]
AuditSpecs ++ [OpaClientSpec, WoodySpec]
}}.
-spec get_audit_specs() -> {[supervisor:child_spec()], bouncer_arbiter_pulse:handlers()}.
@ -82,12 +84,15 @@ get_transport_opts() ->
get_shutdown_timeout() ->
genlib_app:env(?MODULE, shutdown_timeout, 0).
-spec get_handler_specs(map(), bouncer_arbiter_pulse:handlers()) ->
-spec get_handler_specs(map(), bouncer_arbiter_pulse:handlers(), bouncer_opa_client:client()) ->
[woody:http_handler(woody:th_handler())].
get_handler_specs(ServiceOpts, AuditPulse) ->
get_handler_specs(ServiceOpts, AuditPulse, OpaClient) ->
ArbiterService = maps:get(arbiter, ServiceOpts, #{}),
ArbiterPulse = maps:get(pulse, ArbiterService, []),
ArbiterOpts = #{pulse => AuditPulse ++ ArbiterPulse},
ArbiterOpts = #{
pulse => AuditPulse ++ ArbiterPulse,
opa_client => OpaClient
},
[
{
maps:get(path, ArbiterService, <<"/v1/arbiter">>),
@ -106,6 +111,10 @@ get_stub_handler_specs(ServiceOpts) ->
}
].
-spec get_opa_opts() -> bouncer_opa_client:opts().
get_opa_opts() ->
genlib_app:env(?MODULE, opa, #{}).
%%
-spec enable_health_logging(erl_health:check()) -> erl_health:check().

View File

@ -12,7 +12,8 @@
%% "forbidden": [{"code": "...", ...}, ...] // 0 or more, may be unset
%% }
%% ```
-type ruleset_id() :: iodata().
-type ruleset_id() :: bouncer_opa_client:ruleset_id().
-type document() :: bouncer_opa_client:document().
-type judgement() :: {resolution(), [assertion()]}.
-type resolution() :: allowed | forbidden | {restricted, map()}.
@ -21,28 +22,22 @@
-export_type([judgement/0]).
-export_type([resolution/0]).
-export([judge/2]).
-export([judge/3]).
%%
-spec judge(ruleset_id(), bouncer_context:ctx()) ->
-spec judge(ruleset_id(), bouncer_context:ctx(), bouncer_opa_client:client()) ->
{ok, judgement()}
| {error,
ruleset_notfound
| {ruleset_invalid, _Details}
| {unavailable | unknown, _Reason}}.
judge(RulesetID, Context) ->
case mk_opa_client() of
{ok, Client} ->
Location = join_path(RulesetID, <<"/judgement">>),
case request_opa_document(Location, Context, Client) of
{ok, Document} ->
infer_judgement(Document);
{error, notfound} ->
{error, ruleset_notfound};
{error, _} = Error ->
Error
end;
judge(RulesetID, Context, Client) ->
case bouncer_opa_client:request_document(RulesetID, Context, Client) of
{ok, Document} ->
infer_judgement(Document);
{error, notfound} ->
{error, ruleset_notfound};
{error, _} = Error ->
Error
end.
@ -113,139 +108,3 @@ get_judgement_schema() ->
{<<"additionalProperties">>, false},
{<<"required">>, [<<"resolution">>]}
].
%%
-type endpoint() :: {inet:hostname() | inet:ip_address(), inet:port_number()}.
-type client_opts() :: #{
endpoint := endpoint(),
transport => tcp | tls,
tcp_opts => [gen_tcp:connect_option()],
tls_opts => [ssl:tls_client_option()],
connect_timeout => timeout(),
domain_lookup_timeout => timeout(),
request_timeout => timeout(),
http_opts => gun:http_opts(),
http2_opts => gun:http2_opts(),
% TODO
% Pulse over gun event handler mechanic.
event_handler => {module(), _State}
}.
-define(DEFAULT_CLIENT_OPTS, #{
domain_lookup_timeout => 1000,
connect_timeout => 1000,
request_timeout => 1000
}).
-type client() :: {pid(), client_opts()}.
-type document() ::
null
| binary()
| number()
| boolean()
| #{atom() | binary() => document()}
| [document()].
-spec mk_opa_client() -> {ok, client()} | {error, {unavailable, _Reason}}.
mk_opa_client() ->
Opts = get_opa_client_opts(),
{Host, Port} = maps:get(endpoint, Opts),
GunOpts = maps:with(
[
transport,
tcp_opts,
tls_opts,
connect_timeout,
domain_lookup_timeout,
http_opts,
http2_opts,
event_handler
],
Opts
),
% TODO
% We might want to mask intermittent service unavailability here with retries though.
% Leave it up to our clients for the time being, in the name of configuration simplicity
% and predictability.
case gun:open(Host, Port, GunOpts#{retry => 0}) of
{ok, Client} ->
Timeout = maps:get(connect_timeout, Opts),
case gun:await_up(Client, Timeout) of
{ok, _} ->
{ok, {Client, Opts}};
{error, Reason} ->
{error, {unavailable, Reason}}
end;
{error, Reason = {options, _}} ->
erlang:error({invalid_client_options, Reason, Opts})
end.
-spec request_opa_document(_ID :: iodata(), _Input :: document(), client()) ->
{ok, document()}
| {error,
notfound
| {unknown, _Reason}}.
request_opa_document(ID, Input, {Client, Opts}) ->
Path = join_path(<<"/v1/data">>, ID),
% TODO
% A bit hacky, ordsets are allowed in context and supposed to be opaque, at least by design.
% We probably need something like `bouncer_context:to_json/1`.
Body = jsx:encode(#{input => Input}),
CType = <<"application/json; charset=utf-8">>,
Headers = #{
<<"content-type">> => CType,
<<"accept">> => CType
},
Timeout = maps:get(request_timeout, Opts),
StreamRef = gun:post(Client, Path, Headers, Body),
% TODO think about implications
Deadline = erlang:monotonic_time(millisecond) + Timeout,
case gun:await(Client, StreamRef, Timeout) of
{response, nofin, 200, _Headers} ->
TimeoutLeft = Deadline - erlang:monotonic_time(millisecond),
case gun:await_body(Client, StreamRef, TimeoutLeft) of
{ok, Response, _Trailers} ->
decode_document(Response);
{ok, Response} ->
decode_document(Response);
{error, Reason} ->
{error, {unknown, Reason}}
end;
{response, fin, 404, _Headers} ->
{error, notfound};
{error, Reason} ->
{error, {unknown, Reason}}
end.
-spec decode_document(binary()) -> {ok, document()} | {error, notfound}.
decode_document(Response) ->
case jsx:decode(Response) of
#{<<"result">> := Result} ->
{ok, Result};
#{} ->
{error, notfound}
end.
-spec get_opa_client_opts() -> client_opts().
get_opa_client_opts() ->
maps:merge(
?DEFAULT_CLIENT_OPTS,
application:get_env(bouncer, opa, #{})
).
%%
join_path(F1, F2) when is_binary(F1), is_binary(F2) ->
normalize_path(genlib_string:cat(normalize_path(F1), normalize_path(F2))).
normalize_path(P = <<$/, P1/binary>>) ->
S1 = byte_size(P1),
case S1 > 0 andalso binary:last(P1) of
$/ -> binary:part(P, 0, S1);
_ -> P
end;
normalize_path(P) when is_binary(P) ->
normalize_path(<<$/, P/binary>>);
normalize_path(P) ->
normalize_path(iolist_to_binary(P)).

View File

@ -11,12 +11,14 @@
%%
-type opts() :: #{
pulse => bouncer_arbiter_pulse:handlers()
pulse => bouncer_arbiter_pulse:handlers(),
opa_client := bouncer_opa_client:client()
}.
-record(st, {
pulse :: bouncer_arbiter_pulse:handlers(),
pulse_metadata :: bouncer_arbiter_pulse:metadata()
pulse_metadata :: bouncer_arbiter_pulse:metadata(),
opa_client :: bouncer_opa_client:client()
}).
-type st() :: #st{}.
@ -29,7 +31,8 @@ handle_function(Fn, Args, WoodyCtx, Opts) ->
do_handle_function('Judge', {RulesetID, ContextIn}, WoodyCtx, Opts) ->
St = #st{
pulse = maps:get(pulse, Opts, []),
pulse_metadata = #{woody_ctx => WoodyCtx}
pulse_metadata = #{woody_ctx => WoodyCtx},
opa_client = maps:get(opa_client, Opts)
},
try
handle_judge(RulesetID, ContextIn, St)
@ -47,7 +50,7 @@ handle_judge(RulesetID, ContextIn, St0) ->
St1 = append_pulse_metadata(#{ruleset => RulesetID}, St0),
ok = handle_judgement_beat(started, St1),
{Context, St2} = decode_context(ContextIn, St1),
case bouncer_arbiter:judge(RulesetID, Context) of
case bouncer_arbiter:judge(RulesetID, Context, St2#st.opa_client) of
{ok, Judgement} ->
ok = handle_judgement_beat({completed, Judgement}, St2),
{ok, encode_judgement(Judgement)};

162
src/bouncer_opa_client.erl Normal file
View File

@ -0,0 +1,162 @@
-module(bouncer_opa_client).
%% API Functions
-export([init/1]).
-export([request_document/3]).
%% API Types
-type endpoint() :: {resolve() | inet:hostname() | inet:ip_address(), inet:port_number()}.
-type resolve() :: {resolve, dns, inet:hostname(), #{pick => gunner_resolver:ip_picker()}}.
-type opts() :: #{
pool_opts := gunner:pool_opts(),
endpoint := endpoint()
}.
-opaque client() :: #{
endpoint := endpoint(),
request_timeout := timeout(),
connect_timeout := timeout()
}.
-type ruleset_id() :: iodata().
-type document() ::
null
| binary()
| number()
| boolean()
| #{atom() | binary() => document()}
| [document()].
-export_type([opts/0]).
-export_type([client/0]).
-export_type([ruleset_id/0]).
-export_type([document/0]).
%%
-define(DEFAULT_REQUEST_TIMEOUT, 1000).
-define(GUNNER_POOL_ID, bouncer_opa_client_pool).
%%
%% API Functions
%%
-spec init(opts()) -> {client(), supervisor:child_spec()}.
init(OpaClientOpts) ->
PoolOpts = maps:get(pool_opts, OpaClientOpts),
PoolReg = {local, ?GUNNER_POOL_ID},
ChildSpec = #{
id => ?GUNNER_POOL_ID,
start => {gunner_pool, start_link, [PoolReg, PoolOpts]}
},
Client = genlib_map:compact(#{
endpoint => maps:get(endpoint, OpaClientOpts),
request_timeout => get_request_timeout(PoolOpts),
connect_timeout => get_connect_timeout(PoolOpts)
}),
{Client, ChildSpec}.
-spec request_document(_ID :: iodata(), _Input :: document(), client()) ->
{ok, document()}
| {error,
notfound
| {unavailable, _Reason}
| {unknown, _Reason}}.
request_document(RulesetID, Input, Client) ->
#{
endpoint := Endpoint,
request_timeout := RequestTimeout
} = Client,
Path = join_path(<<"/v1/data">>, join_path(RulesetID, <<"/judgement">>)),
% TODO
% A bit hacky, ordsets are allowed in context and supposed to be opaque, at least by design.
% We probably need something like `bouncer_context:to_json/1`.
Body = jsx:encode(#{input => Input}),
CType = <<"application/json; charset=utf-8">>,
Headers = #{
<<"content-type">> => CType,
<<"accept">> => CType
},
Deadline = erlang:monotonic_time(millisecond) + RequestTimeout,
try
ResolvedEndpoint = resolve_endpoint(Endpoint, RequestTimeout),
TimeoutLeft = Deadline - erlang:monotonic_time(millisecond),
GunnerOpts = make_gunner_opts(TimeoutLeft, Client),
%% Trying the synchronous API first
case gunner:post(?GUNNER_POOL_ID, ResolvedEndpoint, Path, Body, Headers, GunnerOpts) of
{ok, 200, _, Response} when is_binary(Response) ->
decode_document(Response);
{ok, 404, _, _} ->
{error, notfound};
{ok, Code, _, Response} ->
{error, {unknown, {Code, Response}}};
{error, {unknown, Reason}} ->
{error, {unknown, Reason}};
{error, Reason} ->
{error, {unavailable, Reason}}
end
catch
throw:{resolve_failed, ResolvError} ->
{error, {unavailable, ResolvError}}
end.
%%
-spec decode_document(binary()) -> {ok, document()} | {error, notfound}.
decode_document(Response) ->
case jsx:decode(Response) of
#{<<"result">> := Result} ->
{ok, Result};
#{} ->
{error, notfound}
end.
%%
resolve_endpoint({{resolve, dns, Hostname, Opts}, Port}, Timeout) ->
case gunner_resolver:resolve_endpoint({Hostname, Port}, make_resolver_opts(Timeout, Opts)) of
{ok, ResolvedEndpoint} ->
ResolvedEndpoint;
{error, Reason} ->
throw({resolve_failed, Reason})
end;
resolve_endpoint(Endpoint, _Timeout) ->
Endpoint.
make_resolver_opts(Timeout, #{pick := IpPicker}) ->
#{timeout => Timeout, ip_picker => IpPicker}.
make_gunner_opts(RequestTimeout, #{connect_timeout := ConnectTimeout}) ->
#{request_timeout => RequestTimeout, acquire_timeout => ConnectTimeout};
make_gunner_opts(RequestTimeout, _Client) ->
#{request_timeout => RequestTimeout}.
-spec get_request_timeout(gunner:pool_opts()) -> timeout().
get_request_timeout(PoolOpts) ->
ClientOpts = maps:get(connection_opts, PoolOpts, #{}),
maps:get(request_timeout, ClientOpts, ?DEFAULT_REQUEST_TIMEOUT).
-spec get_connect_timeout(gunner:pool_opts()) -> timeout() | undefined.
get_connect_timeout(PoolOpts) ->
ClientOpts = maps:get(connection_opts, PoolOpts, #{}),
maps:get(connect_timeout, ClientOpts, undefined).
%%
join_path(F1, F2) when is_binary(F1), is_binary(F2) ->
normalize_path(genlib_string:cat(normalize_path(F1), normalize_path(F2))).
normalize_path(P = <<$/, P1/binary>>) ->
S1 = byte_size(P1),
case S1 > 0 andalso binary:last(P1) of
$/ -> binary:part(P, 0, S1);
_ -> P
end;
normalize_path(P) when is_binary(P) ->
normalize_path(<<$/, P/binary>>);
normalize_path(P) ->
normalize_path(iolist_to_binary(P)).

View File

@ -273,7 +273,11 @@ start_bouncer(Env, C) ->
}},
{opa, #{
endpoint => ?OPA_ENDPOINT,
transport => tcp
pool_opts => #{
connection_opts => #{
transport => tcp
}
}
}}
] ++ Env
),

View File

@ -74,6 +74,14 @@ start_bouncer(Env, C) ->
org_management => #{
path => OrgmgmtPath
}
}},
{opa, #{
endpoint => ?OPA_ENDPOINT,
pool_opts => #{
connection_opts => #{
transport => tcp
}
}
}}
] ++ Env
),

View File

@ -45,6 +45,7 @@
%%
-define(OPA_HOST, "opa").
-define(OPA_ENDPOINT_RESOLVE, {{resolve, dns, ?OPA_HOST, #{pick => random}}, 8181}).
-define(OPA_ENDPOINT, {?OPA_HOST, 8181}).
-define(API_RULESET_ID, "service/authz/api").
@ -122,8 +123,12 @@ start_bouncer(Env, C) ->
num_acceptors => 4
}},
{opa, #{
endpoint => ?OPA_ENDPOINT,
transport => tcp
endpoint => ?OPA_ENDPOINT_RESOLVE,
pool_opts => #{
connection_opts => #{
transport => tcp
}
}
}}
] ++ Env
),
@ -508,8 +513,12 @@ connect_failed_means_unavailable(C) ->
[
{opa, #{
endpoint => {?OPA_HOST, 65535},
transport => tcp,
event_handler => {ct_gun_event_h, []}
pool_opts => #{
connection_opts => #{
transport => tcp,
event_handler => {ct_gun_event_h, []}
}
}
}}
],
C
@ -523,7 +532,7 @@ connect_failed_means_unavailable(C) ->
?assertMatch(
[
{judgement, started},
{judgement, {failed, {unavailable, {down, {shutdown, econnrefused}}}}}
{judgement, {failed, {unavailable, {connection_failed, {shutdown, econnrefused}}}}}
],
flush_beats(Client, C1)
)
@ -581,8 +590,12 @@ start_proxy_bouncer(Proxy, C) ->
[
{opa, #{
endpoint => ct_proxy:endpoint(Proxy),
transport => tcp,
event_handler => {ct_gun_event_h, []}
pool_opts => #{
connection_opts => #{
transport => tcp,
event_handler => {ct_gun_event_h, []}
}
}
}}
],
C