Add clean types for rpc events

This commit is contained in:
Anton Belyaev 2016-04-17 04:42:38 +03:00
parent ccd614e93b
commit a833e7a6cf
4 changed files with 169 additions and 51 deletions

View File

@ -3,19 +3,146 @@
%% API
-export([handle_event/3]).
-define(client_send , 'client send request').
-define(client_receive , 'client receive response').
-define(server_receive , 'server receive request').
-define(server_send , 'server send response').
-define(call_service , 'call service').
-define(get_result , 'get service result').
-define(thrift_error , 'thrift error').
-define(internal_error , 'internal error').
%%
%% behaviour definition
%%
-type event_type() :: atom() | {atom(), any()}.
-type meta() :: #{atom() => term()}.
-export_type([event_type/0, meta/0]).
-export_type([event_type/0, meta_client_send/0, meta_client_receive/0,
meta_server_receive/0, meta_server_send/0, meta_call_service/0,
meta_service_result/0, meta_thrift_error/0, meta_internal_error/0
]).
-callback handle_event(event_type(), meta()) -> _.
-callback handle_event
%% mandatory
(?client_send , meta_client_send ()) -> _;
(?client_receive , meta_client_receive ()) -> _;
(?server_receive , meta_server_receive ()) -> _;
(?server_send , meta_server_send ()) -> _;
(?call_service , meta_call_service ()) -> _;
(?get_result , meta_service_result ()) -> _;
%% optional
(?thrift_error , meta_thrift_error ()) -> _;
(?internal_error , meta_internal_error ()) -> _.
-type event_type() :: ?client_send | ?client_receive | ?server_receive |
?server_send | ?call_service | ?get_result | ?thrift_error |
?internal_error.
-type status() :: ok | error.
-type thrift_stage() :: protocol_read | protocol_write.
-type meta_client_send() :: #{
%% mandatory
span_id => rpc_t:rpc_id(),
parent_id => rpc_t:rpc_id(),
trace_id => rpc_t:rpc_id(),
url => rpc_t:url(),
function => rpc_t:func(),
%% optional
service => rpc_t:handler(),
args => rpc_client:args()
}.
-type meta_client_receive() :: #{
%% mandatory
span_id => rpc_t:rpc_id(),
parent_id => rpc_t:rpc_id(),
trace_id => rpc_t:rpc_id(),
status => status(),
%% optional
result => any()
}.
-type meta_server_receive() :: #{
%% mandatory
span_id => rpc_t:rpc_id(),
parent_id => rpc_t:rpc_id(),
trace_id => rpc_t:rpc_id(),
url => rpc_t:url(),
status => status(),
%% optional
reason => any()
}.
-type meta_server_send() :: #{
%% mandatory
span_id => rpc_t:rpc_id(),
parent_id => rpc_t:rpc_id(),
trace_id => rpc_t:rpc_id(),
status => status(),
%% optional
code => pos_integer()
}.
-type meta_call_service() :: #{
%% mandatory
span_id => rpc_t:rpc_id(),
parent_id => rpc_t:rpc_id(),
trace_id => rpc_t:rpc_id(),
function => rpc_t:func(),
%% optional
args => rpc_thrift_handler:args(),
options => rpc_thrift_handler:handler_opts()
}.
-type meta_service_result() :: #{
%% mandatory
span_id => rpc_t:rpc_id(),
parent_id => rpc_t:rpc_id(),
trace_id => rpc_t:rpc_id(),
status => status(),
%% optional
result => any(),
class => throw | error | exit,
reason => any(),
stack => any(),
ignore => boolean()
}.
-type meta_thrift_error() :: #{
%% mandatory
span_id => rpc_t:rpc_id(),
parent_id => rpc_t:rpc_id(),
trace_id => rpc_t:rpc_id(),
stage => thrift_stage(),
reason => any()
}.
-type meta_internal_error() :: #{
%% mandatory
span_id => rpc_t:rpc_id(),
parent_id => rpc_t:rpc_id(),
trace_id => rpc_t:rpc_id(),
error => any(),
reason => any(),
%% optional
stack => any()
}.
%%
%% API
%%
-spec handle_event(rpc_t:handler(), event_type(), meta()) -> _.
-spec handle_event
%% mandatory
(rpc_t:handler() , ?client_send , meta_client_send ()) -> _;
(rpc_t:handler() , ?client_receive , meta_client_receive ()) -> _;
(rpc_t:handler() , ?server_receive , meta_server_receive ()) -> _;
(rpc_t:handler() , ?server_send , meta_server_send ()) -> _;
(rpc_t:handler() , ?call_service , meta_call_service ()) -> _;
(rpc_t:handler() , ?get_result , meta_service_result ()) -> _;
%% optional
(rpc_t:handler() , ?thrift_error , meta_thrift_error ()) -> _;
(rpc_t:handler() , ?internal_error , meta_internal_error ()) -> _.
handle_event(Handler, Type, Meta) ->
Handler:handle_event(Type, Meta).

