Rename request IDs and http headers in accordance with Dapper terms

This commit is contained in:
Anton Belyaev 2016-04-16 22:11:54 +03:00
parent 27c11e7d80
commit 706520e04e
7 changed files with 68 additions and 66 deletions

View File

@ -16,6 +16,8 @@
-export_type([client/0, options/0, result_ok/0, result_error/0]). -export_type([client/0, options/0, result_ok/0, result_error/0]).
-define(ROOT_REQ_PARENT_ID, <<"undefined">>).
%% Internal API %% Internal API
-export([init_call_async/4, do_call_async/4]). -export([init_call_async/4, do_call_async/4]).
@ -31,14 +33,14 @@
%% %%
-type client() :: #{ %% all elements are mandatory -type client() :: #{ %% all elements are mandatory
root_rpc => boolean(), root_rpc => boolean(),
req_id => rpc_t:req_id() | undefined, span_id => rpc_t:req_id() | undefined,
root_req_id => rpc_t:req_id(), trace_id => rpc_t:req_id(),
parent_req_id => rpc_t:req_id(), parent_id => rpc_t:req_id(),
event_handler => rpc_t:handler(), event_handler => rpc_t:handler(),
seq => non_neg_integer() seq => non_neg_integer()
}. }.
-type result_ok() :: {ok, _Reply, client()} | {ok, client()}. -type result_ok() :: {ok, _Response, client()} | {ok, client()}.
-type result_error() :: {error, rpc_failed, client()} | {throw, _Exception, client()}. -type result_error() :: {error, rpc_failed, client()} | {throw, _Exception, client()}.
-type request() :: any(). -type request() :: any().
@ -55,20 +57,20 @@
new(ReqId, EventHandler) -> new(ReqId, EventHandler) ->
#{ #{
root_rpc => true, root_rpc => true,
req_id => ReqId, span_id => ReqId,
root_req_id => ReqId, trace_id => ReqId,
parent_req_id => <<"undefined">>, parent_id => ?ROOT_REQ_PARENT_ID,
seq => 0, seq => 0,
event_handler => EventHandler event_handler => EventHandler
}. }.
-spec make_child_client(rpc_t:rpc_id(), rpc_t:handler()) -> client(). -spec make_child_client(rpc_t:rpc_id(), rpc_t:handler()) -> client().
make_child_client(#{req_id := ReqId, root_req_id := RootReqId}, EventHandler) -> make_child_client(#{span_id := ReqId, trace_id := TraceId}, EventHandler) ->
#{ #{
root_rpc => false, root_rpc => false,
req_id => undefined, span_id => undefined,
root_req_id => RootReqId, trace_id => TraceId,
parent_req_id => ReqId, parent_id => ReqId,
seq => 0, seq => 0,
event_handler => EventHandler event_handler => EventHandler
}. }.
@ -78,7 +80,7 @@ next(Client = #{root_rpc := true}) ->
Client; Client;
next(Client = #{root_rpc := false, seq := Seq}) -> next(Client = #{root_rpc := false, seq := Seq}) ->
NextSeq = Seq +1, NextSeq = Seq +1,
Client#{req_id => make_req_id(NextSeq), seq => NextSeq}. Client#{span_id => make_req_id(NextSeq), seq => NextSeq}.
-spec call(client(), request(), options()) -> result_ok() | no_return(). -spec call(client(), request(), options()) -> result_ok() | no_return().
call(Client, Request, Options) -> call(Client, Request, Options) ->

View File

@ -8,9 +8,9 @@
-type req_id() :: binary(). -type req_id() :: binary().
-type rpc_id() :: #{ -type rpc_id() :: #{
req_id => rpc_id(), span_id => rpc_id(),
root_req_id => rpc_id(), trace_id => rpc_id(),
parent_req_id => rpc_id() parent_id => rpc_id()
}. }.
-type options() :: map(). -type options() :: map().

View File

