Refactor event_hanlder invocaions

This commit is contained in:
Anton Belyaev 2016-04-14 22:14:26 +03:00
parent d4c5f5edbc
commit 9fc64e8d92
4 changed files with 70 additions and 72 deletions

View File

@ -7,7 +7,7 @@
%% behaviour definition
%%
-type event_type() :: atom() | {atom(), any()}.
-type meta() :: [{atom(), term()}].
-type meta() :: #{atom() => term()}.
-export_type([event_type/0, meta/0]).
-callback handle_event(event_type(), meta()) -> _.

View File

@ -38,21 +38,21 @@ call(Client = #{
},
{Service, Function, Args}, TransportOpts = #{url := Url})
->
rpc_event_handler:handle_event(EventHandler, rpc_send_request, [
{rpc_role, client},
{req_id, ReqId},
{parent_request_id, PaReqId},
{url, Url},
{service, Service},
{function, Function},
{args, Args}
]),
rpc_event_handler:handle_event(EventHandler, rpc_send_request, #{
rpc_role => client,
req_id => ReqId,
parent_request_id => PaReqId,
url => Url,
service => Service,
function => Function,
args => Args
}),
Result = do_call(make_thrift_client(IsRoot, PaReqId, ReqId, Service, TransportOpts), Function, Args),
rpc_event_handler:handle_event(EventHandler, rpc_result_received, [
{rpc_role, client},
{req_id, ReqId},
{rpc_result, Result}
]),
rpc_event_handler:handle_event(EventHandler, rpc_result_received, #{
rpc_role => client,
req_id => ReqId,
rpc_result => Result
}),
format_return(Result, Client).
make_thrift_client(IsRoot, PaReqId, ReqId, Service, TransportOpts) ->

View File

