DC-116: optimize memory consumption (#26)

This commit is contained in:
Sergei 2019-04-25 12:43:46 +03:00 committed by GitHub
parent 97fdfe552c
commit 635dc0fb19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 364 additions and 88 deletions

2
.gitignore vendored
View File

@ -1,6 +1,8 @@
# general
log
/_build/
/.rebar3/
/ebin/
*~
erl_crash.dump
.tags*

View File

@ -10,6 +10,7 @@
elements => 20,
memory => 52428800 % 50Mb
}},
{cache_server_call_timeout, 10000},
{service_urls, #{
'Repository' => <<"dominant:8022/v1/domain/repository">>,
'RepositoryClient' => <<"dominant:8022/v1/domain/repository_client">>

View File

@ -8,7 +8,7 @@
0},
{<<"dmt_core">>,
{git,"git@github.com:rbkmoney/dmt_core.git",
{ref,"045c78132ecce5a8ec4a2e6ccd2c6b0b65bade1f"}},
{ref,"357066d8be36ce1032d2d6c0d4cb31eb50730335"}},
0},
{<<"genlib">>,
{git,"https://github.com/rbkmoney/genlib.git",

View File

@ -12,6 +12,7 @@
-export([checkout_object/3]).
-export([commit/2]).
-export([commit/3]).
-export([get_last_version/0]).
-export([pull_range/2]).
-export([pull_range/3]).
@ -28,6 +29,8 @@
-export_type([snapshot/0]).
-export_type([commit/0]).
-export_type([object_ref/0]).
-export_type([domain_object/0]).
-export_type([domain/0]).
-export_type([history/0]).
-export_type([transport_opts/0]).
@ -39,10 +42,12 @@
-type snapshot() :: dmsl_domain_config_thrift:'Snapshot'().
-type commit() :: dmsl_domain_config_thrift:'Commit'().
-type object_ref() :: dmsl_domain_thrift:'Reference'().
-type domain_object() :: dmsl_domain_thrift:'DomainObject'().
-type domain() :: dmsl_domain_thrift:'Domain'().
-type history() :: dmsl_domain_config_thrift:'History'().
-type transport_opts() :: woody_client_thrift_http_transport:options() | undefined.
%% API
%%% API
-spec checkout(ref()) ->
snapshot() | no_return().
@ -54,17 +59,12 @@ checkout(Reference) ->
snapshot() | no_return().
checkout(Reference, Opts) ->
CacheResult = case Reference of
{head, #'Head'{}} ->
dmt_client_cache:get_latest();
{version, Version} ->
dmt_client_cache:get(Version)
end,
case CacheResult of
Version = ref_to_version(Reference),
case dmt_client_cache:get(Version, Opts) of
{ok, Snapshot} ->
Snapshot;
{error, version_not_found} ->
dmt_client_cache:put(dmt_client_api:checkout(Reference, Opts))
{error, Error} ->
erlang:error(Error)
end.
-spec checkout_object(ref(), object_ref()) ->
@ -77,12 +77,14 @@ checkout_object(Reference, ObjectReference) ->
dmsl_domain_config_thrift:'VersionedObject'() | no_return().
checkout_object(Reference, ObjectReference, Opts) ->
#'Snapshot'{version = Version, domain = Domain} = checkout(Reference, Opts),
case dmt_domain:get_object(ObjectReference, Domain) of
Version = ref_to_version(Reference),
case dmt_client_cache:get_object(Version, ObjectReference, Opts) of
{ok, Object} ->
#'VersionedObject'{version = Version, object = Object};
error ->
throw(#'ObjectNotFound'{})
{error, {woody_error, _} = Error} ->
erlang:error(Error);
{error, _} ->
erlang:throw(#'ObjectNotFound'{})
end.
-spec commit(version(), commit()) ->
@ -97,6 +99,12 @@ commit(Version, Commit) ->
commit(Version, Commit, Opts) ->
dmt_client_api:commit(Version, Commit, Opts).
%% @doc Returns the most recent domain version by delegating to
%% `dmt_client_cache:get_last_version/0'.
-spec get_last_version() ->
version().
get_last_version() ->
dmt_client_cache:get_last_version().
-spec pull_range(version(), limit()) ->
history() | no_return().
@ -109,7 +117,7 @@ pull_range(Version, Limit) ->
pull_range(Version, Limit, Opts) ->
dmt_client_api:pull_range(Version, Limit, Opts).
%% Supervisor callbacks
%%% Supervisor callbacks
-spec init([]) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
@ -117,7 +125,7 @@ init([]) ->
Cache = #{id => dmt_client_cache, start => {dmt_client_cache, start_link, []}, restart => permanent},
{ok, {#{strategy => one_for_one, intensity => 10, period => 60}, [Cache]}}.
%% Application callbacks
%%% Application callbacks
-spec start(normal, any()) -> {ok, pid()} | {error, any()}.
@ -128,3 +136,13 @@ start(_StartType, _StartArgs) ->
stop(_State) ->
ok.
%%% Internal functions
%% Resolves a checkout reference to a concrete version number.
%% A `head' reference is resolved by asking the cache for its last known
%% version; an explicit `{version, V}' reference is returned as is.
-spec ref_to_version(ref()) ->
    version().

ref_to_version({head, #'Head'{}}) ->
    dmt_client_cache:get_last_version();
ref_to_version({version, Vsn}) ->
    Vsn.

View File

@ -1,16 +1,18 @@
-module(dmt_client_cache).
-behaviour(gen_server).
%%
%% API
-export([start_link/0]).
-export([put/1]).
-export([get/1]).
-export([get_latest/0]).
-export([get/2]).
-export([get_last_version/0]).
-export([get_object/2]).
-export([get_object/3]).
-export([update/0]).
%%
%% gen_server callbacks
-export([init/1]).
-export([handle_call/3]).
@ -23,44 +25,109 @@
-define(SERVER, ?MODULE).
-define(DEFAULT_INTERVAL, 5000).
-define(DEFAULT_LIMIT, 10).
-define(DEFAULT_CALL_TIMEOUT, 10000).
-include_lib("dmsl/include/dmsl_domain_config_thrift.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
%%
%% ETS options for the meta table (?TABLE): a named ordered set keyed by
%% snapshot version (#snap.vsn). Ordering by version is what lets the
%% ets:first/1 and ets:last/1 calls elsewhere in this module find the oldest
%% and the newest cached version.
-define(meta_table_opts, [
named_table,
ordered_set,
protected,
{read_concurrency, true},
{write_concurrency, false},
{keypos, #snap.vsn}
]).
-spec start_link() -> {ok, pid()} | {error, term()}. % FIXME
%% ETS options for a per-snapshot table: an unnamed set keyed by object
%% reference (#object.ref). One such table is created for every cached
%% snapshot in put_snapshot/1 and is addressed through its table id.
-define(snapshot_table_opts, [
set,
protected,
{read_concurrency, true},
{write_concurrency, false},
{keypos, #object.ref}
]).
%% Entry of the meta table: maps a cached snapshot version to the id of the
%% ETS table that stores the objects of that snapshot.
-record(snap, {
vsn :: dmt_client:version(),
tid :: ets:tid()
}).
-type snap() :: #snap{}.
%% Row of a per-snapshot table: a single domain object stored under its
%% reference.
-record(object, {
ref :: dmt_client:object_ref(),
obj :: dmt_client:domain_object()
}).
%% Shape of the woody system errors that this module converts into
%% `{error, _}' results.
-type woody_error() :: {woody_error, woody_error:system_error()}.
%%% API
%% @doc Starts the cache server and registers it locally under ?SERVER.
-spec start_link() ->
{ok, pid()} | {error, {already_started, pid()}}.
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-spec put(dmt_client:snapshot()) -> dmt_client:snapshot().
put(Snapshot) ->
ok = gen_server:cast(?SERVER, {put, Snapshot}),
Snapshot.
-spec get(dmt_client:version()) -> {ok, dmt_client:snapshot()} | {error, version_not_found}.
-spec get(dmt_client:version()) ->
{ok, dmt_client:snapshot()} | {error, version_not_found | woody_error()}.
get(Version) ->
get_snapshot(Version).
get(Version, undefined).
-spec get_latest() -> {ok, dmt_client:snapshot()} | {error, version_not_found}.
-spec get(dmt_client:version(), dmt_client:transport_opts()) ->
{ok, dmt_client:snapshot()} | {error, version_not_found | woody_error()}.
get_latest() ->
case latest_snapshot() of
{ok, Snapshot} ->
{ok, Snapshot};
get(Version, Opts) ->
case get_snapshot(Version) of
{ok, _Snapshot} = Result ->
Result;
{error, version_not_found} ->
gen_server:call(?SERVER, get_latest)
call({get_snapshot, Version, Opts})
end.
-spec update() -> {ok, dmt_client:version()} | {error, term()}.
%% @doc Same as get_object/3 with the transport options set to `undefined'.
-spec get_object(dmt_client:version(), dmt_client:object_ref()) ->
{ok, dmt_client:domain_object()} | {error, version_not_found | object_not_found | woody_error()}.
get_object(Version, ObjectRef) ->
get_object(Version, ObjectRef, undefined).
%% @doc Fetches a single object of the given version.
%% The lookup is first attempted directly against the ETS tables in the
%% calling process. Only when the version is not cached is the request
%% forwarded to the cache server, together with the transport options.
%% An `object_not_found' answer from the local lookup is returned as is.
-spec get_object(dmt_client:version(), dmt_client:object_ref(), dmt_client:transport_opts()) ->
    {ok, dmt_client:domain_object()} | {error, version_not_found | object_not_found | woody_error()}.

get_object(Version, ObjectRef, Opts) ->
    Lookup = do_get_object(Version, ObjectRef),
    case Lookup of
        {error, version_not_found} ->
            % the version is not cached yet: let the server obtain it
            call({get_object, Version, ObjectRef, Opts});
        {error, object_not_found} ->
            Lookup;
        {ok, _Object} ->
            Lookup
    end.
%% @doc Returns the highest version known to the cache.
%% When the meta table is empty, a cache update is requested from the server
%% and the version it reports is returned. If that update fails, the error
%% reason is raised with `erlang:error/1'.
-spec get_last_version() ->
    dmt_client:version() | no_return().

get_last_version() ->
    case do_get_last_version() of
        {ok, Cached} ->
            Cached;
        {error, version_not_found} ->
            % nothing cached yet: force an update through the server
            case update() of
                {ok, Fetched} ->
                    Fetched;
                {error, Reason} ->
                    erlang:error(Reason)
            end
    end.
-spec update() ->
{ok, dmt_client:version()} | {error, woody_error()}.
update() ->
gen_server:call(?SERVER, update).
call(update).
%%
%%% gen_server callbacks
-record(state, {
timer = undefined :: undefined | reference()
@ -68,40 +135,38 @@ update() ->
-type state() :: #state{}.
-spec init(_) -> {ok, state(), 0}.
-spec init(_) ->
{ok, state(), 0}.
init(_) ->
EtsOpts = [
named_table,
ordered_set,
protected,
{read_concurrency, true},
{keypos, #'Snapshot'.version}
],
?TABLE = ets:new(?TABLE, EtsOpts),
?TABLE = ets:new(?TABLE, ?meta_table_opts),
{ok, #state{}, 0}.
-spec handle_call(term(), {pid(), term()}, state()) -> {reply, term(), state()}.
-spec handle_call(term(), {pid(), term()}, state()) ->
{reply, term(), state()}.
handle_call({get_object, Version, ObjectRef, Opts}, _From, State) ->
Result = get_object_internal(Version, ObjectRef, Opts),
{reply, Result, State};
handle_call(update, _From, State) ->
{reply, update_cache(), restart_timer(State)};
handle_call(get_latest, _From, State) ->
{reply, latest_snapshot(), State};
handle_call({get_snapshot, Version, Opts}, _From, State) ->
Result = get_snapshot_internal(Version, Opts),
{reply, Result, State};
handle_call(_Msg, _From, State) ->
{noreply, State}.
-spec handle_cast(term(), state()) -> {noreply, state()}.
handle_cast({put, Snapshot}, State) ->
ok = put_snapshot(Snapshot),
{noreply, State};
%% Ignores the cast message and leaves the server state unchanged.
-spec handle_cast(term(), state()) ->
{noreply, state()}.
handle_cast(_Msg, State) ->
{noreply, State}.
-spec handle_info(term(), state()) -> {noreply, state()}.
-spec handle_info(term(), state()) ->
{noreply, state()}.
handle_info(timeout, State) ->
_Result = update_cache(),
@ -110,43 +175,208 @@ handle_info(timeout, State) ->
handle_info(_Msg, State) ->
{noreply, State}.
-spec terminate(term(), state()) -> ok.
%% gen_server termination callback: performs no cleanup and returns `ok'.
-spec terminate(term(), state()) ->
ok.
terminate(_Reason, _State) ->
ok.
-spec code_change(term(), state(), term()) -> {error, noimpl}.
code_change(_OldVsn, _State, _Extra) ->
{error, noimpl}.
-spec code_change(term(), state(), term()) ->
{ok, state()}.
%% internal
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-spec put_snapshot(dmt_client:snapshot()) -> ok.
%%% Internal functions
put_snapshot(Snapshot) ->
true = ets:insert(?TABLE, Snapshot),
cleanup().
%% Calls the cache server with the configured timeout: the value of the
%% `cache_server_call_timeout' environment key of the `dmt_client'
%% application, or ?DEFAULT_CALL_TIMEOUT when the key is not set.
-spec call(term()) ->
    term().

call(Msg) ->
    call(Msg, application:get_env(dmt_client, cache_server_call_timeout, ?DEFAULT_CALL_TIMEOUT)).
%% Performs a synchronous call to the cache server and returns its reply.
%% A gen_server call timeout is translated into a woody system error
%% (`resource_unavailable'); any other exit is left to propagate.
-spec call(term(), timeout()) ->
    term().

call(Msg, Timeout) ->
    try gen_server:call(?SERVER, Msg, Timeout) of
        Reply ->
            Reply
    catch
        exit:{timeout, {gen_server, call, _Args}} ->
            woody_error:raise(system, {external, resource_unavailable, <<"dmt_client_cache timeout">>})
    end.
%% Stores a snapshot in the cache unless its version is already present.
%% For a new version, a dedicated ETS table is created and filled with the
%% objects of the domain, the table is registered in the meta table under
%% the snapshot version, and cleanup/0 is run.
-spec put_snapshot(dmt_client:snapshot()) ->
    ok.

put_snapshot(#'Snapshot'{version = Version, domain = Domain}) ->
    case get_snap(Version) of
        {error, version_not_found} ->
            Table = ets:new(?MODULE, ?snapshot_table_opts),
            true = put_domain_to_table(Table, Domain),
            true = ets:insert(?TABLE, #snap{vsn = Version, tid = Table}),
            cleanup();
        {ok, _Existing} ->
            % this version is already cached, nothing to do
            ok
    end.
%% Inserts every object of the domain into the given ETS table as an
%% #object{} row keyed by the object reference. Always yields `true'.
-spec put_domain_to_table(ets:tid(), dmt_client:domain()) ->
    true.

put_domain_to_table(TID, Domain) ->
    % the fold accumulator is not used, it only carries `true' through
    InsertObject = fun(Ref, Object, _Acc) ->
        true = ets:insert(TID, #object{ref = Ref, obj = Object})
    end,
    dmt_domain:fold(InsertObject, true, Domain).
%% Reads the snapshot of the given version from the cache tables.
%% Returns `{error, version_not_found}' when the meta table has no entry
%% for the version.
-spec get_snapshot(dmt_client:version()) ->
    {ok, dmt_client:snapshot()} | {error, version_not_found}.

get_snapshot(Version) ->
    case get_snap(Version) of
        {error, version_not_found} = NotFound ->
            NotFound;
        {ok, Snap} ->
            do_get_snapshot(Snap)
    end.
-spec get_snap(dmt_client:version()) ->
{ok, snap()} | {error, version_not_found}.
get_snap(Version) ->
case ets:lookup(?TABLE, Version) of
[Snapshot] ->
{ok, Snapshot};
[Snap] ->
{ok, Snap};
[] ->
{error, version_not_found}
end.
-spec latest_snapshot() -> {ok, dmt_client:snapshot()} | {error, version_not_found}.
%% Looks an object up in the cache tables only, without contacting the
%% server: first the meta entry of the version, then the object itself in
%% the table of that snapshot.
-spec do_get_object(dmt_client:version(), dmt_client:object_ref()) ->
    {ok, dmt_client:domain_object()} | {error, version_not_found | object_not_found}.

do_get_object(Version, ObjectRef) ->
    case get_snap(Version) of
        {error, version_not_found} = NotFound ->
            NotFound;
        {ok, Snap} ->
            get_object_by_snap(Snap, ObjectRef)
    end.
%% Fetches an object from the ETS table referenced by a meta entry.
%% A `badarg' raised by the lookup is reported as `version_not_found',
%% since it means the snapshot table no longer exists.
-spec get_object_by_snap(snap(), dmt_client:object_ref()) ->
    {ok, dmt_client:domain_object()} | {error, version_not_found | object_not_found}.

get_object_by_snap(#snap{tid = Table}, ObjectRef) ->
    try ets:lookup(Table, ObjectRef) of
        [] ->
            {error, object_not_found};
        [#object{obj = Found}] ->
            {ok, Found}
    catch
        error:badarg ->
            % the table has been deleted in the meantime
            {error, version_not_found}
    end.
%% Server-side object lookup, used by the `get_object' call handler.
%% The cache is consulted again first. When the version is still missing,
%% the snapshot is checked out through dmt_client_api, stored in the cache,
%% and the object is taken from the freshly fetched snapshot.
%% A thrown `VersionNotFound' and woody errors are converted into
%% `{error, _}' results.
-spec get_object_internal(dmt_client:version(), dmt_client:object_ref(), dmt_client:transport_opts()) ->
    {ok, dmt_client:domain_object()} | {error, version_not_found | object_not_found | woody_error()}.

get_object_internal(Version, ObjectRef, Opts) ->
    try
        case do_get_object(Version, ObjectRef) of
            {error, version_not_found} ->
                Fetched = dmt_client_api:checkout({version, Version}, Opts),
                ok = put_snapshot(Fetched),
                % read from the fetched snapshot itself, not from the cache
                get_object_from_snapshot(ObjectRef, Fetched);
            {error, object_not_found} = Missing ->
                Missing;
            {ok, _Object} = Hit ->
                Hit
        end
    catch
        throw:#'VersionNotFound'{} ->
            {error, version_not_found};
        error:{woody_error, {_Source, _Class, _Details}} = WoodyError ->
            {error, WoodyError}
    end.
%% Extracts an object from the domain of an in-memory snapshot, mapping the
%% bare `error' returned by dmt_domain:get_object/2 to
%% `{error, object_not_found}'.
-spec get_object_from_snapshot(dmt_client:object_ref(), dmt_client:snapshot()) ->
    {ok, dmt_client:domain_object()} | {error, object_not_found}.

get_object_from_snapshot(ObjectRef, #'Snapshot'{domain = Domain}) ->
    case dmt_domain:get_object(ObjectRef, Domain) of
        error ->
            {error, object_not_found};
        {ok, _Object} = Found ->
            Found
    end.
%% Server-side snapshot lookup, used by the `get_snapshot' call handler.
%% The cache is consulted again first. When the version is still missing,
%% the snapshot is checked out through dmt_client_api, stored in the cache
%% and returned.
%% A thrown `VersionNotFound' and woody errors are converted into
%% `{error, _}' results.
%% The success value is a whole snapshot, not a single domain object.
-spec get_snapshot_internal(dmt_client:version(), dmt_client:transport_opts()) ->
    {ok, dmt_client:snapshot()} | {error, version_not_found | woody_error()}.

get_snapshot_internal(Version, Opts) ->
    try
        case get_snapshot(Version) of
            {ok, _Snapshot} = Result ->
                Result;
            {error, version_not_found} ->
                Snapshot = dmt_client_api:checkout({version, Version}, Opts),
                ok = put_snapshot(Snapshot),
                {ok, Snapshot}
        end
    catch
        throw:#'VersionNotFound'{} ->
            {error, version_not_found};
        error:{woody_error, {_Source, _Class, _Details}} = Error ->
            {error, Error}
    end.
%% Rebuilds a full snapshot from a per-snapshot ETS table by folding all of
%% its objects into a new domain.
%% A `badarg' raised while folding is reported as `version_not_found',
%% since it means the table no longer exists.
-spec do_get_snapshot(snap()) ->
    {ok, dmt_client:snapshot()} | {error, version_not_found}.

do_get_snapshot(#snap{vsn = Version, tid = Table}) ->
    AddObject = fun(#object{obj = Object}, Acc) ->
        {ok, Extended} = dmt_domain:insert(Object, Acc),
        Extended
    end,
    try
        Rebuilt = ets:foldl(AddObject, dmt_domain:new(), Table),
        {ok, #'Snapshot'{version = Version, domain = Rebuilt}}
    catch
        error:badarg ->
            % table was deleted due to cleanup process or crash
            {error, version_not_found}
    end.
%% Returns the snapshot of the highest cached version, or
%% `{error, version_not_found}' when nothing is cached.
-spec latest_snapshot() ->
    {ok, dmt_client:snapshot()} | {error, version_not_found}.

latest_snapshot() ->
    case do_get_last_version() of
        {error, version_not_found} = NotFound ->
            NotFound;
        {ok, Latest} ->
            get_snapshot(Latest)
    end.
-spec do_get_last_version() ->
{ok, dmt_client:version()} | {error, version_not_found}.
do_get_last_version() ->
case ets:last(?TABLE) of
'$end_of_table' ->
{error, version_not_found};
Version ->
get_snapshot(Version)
{ok, Version}
end.
-spec restart_timer(state()) -> state().
-spec restart_timer(state()) ->
state().
restart_timer(State = #state{timer = undefined}) ->
start_timer(State);
@ -155,13 +385,15 @@ restart_timer(State = #state{timer = TimerRef}) ->
_ = erlang:cancel_timer(TimerRef),
start_timer(State#state{timer = undefined}).
-spec start_timer(state()) -> state().
%% Schedules the next `timeout' message to this process and remembers the
%% timer reference in the state. The delay is taken from the
%% `cache_update_interval' environment key of `dmt_client'
%% (?DEFAULT_INTERVAL when it is not set).
%% Only accepts a state that has no timer running.
-spec start_timer(state()) ->
    state().

start_timer(State = #state{timer = undefined}) ->
    Interval = genlib_app:env(dmt_client, cache_update_interval, ?DEFAULT_INTERVAL),
    TimerRef = erlang:send_after(Interval, self(), timeout),
    State#state{timer = TimerRef}.
-spec update_cache() -> {ok, dmt_client:version()} | {error, term()}.
-spec update_cache() ->
{ok, dmt_client:version()} | {error, term()}.
update_cache() ->
try
@ -177,21 +409,19 @@ update_cache() ->
ok = put_snapshot(NewHead),
{ok, NewHead#'Snapshot'.version}
catch
error:{woody_error, {_Source, Class, _Details}} = Error when
Class == resource_unavailable;
Class == result_unknown
->
error:{woody_error, {_Source, _Class, _Details}} = Error ->
{error, Error}
end.
-spec cleanup() -> ok.
-spec cleanup() ->
ok.
cleanup() ->
{Elements, Memory} = get_cache_size(),
CacheLimits = genlib_app:env(dmt_client, max_cache_size),
MaxElements = genlib_map:get(elements, CacheLimits, 20),
MaxMemory = genlib_map:get(memory, CacheLimits, 52428800), % 50Mb by default
case Elements > MaxElements orelse Memory > MaxMemory of
case Elements > MaxElements orelse (Elements > 1 andalso Memory > MaxMemory) of
true ->
ok = remove_earliest(),
cleanup();
@ -199,23 +429,48 @@ cleanup() ->
ok
end.
-spec get_cache_size() -> {non_neg_integer(), non_neg_integer()}.
-spec get_cache_size() ->
{non_neg_integer(), non_neg_integer()}.
get_cache_size() ->
WordSize = erlang:system_info(wordsize),
Info = ets:info(?TABLE),
{proplists:get_value(size, Info), WordSize * proplists:get_value(memory, Info)}.
Info = ets:info(?TABLE),
Words = get_snapshot_tables_size(),
{proplists:get_value(size, Info), WordSize * Words}.
-spec remove_earliest() -> ok.
%% Sums the memory, in words, of all per-snapshot tables registered in the
%% meta table.
-spec get_snapshot_tables_size() ->
    non_neg_integer().

get_snapshot_tables_size() ->
    AddTableSize = fun(#snap{tid = Table}, Total) ->
        Total + ets_memory(Table)
    end,
    ets:foldl(AddTableSize, 0, ?TABLE).
%% Returns the number of words allocated to the given ETS table.
%% Only the `memory' item is requested, so the full info list of the table
%% is not built on every call.
-spec ets_memory(ets:tid()) ->
    non_neg_integer().

ets_memory(TID) ->
    Words = ets:info(TID, memory),
    % ets:info/2 yields `undefined' for a missing table; fail right here
    % instead of handing a non-integer to the caller
    true = is_integer(Words),
    Words.
%% Evicts the entry with the smallest key of the meta table, i.e. the lowest
%% cached version (the meta table is an ordered set keyed by version).
%% An empty table is handled by the '$end_of_table' clause of
%% remove_earliest/1.
-spec remove_earliest() ->
ok.
remove_earliest() ->
% Naive implementation, but probably good enough
remove_earliest(ets:first(?TABLE)).
-spec remove_earliest('$end_of_table' | dmt_client:version()) -> ok.
-spec remove_earliest('$end_of_table' | dmt_client:version()) ->
ok.
remove_earliest('$end_of_table') ->
ok;
remove_earliest(Key) ->
true = ets:delete(?TABLE, Key),
remove_earliest(Version) ->
{ok, #snap{tid = TID}} = get_snap(Version),
true = ets:delete(?TABLE, Version),
true = ets:delete(TID),
ok.