TD-686: Adopts opentelemetry API (#27)

* Adds otel propagation to legacy thrift transport implementations
* Complete otel propagation in pass-through testcase
* Implements woody event handler for otel spans
This commit is contained in:
Aleksey Kashapov 2023-10-04 14:44:43 +03:00 committed by GitHub
parent 511e422a59
commit 5d46291a6b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 330 additions and 10 deletions

View File

@ -32,7 +32,8 @@
{cache, "2.3.3"},
{thrift, {git, "https://github.com/valitydev/thrift_erlang.git", {branch, "master"}}},
{snowflake, {git, "https://github.com/valitydev/snowflake.git", {branch, "master"}}},
{genlib, {git, "https://github.com/valitydev/genlib.git", {branch, "master"}}}
{genlib, {git, "https://github.com/valitydev/genlib.git", {branch, "master"}}},
{opentelemetry_api, "1.2.1"}
]}.
%% XRef checks
@ -87,10 +88,11 @@
{git, "https://github.com/valitydev/damsel.git", {ref, "3fa6f31db54b2ae781b27898ab4daf56bb36eb36"}}},
{mg_proto,
{git, "https://github.com/valitydev/machinegun-proto.git",
{ref, "ebae56fe2b3e79e4eb34afc8cb55c9012ae989f8"}}}
{ref, "ebae56fe2b3e79e4eb34afc8cb55c9012ae989f8"}}},
{opentelemetry, "1.3.0"}
]},
{dialyzer, [
{plt_extra_apps, [how_are_you, eunit, proper, common_test, cth_readable]}
{plt_extra_apps, [how_are_you, eunit, proper, common_test, cth_readable, opentelemetry]}
]}
]}
]}.

View File