View File

@ -34,18 +34,14 @@ call(Client = #{event_handler := EventHandler},
{Service, Function, Args}, TransportOpts = #{url := Url})
->
RpcId = maps:with([span_id, trace_id, parent_id], Client),
rpc_event_handler:handle_event(EventHandler, send_request, RpcId#{
rpc_role => client,
direction => request,
rpc_event_handler:handle_event(EventHandler, 'client send request', RpcId#{
url => Url,
service => Service,
function => Function,
args => Args
}),
Result = do_call(make_thrift_client(RpcId, Service, TransportOpts), Function, Args),
rpc_event_handler:handle_event(EventHandler, receive_response, RpcId#{
rpc_role => client,
direction => response,
rpc_event_handler:handle_event(EventHandler, 'client receive response', RpcId#{
rpc_result => Result
}),
format_return(Result, Client).

View File

@ -23,17 +23,13 @@
%%
%% API
%%
-define(log_rpc_result(EventHandler, Event, Status, Dir, Meta),
rpc_event_handler:handle_event(EventHandler, Event, Meta#{
rpc_role => server, direction => Dir, status => Status
})
-define(log_rpc_result(EventHandler, Status, Meta),
rpc_event_handler:handle_event(EventHandler, 'get service result',
Meta#{status => Status})
).
-define(stage_read , thrift_protocol_read).
-define(stage_write , thrift_protocol_write).
-define(event_handle_func_res , handle_function_result).
-define(event_handle_error_res , handle_error_result).
-define(stage_read , protocol_read).
-define(stage_write , protocol_write).
-define(error_unknown_function , no_function).
-define(error_multiplexed_req , multiplexed_request).
@ -157,12 +153,11 @@ call_handler(Function,Args, #state{
handler_opts = Opts,
event_handler = EventHandler})
->
rpc_event_handler:handle_event(EventHandler, invoke_handle_function, RpcId#{
rpc_role => server, direction => request,
rpc_event_handler:handle_event(EventHandler, 'call service', RpcId#{
function => Function, args => Args, options => Opts
}),
Result = Handler:handle_function(Function, Args, RpcId, RpcClient, Opts),
?log_rpc_result(EventHandler, ?event_handle_func_res, ok, response, RpcId#{result => Result}),
?log_rpc_result(EventHandler, ok, RpcId#{result => Result}),
Result.
handle_result(ok, State, Function, SeqId) ->
@ -198,16 +193,16 @@ handle_function_catch(State = #state{
ReplyType = Service:function_info(Function, reply_type),
case {Class, Reason} of
_Error when ReplyType =:= oneway_void ->
?log_rpc_result(EventHandler, ?event_handle_func_res, error, response,
?log_rpc_result(EventHandler, error,
RpcId#{class => Class, reason => Reason, ignore => true}),
{State, noreply};
{throw, Exception} when is_tuple(Exception), size(Exception) > 0 ->
?log_rpc_result(EventHandler, ?event_handle_func_res, error, response,
?log_rpc_result(EventHandler, error,
RpcId#{class => throw, reason => Exception, ignore => false}),
handle_exception(State, Function, Exception, SeqId);
{error, Reason} ->
?log_rpc_result(EventHandler, ?event_handle_func_res, error, response,
RpcId#{class => error, reason => Reason, stacktrace => Stack, ignore => false}),
?log_rpc_result(EventHandler, error, RpcId#{class => error,
reason => Reason, stack => Stack, ignore => false}),
Reason1 = if is_tuple(Reason) -> element(1, Reason); true -> Reason end,
handle_error(State, Function, Reason1, SeqId)
end.
@ -283,12 +278,10 @@ handle_protocol_error(State = #state{
event_handler = EventHandler}, Function, Reason)
->
call_error_handler(State, Function, Reason),
?log_rpc_result(EventHandler, Stage, error, get_direction(Stage), RpcId#{reason => Reason}),
rpc_event_handler:handle_event(EventHandler, 'thrift error',
RpcId#{stage => Stage, reason => Reason}),
format_protocol_error(Reason, Trans).
get_direction(?stage_read) -> request;
get_direction(?stage_write) -> response.
call_error_handler(#state{
rpc_id = RpcId,
rpc_client = RpcClient,
@ -296,12 +289,15 @@ call_error_handler(#state{
handler_opts = Opts,
event_handler = EventHandler}, Function, Reason) ->
try
Handler:handle_error(Function, Reason, RpcId, RpcClient, Opts),
?log_rpc_result(EventHandler, ?event_handle_error_res, ok, response, RpcId)
Handler:handle_error(Function, Reason, RpcId, RpcClient, Opts)
catch
Class:Error ->
?log_rpc_result(EventHandler, ?event_handle_error_res, error, response,
RpcId#{class => Class, reason => Error})
rpc_event_handler:handle_event(EventHandler, 'internal error', RpcId#{
error => <<"service error handler failed">>,
class => Class,
reason => Error,
stack => erlang:get_stacktrace()
})
end.
format_protocol_error({bad_binary_protocol_version, _Version}, Trans) ->

View File

@ -42,16 +42,14 @@
-define(THRIFT_ERROR_KEY, {?MODULE, thrift_error}).
-define(log_event(EventHandler, Event, Status, Dir, Meta),
-define(log_event(EventHandler, Event, Status, Meta),
rpc_event_handler:handle_event(EventHandler, Event, Meta#{
rpc_role => server,
direction => Dir,
status => Status
status => Status
})
).
-define(event_send_reply, send_response).
-define(event_rpc_receive, receive_request).
-define(event_send_reply , 'server send response').
-define(event_rpc_receive , 'server receive request').
-define(HEADERS_RPC_ID, #{
span_id => ?HEADER_NAME_RPC_ID,
@ -160,7 +158,7 @@ flush(State = #http_req{
}) ->
{Code, Req1} = add_x_error_header(Req),
?log_event(EventHandler, ?event_send_reply, ok,
response, RpcId#{code => Code}),
RpcId#{code => Code}),
{ok, Req2} = cowboy_req:reply(Code, [{<<"content-type">>, ?CONTENT_TYPE_THRIFT}],
Body, Req1),
{State#http_req{req = Req2, resp_body = <<>>, replied = true}, ok}.
@ -188,7 +186,7 @@ init({_Transport, http}, Req, Opts = [EventHandler|_]) ->
EventHandler, RpcId, Url, Opts);
{error, ErrorMeta, Req2} ->
?log_event(EventHandler, ?event_rpc_receive, error,
request, ErrorMeta#{url => Url, reason => no_rpc_id}),
ErrorMeta#{url => Url, reason => bad_rpc_id}),
reply_error_early(403, Req2)
end.
@ -197,16 +195,16 @@ init({_Transport, http}, Req, Opts = [EventHandler|_]) ->
handle(Req, [Url, RpcId, ServerOpts, EventHandler, ThriftHandler]) ->
case get_body(Req, ServerOpts) of
{ok, Body, Req1} when byte_size(Body) > 0 ->
?log_event(EventHandler, ?event_rpc_receive, ok, request, RpcId#{url => Url}),
?log_event(EventHandler, ?event_rpc_receive, ok, RpcId#{url => Url}),
do_handle(RpcId, Body, ThriftHandler, EventHandler, Req1);
{ok, <<>>, Req1} ->
reply_error(411, RpcId, EventHandler, Req1);
{error, body_too_large, Req1} ->
?log_event(EventHandler, ?event_rpc_receive, error, request,
?log_event(EventHandler, ?event_rpc_receive, error,
RpcId#{url => Url, reason => body_too_large}),
reply_error(413, RpcId, EventHandler, Req1);
{error, Reason, Req1} ->
?log_event(EventHandler, ?event_rpc_receive, error, request,
?log_event(EventHandler, ?event_rpc_receive, error,
RpcId#{url => Url, reason => {body_read_error, Reason}}),
reply_error(400, RpcId, EventHandler, Req1)
end.
@ -216,8 +214,9 @@ handle(Req, [Url, RpcId, ServerOpts, EventHandler, ThriftHandler]) ->
terminate({normal, _}, _Req, _Status) ->
ok;
terminate(Reason, _Req, [_, RpcId, _, EventHandler | _]) ->
?log_event(EventHandler, http_handler_terminate, error, undefined,
RpcId#{reason => Reason}),
rpc_event_handler:handle_event(EventHandler, 'internal error', RpcId#{
error => <<"http handler terminated abnormally">>, reason => Reason
}),
ok.
get_rpc_id(Req) ->
@ -244,11 +243,11 @@ check_headers(Req, EventHandler, RpcId, Url, Opts) ->
{ok, Req3} ->
{ok, Req3, [Url, RpcId | Opts]};
{error, {wrong_method, Method}, Req3} ->
?log_event(EventHandler, ?event_rpc_receive, error, request,
?log_event(EventHandler, ?event_rpc_receive, error,
RpcId#{url => Url, reason => {wrong_method, Method}}),
reply_error_early(405, Req3);
{error, {wrong_content_type, BadType}, Req3} ->
?log_event(EventHandler, ?event_rpc_receive, error, request,
?log_event(EventHandler, ?event_rpc_receive, error,
RpcId#{url => Url, reason => {wrong_content_type, BadType}}),
reply_error_early(403, Req3)
end.
@ -301,7 +300,7 @@ handle_error(_Error, RpcId, EventHandler, Req) ->
reply_error(500, RpcId, EventHandler, Req).
reply_error(Code, RpcId, EventHandler, Req) when is_integer(Code), Code >= 400 ->
?log_event(EventHandler, ?event_send_reply, error, response, RpcId#{code => Code}),
?log_event(EventHandler, ?event_send_reply, error, RpcId#{code => Code}),
{_, Req1} = add_x_error_header(Req),
{ok, Req2} = cowboy_req:reply(Code, Req1),
{ok, Req2, undefined}.