@ -33,7 +33,7 @@ stop_pool(Name) ->
call(Client = #{event_handler := EventHandler}, call(Client = #{event_handler := EventHandler},
{Service, Function, Args}, TransportOpts = #{url := Url}) {Service, Function, Args}, TransportOpts = #{url := Url})
-> ->
RpcId = maps:with([req_id, root_req_id, parent_req_id], Client), RpcId = maps:with([span_id, trace_id, parent_id], Client),
rpc_event_handler:handle_event(EventHandler, send_request, RpcId#{ rpc_event_handler:handle_event(EventHandler, send_request, RpcId#{
rpc_role => client, rpc_role => client,
direction => request, direction => request,
@ -42,7 +42,7 @@ call(Client = #{event_handler := EventHandler},
function => Function, function => Function,
args => Args args => Args
}), }),
Result = do_call(make_thrift_client( RpcId, Service, TransportOpts), Function, Args), Result = do_call(make_thrift_client(RpcId, Service, TransportOpts), Function, Args),
rpc_event_handler:handle_event(EventHandler, receive_response, RpcId#{ rpc_event_handler:handle_event(EventHandler, receive_response, RpcId#{
rpc_role => client, rpc_role => client,
direction => response, direction => response,

View File

@ -54,9 +54,9 @@
-define(event_rpc_receive, receive_request). -define(event_rpc_receive, receive_request).
-define(HEADERS_RPC_ID, #{ -define(HEADERS_RPC_ID, #{
req_id => ?HEADER_NAME_RPC_ID, span_id => ?HEADER_NAME_RPC_ID,
root_req_id => ?HEADER_NAME_RPC_ROOT_ID, trace_id => ?HEADER_NAME_RPC_ROOT_ID,
parent_req_id => ?HEADER_NAME_RPC_PARENT_ID parent_id => ?HEADER_NAME_RPC_PARENT_ID
}). }).

View File

@ -3,10 +3,10 @@
-define(CONTENT_TYPE_THRIFT , <<"application/x-thrift">>). -define(CONTENT_TYPE_THRIFT , <<"application/x-thrift">>).
-define(HEADER_NAME_RPC_ID , <<"x-rbk-rpc-id">>). -define(HEADER_NAME_RPC_ID , <<"x-rbk-span-id">>).
-define(HEADER_NAME_RPC_PARENT_ID , <<"x-rbk-rpc-id-parent">>). -define(HEADER_NAME_RPC_PARENT_ID , <<"x-rbk-parent-id">>).
-define(HEADER_NAME_RPC_ROOT_ID , <<"x-rbk-rpc-id-root">>). -define(HEADER_NAME_RPC_ROOT_ID , <<"x-rbk-root-id">>).
-define(HEADER_NAME_ERROR_TRANSPORT , <<"x-rbk-rpc-error-transport">>). -define(HEADER_NAME_ERROR_TRANSPORT , <<"x-rbk-rpc-error-thrift">>).
-define(HEADER_NAME_ERROR_LOGIC , <<"x-rbk-rpc-error-logic">>). -define(HEADER_NAME_ERROR_LOGIC , <<"x-rbk-rpc-error-logic">>).
-endif. -endif.

View File