@ -12,6 +12,10 @@
{<<"idna">>,{pkg,<<"idna">>,<<"6.1.1">>},1},
{<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},1},
{<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.2.0">>},1},
{<<"opentelemetry_api">>,{pkg,<<"opentelemetry_api">>,<<"1.2.1">>},0},
{<<"opentelemetry_semantic_conventions">>,
{pkg,<<"opentelemetry_semantic_conventions">>,<<"0.2.0">>},
1},
{<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.3.1">>},1},
{<<"ranch">>,{pkg,<<"ranch">>,<<"1.8.0">>},1},
{<<"snowflake">>,
@ -35,6 +39,8 @@
{<<"idna">>, <<"8A63070E9F7D0C62EB9D9FCB360A7DE382448200FBBD1B106CC96D3D8099DF8D">>},
{<<"metrics">>, <<"25F094DEA2CDA98213CECC3AEFF09E940299D950904393B2A29D191C346A8486">>},
{<<"mimerl">>, <<"67E2D3F571088D5CFD3E550C383094B47159F3EEE8FFA08E64106CDF5E981BE3">>},
{<<"opentelemetry_api">>, <<"7B69ED4F40025C005DE0B74FCE8C0549625D59CB4DF12D15C32FE6DC5076FF42">>},
{<<"opentelemetry_semantic_conventions">>, <<"B67FE459C2938FCAB341CB0951C44860C62347C005ACE1B50F8402576F241435">>},
{<<"parse_trans">>, <<"16328AB840CC09919BD10DAB29E431DA3AF9E9E7E7E6F0089DD5A2D2820011D8">>},
{<<"ranch">>, <<"8C7A100A139FD57F17327B6413E4167AC559FBC04CA7448E9BE9057311597A1D">>},
{<<"ssl_verify_fun">>, <<"CF344F5692C82D2CD7554F5EC8FD961548D4FD09E7D22F5B62482E5AEAEBD4B0">>},
@ -49,6 +55,8 @@
{<<"idna">>, <<"92376EB7894412ED19AC475E4A86F7B413C1B9FBB5BD16DCCD57934157944CEA">>},
{<<"metrics">>, <<"69B09ADDDC4F74A40716AE54D140F93BEB0FB8978D8636EADED0C31B6F099F16">>},
{<<"mimerl">>, <<"F278585650AA581986264638EBF698F8BB19DF297F66AD91B18910DFC6E19323">>},
{<<"opentelemetry_api">>, <<"6D7A27B7CAD2AD69A09CABF6670514CAFCEC717C8441BEB5C96322BAC3D05350">>},
{<<"opentelemetry_semantic_conventions">>, <<"D61FA1F5639EE8668D74B527E6806E0503EFC55A42DB7B5F39939D84C07D6895">>},
{<<"parse_trans">>, <<"07CD9577885F56362D414E8C4C4E6BDF10D43A8767ABB92D24CBE8B24C54888B">>},
{<<"ranch">>, <<"49FBCFD3682FAB1F5D109351B61257676DA1A2FDBE295904176D5E521A2DDFE5">>},
{<<"ssl_verify_fun">>, <<"BDB0D2471F453C88FF3908E7686F86F9BE327D065CC1EC16FA4540197EA04680">>},

View File

@ -11,7 +11,8 @@
hackney,
thrift,
gproc,
cache
cache,
opentelemetry_api
]},
{env, [
{enable_debug, false}

View File

@ -128,7 +128,7 @@ send(Url, Body, Options, ResOpts, WoodyState) ->
Headers = add_host_header(OldUrl, make_woody_headers(Context)),
Options1 = set_defaults(Options),
Options2 = set_timeouts(Options1, Context),
HeaderList = maps:to_list(Headers),
HeaderList = otel_propagator_text_map:inject(maps:to_list(Headers)),
Result = hackney:request(post, NewUrl, HeaderList, Body, maps:to_list(Options2)),
transform_request_results(Result);
{error, Reason} ->

View File

@ -155,7 +155,8 @@ send_call(Buffer, #{url := Url} = Opts, WoodyState) ->
% reusing keep-alive connections to dead hosts
case woody_resolver:resolve_url(Url, WoodyState, ResolverOpts) of
{ok, {OldUrl, NewUrl}, ConnectOpts} ->
Headers = add_host_header(OldUrl, make_woody_headers(Context)),
Headers0 = add_host_header(OldUrl, make_woody_headers(Context)),
Headers1 = otel_propagator_text_map:inject(Headers0),
TransportOpts1 = set_defaults(TransportOpts),
TransportOpts2 = set_timeouts(TransportOpts1, Context),
% NOTE
@ -163,7 +164,7 @@ send_call(Buffer, #{url := Url} = Opts, WoodyState) ->
% `set_tls_overrides/2`.
TransportOpts3 = append_connect_opts(TransportOpts2, ConnectOpts),
TransportOpts4 = set_tls_overrides(TransportOpts3, OldUrl),
Result = hackney:request(post, NewUrl, Headers, Buffer, maps:to_list(TransportOpts4)),
Result = hackney:request(post, NewUrl, Headers1, Buffer, maps:to_list(TransportOpts4)),
handle_response(Result, WoodyState);
{error, Reason} ->
Error = {error, {resolve_failed, Reason}},

View File

@ -0,0 +1,100 @@
-module(woody_event_handler_otel).
-include_lib("opentelemetry_api/include/opentelemetry.hrl").
-include("woody_defs.hrl").
-behaviour(woody_event_handler).
-export([handle_event/4]).
-spec handle_event(Event, RpcId, Meta, Opts) -> ok when
Event :: woody_event_handler:event(),
RpcId :: woody:rpc_id() | undefined,
Meta :: woody_event_handler:event_meta(),
Opts :: woody:options().
-define(IS_SPAN_START(Event), Event =:= ?EV_CALL_SERVICE orelse Event =:= ?EV_INVOKE_SERVICE_HANDLER).
-define(IS_SPAN_END(Event), Event =:= ?EV_SERVICE_RESULT orelse Event =:= ?EV_SERVICE_HANDLER_RESULT).
-define(IS_CLIENT_INTERNAL(Event),
Event =:= ?EV_CLIENT_CACHE_HIT orelse
Event =:= ?EV_CLIENT_CACHE_MISS orelse
Event =:= ?EV_CLIENT_CACHE_UPDATE orelse
Event =:= ?EV_CLIENT_SEND orelse
Event =:= ?EV_CLIENT_RECEIVE
).
%% Client events
handle_event(Event, RpcID, _Meta = #{url := Url}, _Opts) when ?IS_CLIENT_INTERNAL(Event) ->
with_span(otel_ctx:get_current(), mk_ref(RpcID), fun(SpanCtx) ->
_ = otel_span:add_event(SpanCtx, atom_to_binary(Event), #{url => Url})
end);
%% Internal error handling
handle_event(?EV_INTERNAL_ERROR, RpcID, Meta = #{error := Error, class := Class, reason := Reason}, _Opts) ->
Stacktrace = maps:get(stack, Meta, []),
Details = io_lib:format("~ts: ~ts", [Error, Reason]),
with_span(otel_ctx:get_current(), mk_ref(RpcID), fun(SpanCtx) ->
_ = otel_span:record_exception(SpanCtx, genlib:define(Class, error), Details, Stacktrace, #{}),
otel_maybe_cleanup(Meta, SpanCtx)
end);
%% Registers span starts/ends for woody client calls and woody server function invocations.
handle_event(Event, RpcID, Meta, _Opts) when ?IS_SPAN_START(Event) ->
Tracer = opentelemetry:get_application_tracer(?MODULE),
span_start(Tracer, otel_ctx:get_current(), mk_ref(RpcID), mk_name(Meta), mk_opts(Event));
handle_event(Event, RpcID, Meta, _Opts) when ?IS_SPAN_END(Event) ->
span_end(otel_ctx:get_current(), mk_ref(RpcID), fun(SpanCtx) ->
otel_maybe_erroneous_result(SpanCtx, Meta)
end);
handle_event(_Event, _RpcID, _Meta, _Opts) ->
ok.
%%
span_start(Tracer, Ctx, Key, SpanName, Opts) ->
SpanCtx = otel_tracer:start_span(Ctx, Tracer, SpanName, Opts),
Ctx1 = woody_util:span_stack_put(Key, SpanCtx, Ctx),
Ctx2 = otel_tracer:set_current_span(Ctx1, SpanCtx),
_ = otel_ctx:attach(Ctx2),
ok.
span_end(Ctx, Key, OnBeforeEnd) ->
case woody_util:span_stack_pop(Key, Ctx) of
{error, notfound} ->
ok;
{ok, SpanCtx, ParentSpanCtx, Ctx1} ->
SpanCtx1 = OnBeforeEnd(SpanCtx),
_ = otel_span:end_span(SpanCtx1, undefined),
Ctx2 = otel_tracer:set_current_span(Ctx1, ParentSpanCtx),
_ = otel_ctx:attach(Ctx2),
ok
end.
with_span(Ctx, Key, F) ->
SpanCtx = woody_util:span_stack_get(Key, Ctx, otel_tracer:current_span_ctx(Ctx)),
_ = F(SpanCtx),
ok.
otel_maybe_cleanup(#{final := true}, SpanCtx) ->
_ = otel_span:end_span(SpanCtx, undefined),
otel_ctx:clear(),
ok;
otel_maybe_cleanup(_Meta, _SpanCtx) ->
ok.
otel_maybe_erroneous_result(SpanCtx, Meta = #{status := error, result := Reason}) ->
Class = maps:get(except_class, Meta, error),
Stacktrace = maps:get(stack, Meta, []),
_ = otel_span:record_exception(SpanCtx, Class, Reason, Stacktrace, #{}),
SpanCtx;
otel_maybe_erroneous_result(SpanCtx, _Meta) ->
SpanCtx.
mk_opts(?EV_CALL_SERVICE) ->
#{kind => ?SPAN_KIND_CLIENT};
mk_opts(?EV_INVOKE_SERVICE_HANDLER) ->
#{kind => ?SPAN_KIND_SERVER}.
mk_ref(#{span_id := WoodySpanId}) ->
WoodySpanId.
mk_name(#{role := Role, service := Service, function := Function}) ->
woody_util:to_binary([Role, " ", Service, ":", Function]).

View File

@ -393,6 +393,7 @@ check_deadline(Deadline, Req, State = #{url := Url, woody_state := WoodyState})
-spec check_metadata_headers(woody:http_headers(), cowboy_req:req(), state()) -> check_result().
check_metadata_headers(Headers, Req, State = #{woody_state := WoodyState, regexp_meta := ReMeta}) ->
_OtelCtx = otel_propagator_text_map:extract(maps:to_list(Headers)),
WoodyState1 = set_metadata(find_metadata(Headers, ReMeta), WoodyState),
{ok, Req, update_woody_state(State, WoodyState1, Req)}.

View File

@ -407,6 +407,7 @@ check_deadline(Deadline, Req, State = #{url := Url, woody_state := WoodyState})
-spec check_metadata_headers(woody:http_headers(), cowboy_req:req(), state()) -> check_result().
check_metadata_headers(Headers, Req, State = #{woody_state := WoodyState, regexp_meta := ReMeta}) ->
_OtelCtx = otel_propagator_text_map:extract(maps:to_list(Headers)),
WoodyState1 = set_metadata(find_metadata(Headers, ReMeta), WoodyState),
{ok, Req, update_woody_state(State, WoodyState1, Req)}.

View File

@ -11,6 +11,12 @@
-define(DEFAULT_HANDLER_OPTS, undefined).
%%
-export([span_stack_put/3]).
-export([span_stack_get/3]).
-export([span_stack_pop/2]).
%%
%% Internal API
%%
@ -51,3 +57,41 @@ get_rpc_type({Module, Service}, Function) ->
-spec get_rpc_reply_type(_ThriftReplyType) -> woody:rpc_type().
get_rpc_reply_type(oneway_void) -> cast;
get_rpc_reply_type(_) -> call.
%% OTEL context span helpers
%% NOTE Those helpers are designed specifically to manage stacking spans during
%% woody client (or server) calls _inside_ one single process context.
%% Thus, use of process dictionary via `otel_ctx'.
-define(OTEL_SPANS_STACK, 'spans_ctx_stack').
-type span_key() :: atom() | binary() | string().
-type maybe_span_ctx() :: opentelemetry:span_ctx() | undefined.
-spec span_stack_put(span_key(), opentelemetry:span_ctx(), otel_ctx:t()) -> otel_ctx:t().
span_stack_put(Key, SpanCtx, Context) ->
Stack = otel_ctx:get_value(Context, ?OTEL_SPANS_STACK, []),
Entry = {Key, SpanCtx, otel_tracer:current_span_ctx(Context)},
otel_ctx:set_value(Context, ?OTEL_SPANS_STACK, [Entry | Stack]).
-spec span_stack_get(span_key(), otel_ctx:t(), maybe_span_ctx()) -> maybe_span_ctx().
span_stack_get(Key, Context, Default) ->
Stack = otel_ctx:get_value(Context, ?OTEL_SPANS_STACK, []),
case lists:keyfind(Key, 1, Stack) of
false ->
Default;
{_Key, SpanCtx, _ParentSpanCtx} ->
SpanCtx
end.
-spec span_stack_pop(span_key(), otel_ctx:t()) ->
{ok, opentelemetry:span_ctx(), maybe_span_ctx(), otel_ctx:t()} | {error, notfound}.
span_stack_pop(Key, Context) ->
Stack = otel_ctx:get_value(Context, ?OTEL_SPANS_STACK, []),
case lists:keytake(Key, 1, Stack) of
false ->
{error, notfound};
{value, {_Key, SpanCtx, ParentSpanCtx}, Stack1} ->
Context1 = otel_ctx:set_value(Context, ?OTEL_SPANS_STACK, Stack1),
{ok, SpanCtx, ParentSpanCtx, Context1}
end.

View File

@ -0,0 +1,97 @@
-module(woody_ct_otel_collector).
-behaviour(gen_server).
-export([
start_link/0,
get_trace/1,
get_traces/0
]).
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2
]).
-include_lib("opentelemetry/include/otel_span.hrl").
-type span() :: #span{}.
-type span_node() :: #{span := span(), children := [span_node()]}.
-type trace() :: #{
id := opentelemetry:trace_id(),
node := span_node()
}.
%
-spec start_link() -> {ok, pid()}.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec get_trace(opentelemetry:trace_id()) -> {ok, trace()} | {error, notfound}.
get_trace(TraceId) ->
gen_server:call(?MODULE, {trace, TraceId}).
-spec get_traces() -> {ok, [trace()]}.
get_traces() ->
gen_server:call(?MODULE, traces).
-spec init(_) -> {ok, _}.
init(_Opts) ->
{ok, #{}}.
-spec handle_info(_, T) -> {noreply, T}.
handle_info({span, Span}, State0) ->
State1 = maps:update_with(Span#span.trace_id, fun(V) -> [Span | V] end, [Span], State0),
{noreply, State1};
handle_info(_Msg, State) ->
{noreply, State}.
-spec handle_call(_, _, T) -> {noreply, T}.
handle_call(traces, _From, State) ->
Result = maps:map(fun(TraceId, Spans) -> build_trace(TraceId, Spans) end, State),
{reply, maps:values(Result), State};
handle_call({trace, TraceId}, _From, State) ->
Result =
case maps:get(TraceId, State, undefined) of
undefined -> {error, notfound};
Spans -> {ok, build_trace(TraceId, Spans)}
end,
{reply, Result, State};
handle_call(_Msg, _From, State) ->
{noreply, State}.
-spec handle_cast(_, T) -> {noreply, T}.
handle_cast(_Msg, State) ->
{noreply, State}.
%
build_trace(TraceId, Spans0) ->
Spans1 = lists:sort(fun(#span{start_time = A}, #span{start_time = B}) -> A < B end, Spans0),
[RootSpan | _] = lists:filter(
fun
(#span{parent_span_id = undefined}) -> true;
(_) -> false
end,
Spans1
),
#{
id => TraceId,
node => lists:foldl(fun(Span, RootNode) -> update_node(Span, RootNode) end, new_span_node(RootSpan), Spans1)
}.
update_node(
Span = #span{parent_span_id = ParentId},
SpanNode = #{span := #span{span_id = ParentId}, children := Children}
) ->
SpanNode#{children => [new_span_node(Span) | Children]};
update_node(Span, SpanNode = #{children := Children}) ->
SpanNode#{children => lists:map(fun(Child) -> update_node(Span, Child) end, Children)}.
new_span_node(Span) ->
#{span => Span, children => []}.

View File

@ -3,6 +3,8 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
-include_lib("hackney/include/hackney_lib.hrl").
-include_lib("opentelemetry_api/include/opentelemetry.hrl").
-include_lib("opentelemetry/include/otel_span.hrl").
-include("woody_test_thrift.hrl").
-include("woody_defs.hrl").
@ -111,6 +113,11 @@
-define(WEAPON_STACK_OVERFLOW, pos_out_of_boundaries).
-define(BAD_POWERUP_REPLY, powerup_unknown).
-define(OTEL_SPAN(Name, Children), #{span := #span{name = Name}, children := Children}).
-define(OTEL_SPAN(Name, SpanAttributes, Children), #{
span := #span{name = Name, attributes = SpanAttributes}, children := Children
}).
-type config() :: [{atom(), any()}].
-type case_name() :: atom().
-type group_name() :: atom().
@ -293,7 +300,8 @@ init_per_suite(C) ->
]),
{ok, Apps} = application:ensure_all_started(woody),
{ok, HayApps} = application:ensure_all_started(how_are_you),
[{apps, HayApps ++ Apps} | C].
{ok, OtelApps} = setup_opentelemetry(),
[{apps, OtelApps ++ HayApps ++ Apps} | C].
end_per_suite(C) ->
% unset so it won't report metrics next suite
@ -352,6 +360,20 @@ init_per_group(Name, Config) ->
end_per_group(_Name, _Config) ->
ok.
setup_opentelemetry() ->
ok = application:set_env([
{opentelemetry, [
{text_map_propagators, [
baggage,
trace_context
]},
{span_processor, simple},
{traces_exporter, {otel_exporter_pid, woody_ct_otel_collector}}
]}
]),
ok = application:start(opentelemetry),
{ok, [opentelemetry]}.
start_tc_sup() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
@ -673,8 +695,37 @@ call_pass_thru_ok_test(C) ->
Armor = <<"AntiGrav Boots">>,
Context = make_context(<<"call_pass_thru_ok">>),
Expect = {ok, genlib_map:get(Armor, powerups())},
Baggage = #{<<"service">> => <<"Powerups">>},
SpanName = <<"test Powerups:proxy_get_powerup">>,
%% NOTE: `otel_baggage` uses `otel_ctx` that relies on __process dictionary__
ok = otel_baggage:set(Baggage),
Tracer = opentelemetry:get_application_tracer(?MODULE),
%% NOTE: Starts span and sets its context as current span in __process dictionary__
SpanCtx = otel_tracer:set_current_span(otel_tracer:start_span(Tracer, SpanName, #{})),
Expect = call(Context, 'Powerups', proxy_get_powerup, {Armor, self_to_bin()}, C),
{ok, _} = receive_msg(Armor, Context).
%% NOTE: Ends span and puts context into __process dictionary__
_ = otel_tracer:set_current_span(otel_span:end_span(SpanCtx, undefined)),
{ok, _} = receive_msg(Armor, Context),
{ok, #{node := Root}} = woody_ct_otel_collector:get_trace(SpanCtx#span_ctx.trace_id),
%% Prepare otel attributes to match against
ProxyAttrs = mk_otel_attributes(Baggage),
Attrs = mk_otel_attributes(Baggage#{<<"proxied">> => <<"true">>}),
?assertMatch(
?OTEL_SPAN(SpanName, [
?OTEL_SPAN(<<"client Powerups:proxy_get_powerup">>, [
%% Upon invocation event baggage is expected to be put in span attributes
?OTEL_SPAN(<<"server Powerups:proxy_get_powerup">>, ProxyAttrs, [
%% New client starts call here
?OTEL_SPAN(<<"client Powerups:get_powerup">>, [
?OTEL_SPAN(<<"server Powerups:get_powerup">>, Attrs, [
%% Expect no child spans uphere
])
])
])
])
]),
Root
).
call_pass_thru_except_test(C) ->
Armor = <<"Shield Belt">>,
@ -985,7 +1036,7 @@ init(_) ->
{ok,
{
{one_for_one, 1, 1},
[]
[#{id => woody_ct_otel_collector, start => {woody_ct_otel_collector, start_link, []}}]
}}.
%%
@ -1015,6 +1066,8 @@ handle_function(ProxyGetPowerup, {Name, To}, Context, _Opts) when
ProxyGetPowerup =:= proxy_get_powerup orelse
ProxyGetPowerup =:= bad_proxy_get_powerup
->
%% NOTE: Merges baggage, requires #{binary() => binary()}
ok = otel_baggage:set(#{<<"proxied">> => <<"true">>}),
% NOTE
% Client may return `{exception, _}` tuple with some business level exception
% here, yet handler expects us to `throw/1` them. This is expected here it
@ -1054,6 +1107,12 @@ handle_event(
->
_ = handle_proxy_event(Event, Code, TraceId, ParentId),
log_event(Event, RpcId, Meta);
%% Handle invocation
handle_event(Event = ?EV_INVOKE_SERVICE_HANDLER, RpcId, Meta, _) ->
log_event(Event, RpcId, Meta),
SpanCtx = otel_tracer:current_span_ctx(),
_ = otel_span:set_attributes(SpanCtx, maps:map(fun(_Key, {Value, _Metadata}) -> Value end, otel_baggage:get_all())),
ok;
handle_event(Event, RpcId, Meta, _) ->
log_event(Event, RpcId, Meta).
@ -1084,6 +1143,7 @@ handle_proxy_event(Event, Code, Descr) ->
erlang:error(badarg, [Event, Code, Descr]).
log_event(Event, RpcId, Meta) ->
ok = woody_event_handler_otel:handle_event(Event, RpcId, Meta, []),
woody_ct_event_h:handle_event(Event, RpcId, Meta, []).
%%
@ -1285,3 +1345,8 @@ handle_sleep(Context) ->
BinTimer ->
timer:sleep(binary_to_integer(BinTimer))
end.
mk_otel_attributes(Attributes) ->
SpanAttributeCountLimit = otel_span_limits:attribute_count_limit(),
SpanAttributeValueLengthLimit = otel_span_limits:attribute_value_length_limit(),
otel_attributes:new(Attributes, SpanAttributeCountLimit, SpanAttributeValueLengthLimit).