feat: Switch to ets-based lock version

This commit is contained in:
Yaroslav Rogov 2021-06-22 15:38:10 +03:00
parent d5242d7a2f
commit 8a224b81f8
No known key found for this signature in database
GPG Key ID: 5159F2A85653816B

View File

@ -30,7 +30,6 @@
-define(DEFAULT_CALL_TIMEOUT, 10000).
-include_lib("damsel/include/dmsl_domain_config_thrift.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-define(meta_table_opts, [
named_table,
@ -49,6 +48,17 @@
{keypos, #object.ref}
]).
-define(USERS_TABLE, dmt_client_cache_users).
-define(users_table_opts, [
named_table,
bag,
public,
{read_concurrency, false},
{write_concurrency, true},
{keypos, #user.vsn}
]).
-type timestamp() :: integer().
-record(snap, {
@ -64,6 +74,12 @@
obj :: dmt_client:domain_object()
}).
-record(user, {
vsn :: dmt_client:vsn(),
%% request_time :: timestamp(),
pid :: pid()
}).
-type woody_error() :: {woody_error, woody_error:system_error()}.
-type from() :: {pid(), term()}.
@ -73,15 +89,10 @@
dmt_client:ref() => [{from() | undefined, dispatch_fun()}]
}.
-type users() :: #{
dmt_client:vsn() => [pid()]
}.
-record(state, {
update_timer = undefined :: undefined | reference(),
cleanup_timer = undefined :: undefined | reference(),
waiters = #{} :: waiters(),
users = #{} :: users()
waiters = #{} :: waiters()
}).
-type state() :: #state{}.
@ -170,30 +181,11 @@ handle_call(_Msg, _From, State) ->
{noreply, State}.
-spec handle_cast(term(), state()) -> {noreply, state()}.
handle_cast({dispatch, Reference, Result}, #state{waiters = Waiters, users = Users} = State) ->
handle_cast({dispatch, Reference, Result}, #state{waiters = Waiters} = State) ->
VersionWaiters = maps:get(Reference, Waiters, []),
add_users(Reference, VersionWaiters),
_ = [DispatchFun(From, Result) || {From, DispatchFun} <- VersionWaiters],
NewUsers =
case Result of
{ok, Version} ->
add_users(Version, VersionWaiters, Users);
_ ->
Users
end,
{noreply, State#state{waiters = maps:remove(Reference, Waiters), users = NewUsers}};
handle_cast({unlock, Version, Pid}, State = #state{users = Users}) ->
NewUsers =
case Users of
#{Version := VerUserSet} ->
NewVerUserSet = sets:del_element(Pid, VerUserSet),
case sets:size(NewVerUserSet) of
0 -> maps:remove(Version, Users);
_ -> Users#{Version => NewVerUserSet}
end;
_ ->
Users
end,
{noreply, State#state{users = NewUsers}};
{noreply, State#state{waiters = maps:remove(Reference, Waiters)}};
handle_cast(update, State) ->
{noreply, update(undefined, State)};
handle_cast(_Msg, State) ->
@ -203,7 +195,7 @@ handle_cast(_Msg, State) ->
handle_info({update_timer, timeout}, State) ->
{noreply, update(undefined, State)};
handle_info({cleanup_timer, timeout}, State) ->
cleanup(State),
cleanup(),
{noreply, State};
handle_info(_Msg, State) ->
{noreply, State}.
@ -218,18 +210,17 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
add_users(Version, Waiters, Users) ->
InitUsers =
case maps:find(Version, Users) of
error -> sets:new();
{ok, IUs} -> IUs
end,
Pids = [Pid || {{Pid, _}, _} <- Waiters],
Users#{Version => sets:union(InitUsers, sets:from_list(Pids))}.
%% update op: nothing to do
add_users({head, _}, _Waiters) ->
ok;
add_users({version, Version}, Waiters) ->
[ets:insert(?USERS_TABLE, #user{vsn = Version, pid = Pid}) || {{Pid, _}, _} <- Waiters],
ok.
-spec create_tables() -> ok.
create_tables() ->
?TABLE = ets:new(?TABLE, ?meta_table_opts),
?USERS_TABLE = ets:new(?USERS_TABLE, ?users_table_opts),
ok.
-spec call(term()) -> term().
@ -266,8 +257,8 @@ with_version(Version, Opts, Fun) ->
Class:Reason:Stacktrace ->
erlang:raise(Class, Reason, Stacktrace)
after
%% Notify server that we finished working on ets copy for further possible cleanup
cast({unlock, Version, self()})
%% Notify that we finished working on ets copy for further possible cleanup
unlock_version(Version)
end;
Error ->
Error
@ -302,7 +293,7 @@ do_get(Version, ObjectRef) ->
do_get_by_type(Version, ObjectType) ->
{ok, #snap{tid = TID}} = get_snap(Version),
MatchSpec = [
{{object, '_', {ObjectType, '$1'}}, [], ['$1']}
{#object{ref = '_', obj = {ObjectType, '$1'}}, [], ['$1']}
],
try ets:select(TID, MatchSpec) of
Result ->
@ -498,19 +489,22 @@ cancel_timer(undefined) ->
cancel_timer(TimerRef) ->
erlang:cancel_timer(TimerRef).
-spec cleanup(state()) -> ok.
cleanup(#state{users = Users}) ->
unlock_version(Version) ->
ets:delete_object(?USERS_TABLE, #user{vsn = Version, pid = self()}).
-spec cleanup() -> ok.
cleanup() ->
Snaps = get_all_snaps(),
Sorted = lists:keysort(#snap.last_access, Snaps),
case do_get_last_version() of
{ok, HeadVersion} -> cleanup(Sorted, HeadVersion, Users);
{ok, HeadVersion} -> cleanup(Sorted, HeadVersion);
_ -> ok
end.
-spec cleanup([snap()], dmt_client:vsn(), users()) -> ok.
cleanup([], _HeadVersion, _Users) ->
-spec cleanup([snap()], dmt_client:vsn()) -> ok.
cleanup([], _HeadVersion) ->
ok;
cleanup(Snaps, HeadVersion, Users) ->
cleanup(Snaps, HeadVersion) ->
{Elements, Memory} = get_cache_size(),
CacheLimits = genlib_app:env(dmt_client, max_cache_size),
MaxElements = genlib_map:get(elements, CacheLimits, 20),
@ -519,8 +513,8 @@ cleanup(Snaps, HeadVersion, Users) ->
case Elements > MaxElements orelse (Elements > 1 andalso Memory > MaxMemory) of
true ->
Tail = remove_earliest(Snaps, HeadVersion, Users),
cleanup(Tail, HeadVersion, Users);
Tail = remove_earliest(Snaps, HeadVersion),
cleanup(Tail, HeadVersion);
false ->
ok
end.
@ -547,21 +541,22 @@ ets_memory(TID) ->
Info = ets:info(TID),
proplists:get_value(memory, Info).
-spec remove_earliest([snap()], dmt_client:vsn(), users()) -> [snap()].
remove_earliest([#snap{vsn = HeadVersion} | Tail], HeadVersion, _) ->
-spec remove_earliest([snap()], dmt_client:vsn()) -> [snap()].
remove_earliest([#snap{vsn = HeadVersion} | Tail], HeadVersion) ->
Tail;
remove_earliest([Snap = #snap{vsn = Version} | Tail], HeadVersion, Users) ->
case maps:is_key(Version, Users) of
true ->
[Snap | remove_earliest(Tail, HeadVersion, Users)];
false ->
remove_earliest([Snap = #snap{vsn = Version} | Tail], HeadVersion) ->
case ets:select_count(?USERS_TABLE, [{#user{vsn = Version, pid = '_'}, [], ['$_']}]) of
0 ->
remove_snap(Snap),
Tail
Tail;
_ ->
[Snap | remove_earliest(Tail, HeadVersion)]
end.
-spec remove_snap(snap()) -> ok.
remove_snap(#snap{tid = TID, vsn = Version}) ->
true = ets:delete(?TABLE, Version),
_ = ets:select_delete(?USERS_TABLE, [{#user{vsn = Version, pid = '_'}, [], ['$_']}]),
true = ets:delete(TID),
ok.
@ -613,7 +608,7 @@ test_cleanup() ->
ok = put_snapshot(#'Snapshot'{version = 3, domain = dmt_domain:new()}),
ok = put_snapshot(#'Snapshot'{version = 2, domain = dmt_domain:new()}),
ok = put_snapshot(#'Snapshot'{version = 1, domain = dmt_domain:new()}),
cleanup(#state{}),
cleanup(),
[
#snap{vsn = 1, _ = _},
#snap{vsn = 4, _ = _}
@ -629,7 +624,7 @@ test_last_access() ->
Ref = {category, #'domain_CategoryRef'{id = 1}},
{error, object_not_found} = get(3, Ref, undefined),
ok = put_snapshot(#'Snapshot'{version = 1, domain = dmt_domain:new()}),
cleanup(#state{}),
cleanup(),
[
#snap{vsn = 1, _ = _},
#snap{vsn = 3, _ = _},