@ -27,17 +27,21 @@
%% API
%%
-define(log_rpc_result(EventHandler, Event, ReqId, Status, Meta),
rpc_event_handler:handle_event(EventHandler, Event, [
{rpc_role, server},
{req_id, ReqId},
{status, Status}
] ++ Meta)
rpc_event_handler:handle_event(EventHandler, Event, Meta#{
rpc_role => server, req_id => ReqId, status => Status
})
).
-define(error_unknown_function, no_function).
-define(error_multiplexed_req, multiplexed_request).
-define(error_protocol_send, send_error).
-define(stage_read, thrift_protocol_read).
-define(stage_write, thrift_protocol_write).
-define(event_handle_func_res, rpc_handle_function_result).
-define(event_handle_error_res, rpc_handle_error_result).
-record(state, {
req_id :: rpc_t:req_id(),
rpc_client :: rpc_client:client(),
@ -45,7 +49,7 @@
handler :: rpc_t:handler(),
handler_opts :: handler_opts(),
protocol :: any(),
protocol_stage :: read_request | write_response,
protocol_stage :: ?stage_read | ?stage_write,
event_handler :: rpc_t:handler(),
transport_handler :: rpc_t:handler()
}).
@ -71,7 +75,7 @@ start(Transport, ReqId, RpcClient, {Service, Handler, Opts}, EventHandler, Trans
handler = Handler,
handler_opts = Opts,
protocol = Protocol,
protocol_stage = read_request,
protocol_stage = ?stage_read,
event_handler = EventHandler,
transport_handler = TransportHandler
}),
@ -136,7 +140,7 @@ handle_function(Function, InParams, State = #state{protocol = Protocol}, SeqId)
case ReadResult of
{ok, Args} ->
try_call_handler(Function, Args,
State1#state{protocol_stage = write_response}, SeqId);
State1#state{protocol_stage = ?stage_write}, SeqId);
Error = {error, _} ->
{State1, Error}
end.
@ -156,14 +160,12 @@ call_handler(Function,Args, #state{
handler_opts = Opts,
event_handler = EventHandler})
->
rpc_event_handler:handle_event(EventHandler, calling_rpc_handler, [
{req_id, ReqId},
{function, Function},
{args, Args},
{options, Opts}
]),
rpc_event_handler:handle_event(EventHandler, calling_rpc_handler, #{
rpc_role => server, req_id => ReqId,
function => Function, args => Args, options => Opts
}),
Result = Handler:handle_function(Function, RpcClient, Args, Opts),
?log_rpc_result(EventHandler, rpc_handler_result, ReqId, ok, [{result, Result}]),
?log_rpc_result(EventHandler, ?event_handle_func_res, ReqId, ok, #{result => Result}),
Result.
handle_result(ok, State, Function, SeqId) ->
@ -199,26 +201,16 @@ handle_function_catch(State = #state{
ReplyType = Service:function_info(Function, reply_type),
case {Class, Reason} of
_Error when ReplyType =:= oneway_void ->
?log_rpc_result(EventHandler, rpc_handler_exception, ReqId, error, [
{exception_type, Class},
{reason, Reason},
{ignore, true}
]),
?log_rpc_result(EventHandler, ?event_handle_func_res, ReqId, error,
#{class => Class, reason => Reason, ignore => true}),
{State, noreply};
{throw, Exception} when is_tuple(Exception), size(Exception) > 0 ->
?log_rpc_result(EventHandler, rpc_handler_exception, ReqId, error, [
{exception_type, throw},
{reason, Reason},
{ignore, false}
]),
?log_rpc_result(EventHandler, ?event_handle_func_res, ReqId, error,
#{class => throw, reason => Reason, ignore => false}),
handle_exception(State, Function, Exception, SeqId);
{error, Reason} ->
?log_rpc_result(EventHandler, rpc_handler_exception, ReqId, error, [
{exception_type, error},
{reason, Reason},
{stacktrace, Stack},
{ignore, false}
]),
?log_rpc_result(EventHandler, ?event_handle_func_res, ReqId, error,
#{class => error, reason => Reason, stacktrace => Stack, ignore => false}),
Reason1 = if is_tuple(Reason) -> element(1, Reason); true -> Reason end,
handle_error(State, Function, Reason1, SeqId)
end.
@ -287,7 +279,7 @@ handle_protocol_error(State = #state{
event_handler = EventHandler}, Function, Reason)
->
call_error_handler(State, Function, Reason),
?log_rpc_result(EventHandler, Stage, ReqId, error, [{reason, Reason}]),
?log_rpc_result(EventHandler, Stage, ReqId, error, #{reason => Reason}),
format_protocol_error(Reason, Trans).
call_error_handler(#state{
@ -296,10 +288,13 @@ call_error_handler(#state{
handler = Handler,
handler_opts = Opts,
event_handler = EventHandler}, Function, Reason) ->
try Handler:handle_error(Function, RpcClient, Reason, Opts)
try
Handler:handle_error(Function, RpcClient, Reason, Opts),
?log_rpc_result(EventHandler, ?event_handle_error_res, ReqId, ok, #{})
catch
Class:Error ->
?log_rpc_result(EventHandler, handle_error, ReqId, Class, [{reason, Error}])
?log_rpc_result(EventHandler, ?event_handle_error_res, ReqId, error,
#{class => Class, reason => Error})
end.
format_protocol_error({bad_binary_protocol_version, _Version}, Trans) ->

View File

@ -43,13 +43,16 @@
-define(THRIFT_ERROR_KEY, {?MODULE, thrift_error}).
-define(log_event(EventHandler, Event, ReqId, Status, Meta),
rpc_event_handler:handle_event(EventHandler, Event, [
{rpc_role, server},
{req_id, ReqId},
{status, Status}
] ++ Meta)
rpc_event_handler:handle_event(EventHandler, Event, Meta#{
rpc_role => server,
req_id => ReqId,
status => Status
})
).
-define(event_send_reply, send_reply).
-define(event_rpc_receive, rpc_receive).
%%
%% rpc_server callback
%%
@ -146,7 +149,7 @@ flush(State = #http_req{
event_handler = EventHandler
}) ->
{Code, Req1} = add_x_error_header(Req),
?log_event(EventHandler, send_reply, ReqId, Code,[]),
?log_event(EventHandler, ?event_send_reply, ReqId, reply_status(Code), #{code => Code}),
{ok, Req2} = cowboy_req:reply(
Code,
[],
@ -155,6 +158,9 @@ flush(State = #http_req{
),
{State#http_req{req = Req2, resp_body = <<>>}, ok}.
reply_status(200) -> ok;
reply_status(_) -> error.
-spec close(state()) -> {state(), ok}.
close(_State) ->
{#http_req{}, ok}.
@ -176,10 +182,8 @@ init({_Transport, http}, Req, Opts = [EventHandler|_]) ->
check_headers(set_resp_headers(ReqId, Req2),
EventHandler, ReqId, Url, Opts);
{error, Req2} ->
?log_event(EventHandler, rpc_received, undefined, error, [
{url, Url},
{reason, no_rpc_id}
]),
?log_event(EventHandler, ?event_rpc_receive, undefined, error,
#{url => Url, reason => no_rpc_id}),
reply_error_early(403, Req2)
end.
@ -188,17 +192,17 @@ init({_Transport, http}, Req, Opts = [EventHandler|_]) ->
handle(Req, [Url, ReqId, ServerOpts, EventHandler, ThriftHandler]) ->
case get_body(Req, ServerOpts) of
{ok, Body, Req1} when byte_size(Body) > 0 ->
?log_event(EventHandler, rpc_received, ReqId, ok, [{url, Url}]),
?log_event(EventHandler, ?event_rpc_receive, ReqId, ok, #{url => Url}),
do_handle(ReqId, Body, ThriftHandler, EventHandler, Req1);
{ok, <<>>, Req1} ->
reply_error(411, ReqId, EventHandler, Req1);
{error, body_too_large, Req1} ->
?log_event(EventHandler, rpc_received, ReqId, error, [
{url, Url}, {reason, body_too_large}]),
?log_event(EventHandler, ?event_rpc_receive, ReqId, error,
#{url => Url, reason => body_too_large}),
reply_error(413, ReqId, EventHandler, Req1);
{error, Reason, Req1} ->
?log_event(EventHandler, rpc_received, ReqId, error, [
{url, Url}, {reason, {body_read_error, Reason}}]),
?log_event(EventHandler, ?event_rpc_receive, ReqId, error,
#{url => Url, reason => {body_read_error, Reason}}),
reply_error(400, ReqId, EventHandler, Req1)
end.
@ -207,9 +211,8 @@ handle(Req, [Url, ReqId, ServerOpts, EventHandler, ThriftHandler]) ->
terminate({normal, _}, _Req, _Status) ->
ok;
terminate(Reason, _Req, [_, ReqId, _, EventHandler | _]) ->
?log_event(EventHandler, http_handler_terminated_abnormally, ReqId, error, [
{reason, Reason}
]),
?log_event(EventHandler, http_handler_terminate, ReqId, error,
#{reason => Reason}),
ok.
check_headers(Req, EventHandler, ReqId, Url, Opts) ->
@ -217,12 +220,12 @@ check_headers(Req, EventHandler, ReqId, Url, Opts) ->
{ok, Req3} ->
{ok, Req3, [Url, ReqId | Opts]};
{error, {wrong_method, Method}, Req3} ->
?log_event(EventHandler, rpc_received, ReqId, error, [
{url, Url}, {reason, {wrong_method, Method}}]),
?log_event(EventHandler, ?event_rpc_receive, ReqId, error,
#{url => Url, reason => {wrong_method, Method}}),
reply_error_early(405, Req3);
{error, {wrong_content_type, BadType}, Req3} ->
?log_event(EventHandler, rpc_received, ReqId, error, [
{url, Url}, {reason, {wrong_content_type, BadType}}]),
?log_event(EventHandler, ?event_rpc_receive, ReqId, error,
#{url => Url, reason => {wrong_content_type, BadType}}),
reply_error_early(403, Req3)
end.
@ -281,7 +284,7 @@ handle_error(_Error, ReqId, EventHandler, Req) ->
reply_error(500, ReqId, EventHandler, Req).
reply_error(Code, ReqId, EventHandler, Req) when is_integer(Code), Code >= 400 ->
?log_event(EventHandler, reply_error, ReqId, Code,[]),
?log_event(EventHandler, ?event_send_reply, ReqId, error, #{code => Code}),
{_, Req1} = add_x_error_header(Req),
{ok, Req2} = cowboy_req:reply(Code, Req1),
{ok, Req2, undefined}.