@ -18,13 +18,13 @@
-export_type([url/0]). -export_type([url/0]).
-type rpc_transport() :: #{ -type rpc_transport() :: #{
req_id => rpc_t:req_id(), span_id => rpc_t:req_id(),
root_req_id => rpc_t:req_id(), trace_id => rpc_t:req_id(),
parent_req_id => rpc_t:req_id(), parent_id => rpc_t:req_id(),
url => url(), url => url(),
options => map(), options => map(),
write_buffer => binary(), write_buffer => binary(),
read_buffer => binary() read_buffer => binary()
}. }.
@ -73,13 +73,13 @@ read(Transport = #{read_buffer := RBuffer}, Len) when
-spec flush(rpc_transport()) -> {rpc_transport(), ok | {error, _Reason}}. -spec flush(rpc_transport()) -> {rpc_transport(), ok | {error, _Reason}}.
flush(Transport = #{ flush(Transport = #{
url := Url, url := Url,
req_id := ReqId, span_id := SpanId,
root_req_id := RootReqId, trace_id := TraceId,
parent_req_id := PaReqId, parent_id := ParentId,
options := Options, options := Options,
write_buffer := WBuffer, write_buffer := WBuffer,
read_buffer := RBuffer read_buffer := RBuffer
}) when }) when
is_binary(WBuffer), is_binary(WBuffer),
is_binary(RBuffer) is_binary(RBuffer)
@ -87,9 +87,9 @@ flush(Transport = #{
Headers = [ Headers = [
{<<"content-type">> , ?CONTENT_TYPE_THRIFT}, {<<"content-type">> , ?CONTENT_TYPE_THRIFT},
{<<"accept">> , ?CONTENT_TYPE_THRIFT}, {<<"accept">> , ?CONTENT_TYPE_THRIFT},
{?HEADER_NAME_RPC_ROOT_ID , genlib:to_binary(RootReqId)}, {?HEADER_NAME_RPC_ROOT_ID , genlib:to_binary(TraceId)},
{?HEADER_NAME_RPC_ID , genlib:to_binary(ReqId)}, {?HEADER_NAME_RPC_ID , genlib:to_binary(SpanId)},
{?HEADER_NAME_RPC_PARENT_ID , genlib:to_binary(PaReqId)} {?HEADER_NAME_RPC_PARENT_ID , genlib:to_binary(ParentId)}
], ],
case send(Url, Headers, WBuffer, Options) of case send(Url, Headers, WBuffer, Options) of
{ok, Response} -> {ok, Response} ->

View File

@ -105,7 +105,7 @@ all() ->
call_handle_error_fails_test, call_handle_error_fails_test,
call_oneway_void_test, call_oneway_void_test,
call_async_ok_test, call_async_ok_test,
check_req_ids_sequence_test, check_span_ids_sequence_test,
call_two_services_test, call_two_services_test,
call_with_client_pool_test, call_with_client_pool_test,
multiplexed_transport_test multiplexed_transport_test
@ -209,7 +209,7 @@ call_safe_handler_throw_unexpected_test(_) ->
Id = <<"call_safe_handler_throw_unexpected">>, Id = <<"call_safe_handler_throw_unexpected">>,
Current = genlib_map:get(<<"Rocket Launcher">>, ?WEAPONS), Current = genlib_map:get(<<"Rocket Launcher">>, ?WEAPONS),
Client = get_client(Id), Client = get_client(Id),
Expect = {error, rpc_failed, Client#{req_id => Id}}, Expect = {error, rpc_failed, Client#{span_id => Id}},
Expect = call_safe(Client, weapons, switch_weapon, Expect = call_safe(Client, weapons, switch_weapon,
[Current, next, 1, self_to_bin()]), [Current, next, 1, self_to_bin()]),
{ok, _} = receive_msg({Id, Current}). {ok, _} = receive_msg({Id, Current}).
@ -218,7 +218,7 @@ call_handler_throw_unexpected_test(_) ->
Id = <<"call_handler_throw_unexpected">>, Id = <<"call_handler_throw_unexpected">>,
Current = genlib_map:get(<<"Rocket Launcher">>, ?WEAPONS), Current = genlib_map:get(<<"Rocket Launcher">>, ?WEAPONS),
Client = get_client(Id), Client = get_client(Id),
Expect = {rpc_failed, Client#{req_id => Id}}, Expect = {rpc_failed, Client#{span_id => Id}},
try call(Client, weapons, switch_weapon, [Current, next, 1, self_to_bin()]) try call(Client, weapons, switch_weapon, [Current, next, 1, self_to_bin()])
catch catch
error:Expect -> ok error:Expect -> ok
@ -245,7 +245,7 @@ call_safe_server_transport_error_test(_) ->
Id = <<"call_safe_server_transport_error">>, Id = <<"call_safe_server_transport_error">>,
Armor = <<"Helmet">>, Armor = <<"Helmet">>,
Client = get_client(Id), Client = get_client(Id),
Expect = {error, rpc_failed, Client#{req_id => Id}}, Expect = {error, rpc_failed, Client#{span_id => Id}},
Expect = call_safe(Client, powerups, get_powerup, Expect = call_safe(Client, powerups, get_powerup,
[Armor, self_to_bin()]), [Armor, self_to_bin()]),
{ok, _} = receive_msg({Id, Armor}). {ok, _} = receive_msg({Id, Armor}).
@ -259,7 +259,7 @@ call_handle_error_fails_test(_) ->
do_call_server_transport_error(Id) -> do_call_server_transport_error(Id) ->
Armor = <<"Helmet">>, Armor = <<"Helmet">>,
Client = get_client(Id), Client = get_client(Id),
Expect = {rpc_failed, Client#{req_id => Id}}, Expect = {rpc_failed, Client#{span_id => Id}},
try call(Client, powerups, get_powerup, [Armor, self_to_bin()]) try call(Client, powerups, get_powerup, [Armor, self_to_bin()])
catch catch
error:Expect -> ok error:Expect -> ok
@ -270,7 +270,7 @@ call_oneway_void_test(_) ->
Id = <<"call_oneway_void_test">>, Id = <<"call_oneway_void_test">>,
Armor = <<"Helmet">>, Armor = <<"Helmet">>,
Client = get_client(Id), Client = get_client(Id),
Expect = {ok, ok, Client#{req_id => Id}}, Expect = {ok, ok, Client#{span_id => Id}},
Expect = call(Client, powerups, like_powerup, [Armor, self_to_bin()]), Expect = call(Client, powerups, like_powerup, [Armor, self_to_bin()]),
{ok, _} = receive_msg({Id, Armor}). {ok, _} = receive_msg({Id, Armor}).
@ -280,11 +280,11 @@ call_async_ok_test(C) ->
Callback = fun(Res) -> collect(Res, Pid) end, Callback = fun(Res) -> collect(Res, Pid) end,
Id1 = <<"call_async_ok1">>, Id1 = <<"call_async_ok1">>,
Client1 = get_client(Id1), Client1 = get_client(Id1),
Client11 = Client1#{req_id => Id1}, Client11 = Client1#{span_id => Id1},
{ok, Pid1, Client11} = get_weapon(Client1, Sup, Callback, <<"Impact Hammer">>), {ok, Pid1, Client11} = get_weapon(Client1, Sup, Callback, <<"Impact Hammer">>),
Id2 = <<"call_async_ok2">>, Id2 = <<"call_async_ok2">>,
Client2 = get_client(Id2), Client2 = get_client(Id2),
Client22 = Client2#{req_id => Id2}, Client22 = Client2#{span_id => Id2},
{ok, Pid2, Client22} = get_weapon(Client2, Sup, Callback, <<"Flak Cannon">>), {ok, Pid2, Client22} = get_weapon(Client2, Sup, Callback, <<"Flak Cannon">>),
{ok, Pid1} = receive_msg({Client11, genlib_map:get(<<"Impact Hammer">>, ?WEAPONS)}), {ok, Pid1} = receive_msg({Client11, genlib_map:get(<<"Impact Hammer">>, ?WEAPONS)}),
{ok, Pid2} = receive_msg({Client22, genlib_map:get(<<"Flak Cannon">>, ?WEAPONS)}). {ok, Pid2} = receive_msg({Client22, genlib_map:get(<<"Flak Cannon">>, ?WEAPONS)}).
@ -295,11 +295,11 @@ get_weapon(Client, Sup, Cb, Gun) ->
collect({ok, Result, Tag}, Pid) -> collect({ok, Result, Tag}, Pid) ->
send_msg(Pid, {Tag, Result}). send_msg(Pid, {Tag, Result}).
check_req_ids_sequence_test(_) -> check_span_ids_sequence_test(_) ->
Id = <<"check_req_ids_sequence">>, Id = <<"check_span_ids_sequence">>,
Current = genlib_map:get(<<"Enforcer">>, ?WEAPONS), Current = genlib_map:get(<<"Enforcer">>, ?WEAPONS),
Client = get_client(Id), Client = get_client(Id),
Expect = {ok, genlib_map:get(<<"Ripper">>, ?WEAPONS), Client#{req_id => Id}}, Expect = {ok, genlib_map:get(<<"Ripper">>, ?WEAPONS), Client#{span_id => Id}},
Expect = call(Client, weapons, switch_weapon, Expect = call(Client, weapons, switch_weapon,
[Current, next, 1, self_to_bin()]). [Current, next, 1, self_to_bin()]).
@ -309,7 +309,7 @@ call_two_services_test(_) ->
Id = <<"two_services2">>, Id = <<"two_services2">>,
Armor = <<"Body Armor">>, Armor = <<"Body Armor">>,
Client = get_client(Id), Client = get_client(Id),
Expect = {ok, genlib_map:get(<<"Body Armor">>, ?POWERUPS), Client#{req_id => Id}}, Expect = {ok, genlib_map:get(<<"Body Armor">>, ?POWERUPS), Client#{span_id => Id}},
Expect = call_safe(Client, powerups, get_powerup, [Armor, self_to_bin()]), Expect = call_safe(Client, powerups, get_powerup, [Armor, self_to_bin()]),
{ok, _} = receive_msg({Id, Armor}). {ok, _} = receive_msg({Id, Armor}).
@ -320,7 +320,7 @@ call_with_client_pool_test(_) ->
Gun = <<"Enforcer">>, Gun = <<"Enforcer">>,
Client = get_client(Id), Client = get_client(Id),
{Url, Service} = get_service_endpoint(weapons), {Url, Service} = get_service_endpoint(weapons),
Expect = {ok, genlib_map:get(Gun, ?WEAPONS), Client#{req_id => Id}}, Expect = {ok, genlib_map:get(Gun, ?WEAPONS), Client#{span_id => Id}},
Expect = rpc_client:call( Expect = rpc_client:call(
Client, Client,
{Service, get_weapon, [Gun, self_to_bin()]}, {Service, get_weapon, [Gun, self_to_bin()]},
@ -342,7 +342,7 @@ make_thrift_multiplexed_client(Id, ServiceName, {Url, Service}) ->
{ok, Protocol} = thrift_binary_protocol:new( {ok, Protocol} = thrift_binary_protocol:new(
rpc_thrift_http_transport:new( rpc_thrift_http_transport:new(
#{ #{
req_id => Id, root_req_id => Id, parent_req_id => Id span_id => Id, trace_id => Id, parent_id => Id
}, },
#{url => Url} #{url => Url}
), ),
@ -366,16 +366,16 @@ init(_) ->
%% %%
%% Weapons %% Weapons
handle_function(switch_weapon, RpcClient = #{parent_req_id := PaReqId}, handle_function(switch_weapon, RpcClient = #{parent_id := ParentId},
{CurrentWeapon, Direction, Shift, To}, _Opts {CurrentWeapon, Direction, Shift, To}, _Opts
) -> ) ->
send_msg(To, {PaReqId, CurrentWeapon}), send_msg(To, {ParentId, CurrentWeapon}),
switch_weapon(CurrentWeapon, Direction, Shift, RpcClient); switch_weapon(CurrentWeapon, Direction, Shift, RpcClient);
handle_function(get_weapon, #{parent_req_id := PaReqId}, handle_function(get_weapon, #{parent_id := ParentId},
{Name, To}, _Opts) {Name, To}, _Opts)
-> ->
send_msg(To,{PaReqId,Name}), send_msg(To,{ParentId,Name}),
Res = case genlib_map:get(Name, ?WEAPONS) of Res = case genlib_map:get(Name, ?WEAPONS) of
#weapon{ammo = 0} -> #weapon{ammo = 0} ->
throw(?weapon_failure("out of ammo")); throw(?weapon_failure("out of ammo"));
@ -385,14 +385,14 @@ handle_function(get_weapon, #{parent_req_id := PaReqId},
{ok, Res}; {ok, Res};
%% Powerups %% Powerups
handle_function(get_powerup, #{parent_req_id := PaReqId}, {Name, To}, _Opts) -> handle_function(get_powerup, #{parent_id := ParentId}, {Name, To}, _Opts) ->
send_msg(To, {PaReqId, Name}), send_msg(To, {ParentId, Name}),
{ok, genlib_map:get(Name, ?POWERUPS, powerup_unknown)}; {ok, genlib_map:get(Name, ?POWERUPS, powerup_unknown)};
handle_function(like_powerup, #{parent_req_id := PaReqId}, {Name, To}, _Opts) -> handle_function(like_powerup, #{parent_id := ParentId}, {Name, To}, _Opts) ->
send_msg(To, {PaReqId, Name}), send_msg(To, {ParentId, Name}),
ok. ok.
handle_error(get_powerup, #{parent_req_id := <<"call_handle_error_fails">>}, _, _) -> handle_error(get_powerup, #{parent_id := <<"call_handle_error_fails">>}, _, _) ->
error(no_more_powerups); error(no_more_powerups);
handle_error(_Function, _RpcClient, _Reason, _Opts) -> handle_error(_Function, _RpcClient, _Reason, _Opts) ->
ok. ok.
@ -445,7 +445,7 @@ get_service_endpoint(powerups) ->
gun_test_bacic(CallFun, Id, Gun, {ExpectStatus, ExpectRes}, WithMsg) -> gun_test_bacic(CallFun, Id, Gun, {ExpectStatus, ExpectRes}, WithMsg) ->
Client = get_client(Id), Client = get_client(Id),
Expect = {ExpectStatus, ExpectRes, Client#{req_id => Id}}, Expect = {ExpectStatus, ExpectRes, Client#{span_id => Id}},
Expect = ?MODULE:CallFun(Client, weapons, get_weapon, [Gun, self_to_bin()]), Expect = ?MODULE:CallFun(Client, weapons, get_weapon, [Gun, self_to_bin()]),
case WithMsg of case WithMsg of
true -> {ok, _} = receive_msg({Id, Gun}); true -> {ok, _} = receive_msg({Id, Gun});
@ -454,7 +454,7 @@ gun_test_bacic(CallFun, Id, Gun, {ExpectStatus, ExpectRes}, WithMsg) ->
gun_catch_test_basic(Id, Gun, {Class, Exception}, WithMsg) -> gun_catch_test_basic(Id, Gun, {Class, Exception}, WithMsg) ->
Client = get_client(Id), Client = get_client(Id),
Expect = {Exception, Client#{req_id => Id}}, Expect = {Exception, Client#{span_id => Id}},
try call(Client, weapons, get_weapon, [Gun, self_to_bin()]) try call(Client, weapons, get_weapon, [Gun, self_to_bin()])
catch catch
Class:Expect -> ok Class:Expect -> ok