MSPF-490: Monitor to detect client disconnection (#111)

* Monitor to detect client disconnetion on server side

* Renaming save_woody_state

* Rework monitor to use one per connection

* Changed event type

* wip: stream handler skeleton

* wip: log abnormal termination

* Upgrade cowboy to 2.7.0

* Finish migration to stream_handler

* Upgrade more dependencies

* Fix state type, remove early_error logging

* rename woody_monitor to comply with cowboy naming

* remove redundant monitor pid from state

* Log only socket errors

* wrap put_woody_state

* experimental: monitor for woody_events

* add missing update

* match on handle_event return, don't monitor tracing

* Rework update_woody_state

* Slightly increase min_complexity in linter config

* Make event state transition explicit

* Remove handle_event wraper

* Revert "Slightly increase min_complexity in linter config"

This reverts commit 38301e3b5b66967cb08c0bf7f27a2a8042a40a19.

* match unmetched returns

* Add test

* Erase trailing whitespace

* Refactor test

* Delete mistakenly added file

* Rename function

* Review fixes

* Match supervisor return

Co-Authored-By: Andrew Mayorov <a.mayorov@rbkmoney.com>

Co-authored-by: Andrew Mayorov <encube.ul@gmail.com>
This commit is contained in:
Toporkov Igor 2020-02-05 14:35:43 +03:00 committed by GitHub
parent 295b864cf7
commit ed644d7d70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 222 additions and 19 deletions

View File

@ -20,7 +20,7 @@
% Common project dependencies.
{deps, [
{cowboy, "2.5.0"},
{cowboy, "2.7.0"},
{hackney, "1.15.2"},
{rfc3339, "0.2.2"},
{gproc , "0.8.0"},

View File

@ -6,8 +6,8 @@
{git,"git@github.com:rbkmoney/cg_mon.git",
{ref,"5a87a37694e42b6592d3b4164ae54e0e87e24e18"}},
1},
{<<"cowboy">>,{pkg,<<"cowboy">>,<<"2.5.0">>},0},
{<<"cowlib">>,{pkg,<<"cowlib">>,<<"2.6.0">>},1},
{<<"cowboy">>,{pkg,<<"cowboy">>,<<"2.7.0">>},0},
{<<"cowlib">>,{pkg,<<"cowlib">>,<<"2.8.0">>},1},
{<<"folsom">>,
{git,"git@github.com:folsom-project/folsom.git",
{ref,"9309bad9ffadeebbefe97521577c7480c7cfcd8a"}},
@ -26,7 +26,7 @@
{<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},1},
{<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.2.0">>},1},
{<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.3.0">>},2},
{<<"ranch">>,{pkg,<<"ranch">>,<<"1.6.2">>},1},
{<<"ranch">>,{pkg,<<"ranch">>,<<"1.7.1">>},1},
{<<"rfc3339">>,{pkg,<<"rfc3339">>,<<"0.2.2">>},0},
{<<"snowflake">>,
{git,"https://github.com/rbkmoney/snowflake.git",
@ -43,15 +43,15 @@
{<<"bear">>, <<"16264309AE5D005D03718A5C82641FCC259C9E8F09ADEB6FD79CA4271168656F">>},
{<<"cache">>, <<"3C11DBF4CD8FCD5787C95A5FB2A04038E3729CFCA0386016EEA8C953AB48A5AB">>},
{<<"certifi">>, <<"867CE347F7C7D78563450A18A6A28A8090331E77FA02380B4A21962A65D36EE5">>},
{<<"cowboy">>, <<"4EF3AE066EE10FE01EA3272EDC8F024347A0D3EB95F6FBB9AED556DACBFC1337">>},
{<<"cowlib">>, <<"8AA629F81A0FC189F261DC98A42243FA842625FEEA3C7EC56C48F4CCDB55490F">>},
{<<"cowboy">>, <<"91ED100138A764355F43316B1D23D7FF6BDB0DE4EA618CB5D8677C93A7A2F115">>},
{<<"cowlib">>, <<"FD0FF1787DB84AC415B8211573E9A30A3EBE71B5CBFF7F720089972B2319C8A4">>},
{<<"gproc">>, <<"CEA02C578589C61E5341FCE149EA36CCEF236CC2ECAC8691FBA408E7EA77EC2F">>},
{<<"hackney">>, <<"07E33C794F8F8964EE86CEBEC1A8ED88DB5070E52E904B8F12209773C1036085">>},
{<<"idna">>, <<"689C46CBCDF3524C44D5F3DDE8001F364CD7608A99556D8FBD8239A5798D4C10">>},
{<<"metrics">>, <<"25F094DEA2CDA98213CECC3AEFF09E940299D950904393B2A29D191C346A8486">>},
{<<"mimerl">>, <<"67E2D3F571088D5CFD3E550C383094B47159F3EEE8FFA08E64106CDF5E981BE3">>},
{<<"parse_trans">>, <<"09765507A3C7590A784615CFD421D101AEC25098D50B89D7AA1D66646BC571C1">>},
{<<"ranch">>, <<"6DB93C78F411EE033DBB18BA8234C5574883ACB9A75AF0FB90A9B82EA46AFA00">>},
{<<"ranch">>, <<"6B1FAB51B49196860B733A49C07604465A47BDB78AA10C1C16A3D199F7F8C881">>},
{<<"rfc3339">>, <<"1552DF616ACA368D982E9F085A0E933B6688A3F4938A671798978EC2C0C58730">>},
{<<"ssl_verify_fun">>, <<"6EAF7AD16CB568BB01753DBBD7A95FF8B91C7979482B95F38443FE2C8852A79B">>},
{<<"unicode_util_compat">>, <<"D869E4C68901DD9531385BB0C8C40444EBF624E60B6962D95952775CAC5E90CD">>}]}

77
src/woody_monitor_h.erl Normal file
View File

@ -0,0 +1,77 @@
-module(woody_monitor_h).
-behaviour(cowboy_stream).
-include("woody_defs.hrl").
%% callback exports
-export([init/3]).
-export([data/4]).
-export([info/3]).
-export([terminate/3]).
-export([early_error/5]).
-type state() :: #{
next := any(),
woody_state => woody_state:st(),
event_to_emit => woody_event_handler:event()
}.
-export([put_woody_state/2]).
-export([set_event/2]).
-spec put_woody_state(woody_state:st(), cowboy_req:req()) -> ok.
put_woody_state(WoodyState, Req) ->
cowboy_req:cast({woody_state, WoodyState}, Req).
-spec set_event(woody_event_handler:event(), cowboy_req:req()) -> ok.
set_event(Event, Req) ->
cowboy_req:cast({set_event, Event}, Req).
%% callbacks
-spec init(cowboy_stream:streamid(), cowboy_req:req(), cowboy:opts())
-> {cowboy_stream:commands(), state()}.
init(StreamID, Req, Opts) ->
{Commands0, Next} = cowboy_stream:init(StreamID, Req, Opts),
{Commands0, #{next => Next, event_to_emit => ?EV_SERVER_RECEIVE}}.
-spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State)
-> {cowboy_stream:commands(), State} when State::state().
data(StreamID, IsFin, Data, #{next := Next0} = State) ->
{Commands0, Next} = cowboy_stream:data(StreamID, IsFin, Data, Next0),
{Commands0, State#{next => Next}}.
-spec info(cowboy_stream:streamid(), any(), State)
-> {cowboy_stream:commands(), State} when State::state().
info(StreamID, {woody_state, WoodyState} = Info, #{next := Next0} = State) ->
{Commands, Next} = cowboy_stream:info(StreamID, Info, Next0),
{Commands, State#{next => Next, woody_state => WoodyState}};
% Handler emited server receive, so monitor should emit service result
info(StreamID, {set_event, Event} = Info, #{next := Next0} = State) ->
{Commands, Next} = cowboy_stream:info(StreamID, Info, Next0),
{Commands, State#{next => Next, event_to_emit => Event}};
info(StreamID, Info, #{next := Next0} = State) ->
{Commands, Next} = cowboy_stream:info(StreamID, Info, Next0),
{Commands, State#{next => Next}}.
-spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), state()) -> any().
terminate(
StreamID,
{socket_error, _, HumanReadable} = Reason,
#{woody_state := WoodyState, next := Next, event_to_emit:= Event}
) ->
woody_event_handler:handle_event(Event,
WoodyState,
#{status => error, reason => woody_util:to_binary(HumanReadable)}
),
cowboy_stream:terminate(StreamID, Reason, Next);
terminate(StreamID, Reason, #{next := Next}) ->
cowboy_stream:terminate(StreamID, Reason, Next).
-spec early_error(cowboy_stream:streamid(), cowboy_stream:reason(),
cowboy_stream:partial_req(), Resp, cowboy:opts()) -> Resp
when Resp::cowboy_stream:resp_command().
early_error(StreamID, Reason, PartialReq, Resp, Opts) ->
% We can't really do anything about it
cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts).

View File

@ -151,7 +151,7 @@ get_cowboy_config(Opts = #{event_handler := EvHandler}) ->
ok = validate_event_handler(EvHandler),
Dispatch = get_dispatch(Opts),
ProtocolOpts = maps:get(protocol_opts, Opts, #{}),
CowboyOpts = maps:put(stream_handlers, [cowboy_stream_h, woody_trace_h], ProtocolOpts),
CowboyOpts = maps:put(stream_handlers, [woody_monitor_h, woody_trace_h, cowboy_stream_h], ProtocolOpts),
ReadBodyOpts = maps:get(read_body_opts, Opts, #{}),
maps:merge(#{
env =>#{dispatch => Dispatch, event_handler => EvHandler, read_body_opts => ReadBodyOpts},
@ -252,7 +252,6 @@ trace_resp(true, Req, Code, Headers, Body, EvHandler) ->
trace_resp(_, Req, _, _, _, _) ->
Req.
%%
%% cowboy_http_handler callbacks
%%
@ -264,7 +263,7 @@ init(Req, Opts = #{ev_handler := EvHandler, handler_limits := Limits}) ->
WoodyState = create_dummy_state(EvHandler),
case have_resources_to_continue(Limits) of
true ->
Opts1 = Opts#{url => Url, woody_state => WoodyState},
Opts1 = update_woody_state(Opts#{url => Url}, WoodyState, Req),
case check_request(Req, Opts1) of
{ok, Req1, State} -> handle(Req1, State);
{stop, Req1, State} -> {ok, Req1, State}
@ -386,16 +385,17 @@ check_woody_headers(Req, State = #{woody_state := WoodyState0}) ->
{Mode, Req0} = woody_util:get_req_headers_mode(Req),
case get_rpc_id(Req0, Mode) of
{ok, RpcId, Req1} ->
WoodyState1 = set_rpc_id(RpcId, WoodyState0),
WoodyState1 = set_cert(Req1, set_rpc_id(RpcId, WoodyState0)),
check_deadline_header(
cowboy_req:header(?HEADER_DEADLINE(Mode), Req1),
Req1,
Mode,
State#{woody_state => set_cert(Req1, WoodyState1)}
update_woody_state(State, WoodyState1, Req1)
);
{error, BadRpcId, Req1} ->
WoodyState1 = set_rpc_id(BadRpcId, WoodyState0),
reply_bad_header(400, woody_util:to_binary(["bad ", ?HEADER_PREFIX(Mode), " id header"]),
Req1, State#{woody_state => set_rpc_id(BadRpcId, WoodyState0)}
Req1, update_woody_state(State, WoodyState1, Req1)
)
end.
@ -443,20 +443,21 @@ check_deadline_header(DeadlineBin, Req, Mode, State) ->
check_deadline(Deadline, Req, Mode, State = #{url := Url, woody_state := WoodyState}) ->
case woody_deadline:is_reached(Deadline) of
true ->
woody_event_handler:handle_event(?EV_SERVER_RECEIVE, WoodyState,
_ = woody_event_handler:handle_event(?EV_SERVER_RECEIVE, WoodyState,
#{url => Url, status => error, reason => <<"Deadline reached">>}),
Req1 = handle_error({system, {internal, resource_unavailable, <<"deadline reached">>}}, Req, WoodyState),
{stop, Req1, undefined};
false ->
NewState = State#{woody_state => set_deadline(Deadline, WoodyState)},
WoodyState1 = set_deadline(Deadline, WoodyState),
Headers = cowboy_req:headers(Req),
check_metadata_headers(Headers, Req, Mode, NewState)
check_metadata_headers(Headers, Req, Mode, update_woody_state(State, WoodyState1, Req))
end.
-spec check_metadata_headers(woody:http_headers(), cowboy_req:req(), woody_util:headers_mode(), state()) ->
check_result().
check_metadata_headers(Headers, Req, Mode, State = #{woody_state := WoodyState, server_opts := ServerOpts}) ->
{ok, Req, State#{woody_state => set_metadata(find_metadata(Headers, Mode, ServerOpts), WoodyState)}}.
WoodyState1 = set_metadata(find_metadata(Headers, Mode, ServerOpts), WoodyState),
{ok, Req, update_woody_state(State, WoodyState1, Req)}.
-spec find_metadata(woody:http_headers(), woody_util:headers_mode(), server_opts()) ->
woody_context:meta().
@ -533,6 +534,7 @@ do_get_body(Body, Req, Opts) ->
-spec handle_request(woody:http_body(), woody:th_handler(), woody_state:st(), cowboy_req:req()) ->
cowboy_req:req().
handle_request(Body, ThriftHander, WoodyState, Req) ->
ok = woody_monitor_h:set_event(?EV_SERVICE_HANDLER_RESULT, Req),
case woody_server_thrift_handler:init_handler(Body, ThriftHander, WoodyState) of
{ok, oneway_void, HandlerState} ->
Req1 = reply(200, Req, WoodyState),
@ -597,3 +599,7 @@ reply_status(_) -> error.
log_event(Event, WoodyState, ExtraMeta) ->
woody_event_handler:handle_event(Event, WoodyState, ExtraMeta).
update_woody_state(State, WoodyState, Req) ->
ok = woody_monitor_h:put_woody_state(WoodyState, Req),
State#{woody_state => WoodyState}.

View File

@ -0,0 +1,87 @@
-module(server_timeout_event_handler).
-include("src/woody_defs.hrl").
-export([handle_event/4]).
-export([child_spec/0]).
-export([start_link/0]).
-export([init/1]).
-export([handle_call/3]).
-export([handle_cast/2]).
-export([get_socket_errors_caught/0]).
-type state() :: #{
socket_errors_caught => pos_integer()
}.
-type event() :: woody_event_handler:event().
-type rpc_id() :: woody:rpc_id().
-type event_meta() :: woody_event_handler:event_meta().
-type options() :: woody:options().
%% API
-define(SOCKET_CLOSED, <<"The socket has been closed.">>).
-spec get_socket_errors_caught() -> pos_integer().
get_socket_errors_caught() ->
{ok, N} = gen_server:call(?MODULE, get_number_of_events),
N.
-spec child_spec() -> supervisor:child_spec().
child_spec() -> #{
id => ?MODULE,
start => {?MODULE, start_link, []},
type => worker
}.
%% woody_event_handler callbaacks
-spec handle_event(
event(),
rpc_id(),
event_meta(),
options()
) -> _.
handle_event(Event, RpcId, Meta, Opts) ->
gen_server:call(?MODULE, {Event, RpcId, Meta, Opts}).
%% gen_server callbacks
-spec start_link() -> {ok, pid()}.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec init(_) -> {ok, state()}.
init(_) ->
{ok, #{socket_errors_caught => 0}}.
-spec handle_call({event(), rpc_id(), event_meta(), options()}, _, state()) ->
{reply, ok | {ok, pos_integer()}, state()}.
handle_call({Event, Rpc, #{status := error, reason := ?SOCKET_CLOSED} = Meta, Opts}, _, #{
socket_errors_caught := Caught
} = State) when
Event =:= ?EV_SERVICE_HANDLER_RESULT orelse
Event =:= ?EV_SERVER_RECEIVE
->
woody_tests_SUITE:handle_event(Event, Rpc, Meta, Opts),
{reply, ok, State#{socket_errors_caught => Caught + 1}};
handle_call({Event, Rpc, Meta, Opts}, _, State) ->
woody_tests_SUITE:handle_event(Event, Rpc, Meta, Opts),
{reply, ok, State};
handle_call(get_number_of_events, _, #{socket_errors_caught := N} = State) ->
{reply, {ok, N}, State}.
-spec handle_cast(_, state()) -> {noreply, state()}.
handle_cast(_, S) ->
{noreply, S}.

View File

@ -40,6 +40,7 @@ service Weapons {
1: string name
2: binary data
) throws (1: WeaponFailure error)
void get_stuck_looping_weapons()
}
service Powerups {

View File

@ -72,6 +72,7 @@
call_thrift_multiplexed_test/1,
call_deadline_ok_test/1,
call_deadline_reached_on_client_test/1,
server_handled_client_timeout_test/1,
call_deadline_timeout_test/1,
server_http_req_validation_test/1,
try_bad_handler_spec_test/1,
@ -224,6 +225,7 @@
-spec call_thrift_multiplexed_test(config()) -> any().
-spec call_deadline_ok_test(config()) -> any().
-spec call_deadline_reached_on_client_test(config()) -> any().
-spec server_handled_client_timeout_test(config()) -> any().
-spec call_deadline_timeout_test(config()) -> any().
-spec server_http_req_validation_test(config()) -> any().
-spec try_bad_handler_spec_test(config()) -> any().
@ -285,6 +287,7 @@ groups() ->
call_thrift_multiplexed_test,
call_deadline_ok_test,
call_deadline_reached_on_client_test,
server_handled_client_timeout_test,
call_deadline_timeout_test,
server_http_req_validation_test,
try_bad_handler_spec_test,
@ -373,6 +376,12 @@ init_per_testcase(calls_with_cache, C) ->
{ok, _} = start_caching_client(caching_client_ct, Sup),
{ok, _} = start_woody_server(woody_ct, Sup, ['Weapons', 'Powerups']),
[{sup, Sup} | C];
init_per_testcase(server_handled_client_timeout_test, C) ->
{ok, Sup} = start_tc_sup(),
{ok, _} = supervisor:start_child(Sup, server_timeout_event_handler:child_spec()),
{ok, _} = start_woody_server(woody_ct, Sup, ['Weapons', 'Powerups'], server_timeout_event_handler),
[{sup, Sup} | C];
init_per_testcase(_, C) ->
{ok, Sup} = start_tc_sup(),
{ok, _} = start_woody_server(woody_ct, Sup, ['Weapons', 'Powerups']),
@ -431,9 +440,12 @@ start_error_server(TC, Sup) ->
supervisor:start_child(Sup, Server).
start_woody_server(Id, Sup, Services) ->
start_woody_server(Id, Sup, Services, ?MODULE).
start_woody_server(Id, Sup, Services, EventHandler) ->
Server = woody_server:child_spec(Id, #{
handlers => [get_handler(S) || S <- Services],
event_handler => ?MODULE,
event_handler => EventHandler,
ip => ?SERVER_IP,
port => ?SERVER_PORT
}),
@ -828,6 +840,23 @@ call_deadline_reached_on_client_test(_) ->
woody_client:call(Request, Opts, Context)
).
server_handled_client_timeout_test(_) ->
Id = <<"server_handled_client_timeout">>,
{Url, Service} = get_service_endpoint('Weapons'),
Request = {Service, get_stuck_looping_weapons, []},
Opts = #{url => Url, event_handler => ?MODULE},
Deadline = woody_deadline:from_timeout(250),
Context = woody_context:new(Id, #{}, Deadline),
try
case woody_client:call(Request, Opts, Context) of
_ -> error(unexpected_result)
end
catch
error:{woody_error, {external, result_unknown, <<"timeout">>}} ->
1 = server_timeout_event_handler:get_socket_errors_caught(),
ok
end.
call_deadline_timeout_test(_) ->
Id = <<"call_deadline_timeout">>,
{Url, Service} = get_service_endpoint('Weapons'),
@ -1012,7 +1041,10 @@ handle_function(ProxyGetPowerup, [Name, To], Context, _Opts) when
handle_function(like_powerup, [Name, To], Context, _Opts) ->
ok = send_msg(To, {woody_context:get_rpc_id(parent_id, Context), Name}),
{ok, ok}.
{ok, ok};
handle_function(get_stuck_looping_weapons, _, _, _) ->
{ok, timer:sleep(infinity)}.
%%
%% woody_event_handler callbacks