NONE: Fix audit log crashing on filesync (#9)

This commit is contained in:
Alexey 2021-07-01 16:21:18 +03:00 committed by GitHub
parent a9e9da6674
commit aaf2428ab3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 197 additions and 166 deletions

View File

@ -50,7 +50,7 @@
}
},
{token_keeper_proto,
{git, "git@github.com:rbkmoney/token-keeper-proto.git",
{git, "https://github.com/rbkmoney/token-keeper-proto.git",
{branch, "master"}
}
},
@ -100,22 +100,14 @@
ruleset => erl_files,
rules => [
{elvis_text_style, line_length, #{limit => 120, skip_comments => false}},
{elvis_text_style, no_tabs},
{elvis_text_style, no_trailing_whitespace},
{elvis_style, macro_module_names},
{elvis_style, operator_spaces, #{rules => [{right, ","}, {right, "++"}, {left, "++"}]}},
{elvis_style, nesting_level, #{level => 4}},
{elvis_style, god_modules, #{limit => 30, ignore => []}},
{elvis_style, no_if_expression},
{elvis_style, invalid_dynamic_call, #{ignore => []}},
{elvis_style, used_ignored_variable},
{elvis_style, no_behavior_info},
{elvis_style, module_naming_convention, #{regex => "^[a-z]([a-z0-9]*_?)*(_SUITE)?$"}},
{elvis_style, function_naming_convention, #{regex => "^[a-z]([a-z0-9]*_?)*$"}},
{elvis_style, state_record_and_type, #{ignore => []}},
{elvis_style, no_spec_with_records},
{elvis_style, dont_repeat_yourself, #{min_complexity => 30}},
{elvis_style, no_debug_call, #{}}
% Too opinionated
{elvis_style, state_record_and_type, disable},
{elvis_style, invalid_dynamic_call, #{
ignore => [
% Implements parts of logger duties, including message formatting.
tk_audit_log
]
}}
]
},
#{
@ -132,8 +124,7 @@
% Tests are usually more comprehensible when a bit more verbose.
{elvis_style, dont_repeat_yourself, #{min_complexity => 20}},
% Too opinionated
{elvis_style, state_record_and_type, disable},
{elvis_style, god_modules, #{ignore => []}}
{elvis_style, state_record_and_type, disable}
]
},
#{
@ -145,7 +136,7 @@
dirs => ["."],
filter => "rebar.config",
rules => [
{elvis_text_style, line_length, #{limit => 100, skip_comments => false}},
{elvis_text_style, line_length, #{limit => 120, skip_comments => false}},
{elvis_text_style, no_tabs},
{elvis_text_style, no_trailing_whitespace}
]
@ -154,7 +145,7 @@
dirs => ["src"],
filter => "*.app.src",
rules => [
{elvis_text_style, line_length, #{limit => 100, skip_comments => false}},
{elvis_text_style, line_length, #{limit => 120, skip_comments => false}},
{elvis_text_style, no_tabs},
{elvis_text_style, no_trailing_whitespace}
]

View File

@ -1,24 +1,25 @@
-module(tk_audit_log).
-export([init/1]).
-export([stop/1]).
-export([child_spec/1]).
-behaviour(tk_pulse).
-export([handle_beat/3]).
-behaviour(gen_server).
-export([init/1]).
-export([handle_call/3]).
-export([handle_cast/2]).
-export([handle_info/2]).
-export([terminate/2]).
-define(HANDLER_ID, ?MODULE).
-define(DEFAULT_LOG_LEVEL, notice).
-define(DEFAULT_FLUSH_QLEN, 10000).
-define(LOG_DOMAIN, [audit]).
-define(DEFAULT_FORMATTER, {logger_formatter, #{single_line => true}}).
-type opts() :: #{
log => log_opts() | disabled
}.
% NOTE
% Keep in sync with `opts()`.
-define(OPTS, [log]).
-type log_opts() :: #{
% Which log level to use for audit events? Defaults to `notice`.
level => logger:level(),
backend => logger_backend_opts(),
@ -27,125 +28,83 @@
}.
% NOTE
% Keep in sync with `log_opts()`.
-define(LOG_OPTS, [level, backend, formatter]).
% Keep in sync with `opts()`.
-define(OPTS, [level, backend, formatter]).
-type logger_backend_opts() :: #{
% Where to log? Defaults to `standard_io`.
type => standard_io | standard_error | file,
% Log file location. No default, MUST be set if `type` is `file`.
file => file:filename(),
% How often to force fsync on log file, in ms? Defaults to 1000.
% http://erlang.org/doc/man/logger_std_h.html
filesync_repeat_interval => pos_integer() | no_repeat,
max_no_bytes => pos_integer() | infinity,
max_no_files => non_neg_integer(),
% Maximum number of events to queue for writing. Defaults to 10000.
% http://erlang.org/doc/apps/kernel/logger_chapter.html#message-queue-length
flush_qlen => non_neg_integer()
max_no_files => non_neg_integer()
}.
% NOTE
% Keep in sync with `logger_backend_opts()`.
-define(LOGGER_BACKEND_OPTS, [type, file, max_no_bytes, max_no_files, flush_qlen]).
-opaque st() :: {logger:level(), _Formatter :: {module(), logger:formatter_config()}}.
-export_type([opts/0]).
-export_type([st/0]).
%%
-type st() ::
{log, logger:level()}.
-spec init(opts()) -> tk_pulse:handlers(st()).
init(Opts) ->
-spec child_spec(opts()) -> {ok, supervisor:child_spec(), tk_pulse:handler(st())}.
child_spec(Opts) ->
_ = assert_strict_opts(?OPTS, Opts),
init_log_handler(maps:get(log, Opts, #{})).
init_log_handler(LogOpts = #{}) ->
_ = assert_strict_opts(?LOG_OPTS, LogOpts),
Level = validate_log_level(maps:get(level, LogOpts, ?DEFAULT_LOG_LEVEL)),
BackendConfig = mk_logger_backend_config(maps:get(backend, LogOpts, #{})),
HandlerConfig0 = maps:with([formatter], LogOpts),
HandlerConfig1 = HandlerConfig0#{
Level = validate_log_level(maps:get(level, Opts, ?DEFAULT_LOG_LEVEL)),
Formatter = maps:get(formatter, Opts, ?DEFAULT_FORMATTER),
BackendConfig = mk_logger_backend_config(maps:get(backend, Opts, #{})),
Config = #{
id => ?HANDLER_ID,
module => logger_std_h,
config => BackendConfig,
% NOTE
% These two options together ensure that _only_ audit logs will flow through to the backend.
filters => [{domain, {fun logger_filters:domain/2, {log, sub, ?LOG_DOMAIN}}}],
filter_default => stop
formatter => Formatter
},
ok = logger:add_handler(
?MODULE,
logger_std_h,
HandlerConfig1
),
% TODO
% Validate that global logger level doesn't suppress ours?
ok = log(Level, "audit log started", #{}),
[{?MODULE, {log, Level}}];
init_log_handler(disabled) ->
[].
ChildSpec = #{
id => ?MODULE,
start => {gen_server, start_link, [{local, ?HANDLER_ID}, ?MODULE, {Level, Config}, []]},
restart => permanent,
shutdown => 5000
},
Pulse = {?MODULE, {Level, Formatter}},
{ok, ChildSpec, Pulse}.
% Checks that `Level` is something `logger` can compare with itself and hands
% it back unchanged. Crashes when the comparison does not yield `eq`.
validate_log_level(Level) ->
    Ordering = logger:compare_levels(Level, Level),
    eq = Ordering,
    Level.
mk_logger_backend_config(BackendOpts) ->
_ = assert_strict_opts(?LOGGER_BACKEND_OPTS, BackendOpts),
Type = validate_log_type(maps:get(type, BackendOpts, standard_io)),
mk_logger_backend_config(Type, BackendOpts).
% NOTE
% Those are options only relevant for `logger_h_common` module, see its implementation for
% details.
Common = [filesync_repeat_interval],
CommonOpts = maps:with(Common, BackendOpts),
{ok, BackendConfig} = logger_std_h:check_config(
?HANDLER_ID,
set,
undefined,
tune_backend_config(maps:without(Common, BackendOpts))
),
maps:merge(
CommonOpts,
BackendConfig
).
% Accepts one of the supported backend types (`standard_io`, `standard_error`,
% `file`) and returns it as is; anything else raises `badarg`.
validate_log_type(Type) ->
    case lists:member(Type, [standard_io, standard_error, file]) of
        true ->
            Type;
        false ->
            erlang:error(badarg, [Type])
    end.
% Builds the backend config map for the given backend type.
%
% For `file` the result is the defaults overlaid with the rotation options
% (`max_no_bytes`, `max_no_files`) found in `Opts`, plus `type` and the
% mandatory `file` location. For any other type it is just the defaults with
% `type` set.
mk_logger_backend_config(file, Opts) ->
    Base = get_default_backend_config(file, Opts),
    % `file` has no default: a missing key crashes here.
    Path = maps:get(file, Opts),
    Rotation = maps:with([max_no_bytes, max_no_files], Opts),
    maps:merge(Base, Rotation#{type => file, file => Path});
mk_logger_backend_config(Type, Opts) ->
    maps:put(type, Type, get_default_backend_config(Type, Opts)).
get_default_backend_config(file, Opts) ->
tune_backend_config(Opts = #{type := file}) ->
_ = maps:get(file, Opts),
% NOTE
% All those options chosen to push message loss probability as close to zero as possible.
% Zero doesn't seem reachable with standard logger infrastructure because of various safeguards
% around unexpected backend and formatter errors.
Config = get_default_backend_config(Opts),
Config#{
% Protects against accidental write loss upon file rotation.
file_check => 0
};
get_default_backend_config(_Type, Opts) ->
get_default_backend_config(Opts).
get_default_backend_config(Opts) ->
FlushQLen = maps:get(flush_qlen, Opts, ?DEFAULT_FLUSH_QLEN),
#{
% No need to set it up here since we'll sync on EVERY write ourselves.
filesync_repeat_interval => no_repeat,
% http://erlang.org/doc/apps/kernel/logger_chapter.html#message-queue-length
sync_mode_qlen => 0,
drop_mode_qlen => FlushQLen,
flush_qlen => FlushQLen,
% http://erlang.org/doc/apps/kernel/logger_chapter.html#controlling-bursts-of-log-requests
burst_limit_enable => false,
% http://erlang.org/doc/apps/kernel/logger_chapter.html#terminating-an-overloaded-handler
overload_kill_enable => false
}.
maps:merge(
#{
% Protects against accidental write loss upon file rotation.
file_check => 0
},
Opts
);
tune_backend_config(Opts) ->
Opts.
assert_strict_opts(Ks, Opts) ->
case maps:without(Ks, Opts) of
@ -157,18 +116,65 @@ assert_strict_opts(Ks, Opts) ->
%%
-spec stop(opts()) -> ok.
stop(Opts = #{}) ->
stop_log_handler(maps:get(log, Opts, #{})).
% gen_server callback: starts the handler state via `logger_h_common:init/1`
% and, once that succeeds, synchronously writes the "audit log started" event.
%
% NOTE
% The `logger_h_common` facility, which drives log handler activities and
% periodic filesyncing, is hijacked here on purpose. The result is a thin
% wrapper that supports the logger handler API and can also make strictly
% synchronous writes, which the regular logger API does not really offer.
% Overload protection (provided through `logger_olp`) is deliberately given up.
-spec init({logger:level(), logger:handler_config()}) -> {ok, _State}.
init({Level, Config}) ->
    finish_init(logger_h_common:init(Config), Level, Config).

% Completes `init/1` given the outcome of `logger_h_common:init/1`.
finish_init({ok, Started}, Level, Config) ->
    Formatter = maps:get(formatter, Config),
    Greeting = log_to_binary(mk_log_event(Level, "audit log started", #{}), Formatter),
    {ok, Ready} = emit_log_sync(Greeting, Started),
    % Remembered for `terminate/2`; asserting `undefined` ensures nothing was stored before.
    undefined = erlang:put(?MODULE, {Level, Formatter}),
    {ok, Ready};
finish_init(Failure, _Level, _Config) ->
    erlang:exit(Failure).
% Logs the "audit log stopped" event at the configured level (or the default
% one) and removes the logger handler registered under this module's name.
% Does nothing when the audit log is `disabled`.
-spec stop_log_handler(log_opts()) -> ok.
stop_log_handler(disabled) ->
    ok;
stop_log_handler(#{} = LogOpts) ->
    StopLevel = maps:get(level, LogOpts, ?DEFAULT_LOG_LEVEL),
    ok = log(StopLevel, "audit log stopped", #{}),
    % The outcome of handler removal is intentionally not asserted.
    _Removed = logger:remove_handler(?MODULE),
    ok.
% gen_server callback. A call tagged with this module's name carries an already
% formatted binary which is written synchronously; the write result becomes the
% reply. Every other call is forwarded to `logger_h_common`.
-spec handle_call({?MODULE, binary()} | _Call, _From, State) -> {reply, ok | _Result, State}.
handle_call({?MODULE, Payload}, _From, Before) ->
    {WriteResult, After} = emit_log_sync(Payload, Before),
    {reply, WriteResult, After};
handle_call(Other, Caller, HandlerState) ->
    logger_h_common:handle_call(Other, Caller, HandlerState).
% gen_server callback: every cast is forwarded untouched to `logger_h_common`.
-spec handle_cast(_Cast, State) -> {noreply, State}.
handle_cast(Request, HandlerState) ->
    logger_h_common:handle_cast(Request, HandlerState).
% gen_server callback: every raw message is forwarded untouched to `logger_h_common`.
-spec handle_info(_Info, State) -> {noreply, State}.
handle_info(Message, HandlerState) ->
    logger_h_common:handle_info(Message, HandlerState).
% gen_server callback: synchronously writes the "audit log stopped" event using
% the level and formatter kept in the process dictionary, then lets
% `logger_h_common` finish the termination.
-spec terminate(_Reason, _State) -> _.
terminate(Reason, Current) ->
    {Level, Formatter} = erlang:get(?MODULE),
    Farewell = log_to_binary(mk_log_event(Level, "audit log stopped", #{}), Formatter),
    % The write result is ignored here: we are going down either way.
    {_WriteResult, Final} = emit_log_sync(Farewell, Current),
    logger_h_common:terminate(Reason, Final).
% Writes `Bin` through the handler module's `write/4` in `sync` mode and
% returns `{Result, State}` where the state carries the updated handler state
% and has `last_op` set to `write`.
%
% NOTE
% This relies on IMPLEMENTATION DETAILS which are SUBJECT TO CHANGE!
% The code was adapted from the `logger_h_common:handle_load/2` function found
% in the kernel-7.2 app shipped with the Erlang OTP 23.2.3 release.
% Take care to revisit it when upgrading to newer Erlang OTP releases.
emit_log_sync(
    Bin,
    #{
        id := HandlerName,
        module := Backend,
        handler_state := BackendState
    } = State
) ->
    {Result, NextBackendState} = Backend:write(HandlerName, sync, Bin, BackendState),
    NextState = State#{
        handler_state => NextBackendState,
        last_op => write
    },
    {Result, NextState}.
%%
@ -176,27 +182,59 @@ stop_log_handler(disabled) ->
-type metadata() :: tk_pulse:metadata().
% Pulse handler: logs a beat with the severity, message and metadata derived
% from it. The handler state carries the configured log level.
-spec handle_beat(beat(), metadata(), st()) -> ok.
handle_beat(Beat, Metadata, {log, Level}) ->
    Severity = get_severity(Beat, Level),
    Message = get_message(Beat),
    BeatMetadata = get_beat_metadata(Beat),
    log(Severity, Message, extract_metadata(Metadata, BeatMetadata)).
% Pulse handler: resolves the level for the beat first and logs it only when a
% level comes back; `undefined` means the beat is silently skipped.
handle_beat(Beat, Metadata, {DefaultLevel, Formatter}) ->
    case get_level(Beat, DefaultLevel) of
        undefined ->
            ok;
        Level ->
            Message = get_message(Beat),
            BeatMetadata = get_beat_metadata(Beat),
            log(Level, Message, extract_metadata(Metadata, BeatMetadata), Formatter)
    end.
% Emits an audit event through `logger` and forces a filesync on the audit
% handler. The `type` and `domain` metadata keys are always overridden with the
% audit ones.
log(Severity, Message, Metadata) ->
    AuditMetadata = Metadata#{
        type => audit,
        domain => ?LOG_DOMAIN
    },
    % NOTE
    % Asserting `ok` is crucial here: logger may decide to flush the queue behind the scenes,
    % and we have to make sure that is not what happened.
    ok = logger:log(Severity, Message, AuditMetadata),
    ok = logger_std_h:filesync(?MODULE).
% Formats the event in the calling process and hands the resulting binary to
% the audit log server with a synchronous call, asserting that it replied `ok`.
log(Level, Message, Metadata, Formatter) ->
    Payload = log_to_binary(mk_log_event(Level, Message, Metadata), Formatter),
    ok = gen_server:call(?HANDLER_ID, {?MODULE, Payload}).
% Picks the severity for a beat: `get_by_token` start events are always
% `debug`, everything else uses the supplied level.
get_severity({get_by_token, started}, _ConfiguredLevel) ->
    debug;
get_severity(_Beat, ConfiguredLevel) ->
    ConfiguredLevel.
% Runs the log event through the formatter module's `format/2` and converts
% the produced text into a binary.
log_to_binary(Log, {FormatterModule, FormatterConfig}) ->
    Formatted = FormatterModule:format(Log, FormatterConfig),
    string_to_binary(Formatted).
% Converts character data to a binary via `unicode:characters_to_binary/1`.
% Whenever the conversion yields anything but a binary, that value is raised
% as an error.
string_to_binary(String) ->
    Converted = unicode:characters_to_binary(String),
    case is_binary(Converted) of
        true ->
            Converted;
        false ->
            erlang:error(Converted)
    end.
% Assembles a log event map with `level`, `msg` and `meta` keys. The message is
% wrapped as a format string with no arguments; the metadata is enriched with
% the defaults first.
mk_log_event(Level, Message, Metadata) ->
    EventMeta = add_default_metadata(Metadata),
    #{
        level => Level,
        msg => {Message, []},
        meta => EventMeta
    }.
% Tags the metadata with `type => audit`, layers it over the process metadata
% (explicit keys win) and passes the result through `logger:add_default_metadata/1`.
add_default_metadata(Meta) ->
    Tagged = maps:put(type, audit, Meta),
    WithProcessMeta = maps:merge(get_process_metadata(), Tagged),
    logger:add_default_metadata(WithProcessMeta).
% Returns the logger process metadata, passed through `genlib:define/2` with an
% empty map as the fallback value.
get_process_metadata() ->
    ProcessMeta = logger:get_process_metadata(),
    genlib:define(ProcessMeta, #{}).
% Returns `Level` when `logger:allow/2` permits it for this module and
% `undefined` otherwise.
log_allowed(Level) ->
    Permitted = logger:allow(Level, ?MODULE),
    case Permitted of
        false -> undefined;
        true -> Level
    end.
%%
% Resolves the level for a beat: `get_by_token` start events are logged at
% `debug` and only when that level is currently allowed (otherwise the result
% is `undefined`); everything else uses the default level.
get_level({get_by_token, started}, _DefaultLevel) ->
    log_allowed(debug);
get_level(_Beat, DefaultLevel) ->
    DefaultLevel.
get_message({get_by_token, started}) -> <<"get_by_token started">>;
get_message({get_by_token, succeeded}) -> <<"get_by_token succeeded">>;

View File

@ -4,7 +4,6 @@
-behaviour(application).
-export([start/2]).
-export([prep_stop/1]).
-export([stop/1]).
%% Supervisor callbacks
@ -37,14 +36,6 @@
% Application start callback: ignores both arguments and starts the top-level
% `token_keeper` process via `start_link/0`.
start(_Type, _Args) ->
    token_keeper:start_link().
% Application `prep_stop` callback: stops the audit log using the `audit`
% application environment and returns the state untouched.
-spec prep_stop(State) -> State.
prep_stop(State) ->
    % NOTE
    % This has to live in the magic `prep_stop/1` callback because, for some inexplicable
    % reason, the usual `stop/1` callback doesn't get called in common_test runs.
    AuditOpts = genlib_app:env(?MODULE, audit, #{}),
    ok = tk_audit_log:stop(AuditOpts),
    State.
% Application stop callback: nothing to clean up, always returns `ok`.
-spec stop(any()) -> ok.
stop(_AppState) ->
    ok.
@ -59,7 +50,7 @@ start_link() ->
-spec init(Args :: term()) -> genlib_gen:supervisor_ret().
init([]) ->
AuditPulse = tk_audit_log:init(genlib_app:env(?MODULE, audit, #{})),
{AuditChildSpecs, AuditPulse} = get_audit_specs(),
ServiceOpts = genlib_app:env(?MODULE, services, #{}),
EventHandlers = genlib_app:env(?MODULE, woody_event_handlers, [woody_event_handler_default]),
Healthcheck = enable_health_logging(genlib_app:env(?MODULE, health_check, #{})),
@ -81,7 +72,7 @@ init([]) ->
{ok,
{
#{strategy => one_for_all, intensity => 6, period => 30},
[HandlerChildSpec, TokensChildSpec]
[HandlerChildSpec, TokensChildSpec | AuditChildSpecs]
}}.
-spec get_ip_address() -> inet:ip_address().
@ -123,6 +114,17 @@ get_handler_specs(ServiceOpts, AuditPulse) ->
}
].
% Derives the audit log child specs and pulse handlers from the `audit`
% application environment.
%
% When the `log` option is a map (or absent, which defaults to an empty map),
% a single child spec and a single pulse handler are produced by
% `tk_audit_log:child_spec/1`. When the audit log is switched off, both lists
% are empty.
%
% Both `disabled` and `disable` are accepted as the "off" marker: `disabled` is
% the spelling used by the audit options type and the earlier handler code, so
% configs written against it must keep working instead of crashing with
% `case_clause`; `disable` is kept for backward compatibility.
-spec get_audit_specs() -> {[supervisor:child_spec()], tk_pulse:handlers()}.
get_audit_specs() ->
    Opts = genlib_app:env(?MODULE, audit, #{}),
    case maps:get(log, Opts, #{}) of
        LogOpts = #{} ->
            {ok, ChildSpec, Pulse} = tk_audit_log:child_spec(LogOpts),
            {[ChildSpec], [Pulse]};
        Off when Off =:= disabled; Off =:= disable ->
            {[], []}
    end.
%%
-spec enable_health_logging(erl_health:check()) -> erl_health:check().