mirror of
https://github.com/valitydev/dmt-client.git
synced 2024-11-06 01:15:22 +00:00
feat: Add time-based snap-users table usage
This commit is contained in:
parent
b3b5c30828
commit
f852c7595c
@ -64,6 +64,7 @@
|
||||
-record(snap, {
|
||||
vsn :: dmt_client:vsn(),
|
||||
tid :: ets:tid(),
|
||||
cached_at :: timestamp(),
|
||||
last_access :: timestamp()
|
||||
}).
|
||||
|
||||
@ -76,7 +77,7 @@
|
||||
|
||||
-record(user, {
|
||||
vsn :: dmt_client:vsn(),
|
||||
%% request_time :: timestamp(),
|
||||
requested_at :: timestamp(),
|
||||
pid :: pid()
|
||||
}).
|
||||
|
||||
@ -168,12 +169,12 @@ init(_) ->
|
||||
{ok, restart_cleanup_timer(#state{})}.
|
||||
|
||||
-spec handle_call(term(), {pid(), term()}, state()) -> {reply, term(), state()}.
|
||||
handle_call({fetch_version, Version, Opts}, From, State) ->
|
||||
handle_call({fetch_version, Version, Timestamp, Opts}, From, State) ->
|
||||
case ets:member(?TABLE, Version) of
|
||||
true ->
|
||||
{reply, {ok, Version}, State};
|
||||
false ->
|
||||
{noreply, fetch_by_reference({version, Version}, From, Opts, State)}
|
||||
{noreply, fetch_by_reference({version, Version}, From, Timestamp, Opts, State)}
|
||||
end;
|
||||
handle_call(update, From, State) ->
|
||||
{noreply, update(From, State)};
|
||||
@ -184,7 +185,7 @@ handle_call(_Msg, _From, State) ->
|
||||
handle_cast({dispatch, Reference, Result}, #state{waiters = Waiters} = State) ->
|
||||
VersionWaiters = maps:get(Reference, Waiters, []),
|
||||
add_users(Reference, VersionWaiters),
|
||||
_ = [DispatchFun(From, Result) || {From, DispatchFun} <- VersionWaiters],
|
||||
_ = [DispatchFun(From, Result) || {From, _TS, DispatchFun} <- VersionWaiters],
|
||||
{noreply, State#state{waiters = maps:remove(Reference, Waiters)}};
|
||||
handle_cast(update, State) ->
|
||||
{noreply, update(undefined, State)};
|
||||
@ -214,7 +215,10 @@ code_change(_OldVsn, State, _Extra) ->
|
||||
add_users({head, _}, _Waiters) ->
|
||||
ok;
|
||||
add_users({version, Version}, Waiters) ->
|
||||
[ets:insert(?USERS_TABLE, #user{vsn = Version, pid = Pid}) || {{Pid, _}, _} <- Waiters],
|
||||
[
|
||||
ets:insert(?USERS_TABLE, #user{vsn = Version, requested_at = TS, pid = Pid})
|
||||
|| {{Pid, TS, _}, _} <- Waiters, TS /= undefined
|
||||
],
|
||||
ok.
|
||||
|
||||
-spec create_tables() -> ok.
|
||||
@ -242,10 +246,11 @@ cast(Msg) ->
|
||||
gen_server:cast(?SERVER, Msg).
|
||||
|
||||
with_version(Version, Opts, Fun) ->
|
||||
TS = timestamp(),
|
||||
Result =
|
||||
case ets:member(?TABLE, Version) of
|
||||
true -> {ok, Version};
|
||||
false -> call({fetch_version, Version, Opts})
|
||||
false -> call({fetch_version, Version, TS, Opts})
|
||||
end,
|
||||
case Result of
|
||||
{ok, Version} ->
|
||||
@ -258,7 +263,7 @@ with_version(Version, Opts, Fun) ->
|
||||
erlang:raise(Class, Reason, Stacktrace)
|
||||
after
|
||||
%% Notify that we finished working on ets copy for further possible cleanup
|
||||
unlock_version(Version)
|
||||
unlock_version(Version, TS)
|
||||
end;
|
||||
Error ->
|
||||
Error
|
||||
@ -329,6 +334,7 @@ put_snapshot(#'Snapshot'{version = Version, domain = Domain}) ->
|
||||
Snap = #snap{
|
||||
vsn = Version,
|
||||
tid = TID,
|
||||
cached_at = timestamp(),
|
||||
last_access = timestamp()
|
||||
},
|
||||
true = ets:insert(?TABLE, Snap),
|
||||
@ -360,16 +366,22 @@ get_all_snaps() ->
|
||||
ets:tab2list(?TABLE).
|
||||
|
||||
update(From, State) ->
|
||||
restart_update_timer(fetch_by_reference({head, #'Head'{}}, From, undefined, State)).
|
||||
restart_update_timer(fetch_by_reference({head, #'Head'{}}, From, undefined, undefined, State)).
|
||||
|
||||
fetch_by_reference(Reference, From, Opts, #state{waiters = Waiters} = State) ->
|
||||
fetch_by_reference(Reference, From, Timestamp, Opts, #state{waiters = Waiters} = State) ->
|
||||
DispatchFun = fun dispatch_reply/2,
|
||||
NewWaiters = maybe_fetch(Reference, From, DispatchFun, Waiters, Opts),
|
||||
NewWaiters = maybe_fetch(Reference, From, Timestamp, DispatchFun, Waiters, Opts),
|
||||
State#state{waiters = NewWaiters}.
|
||||
|
||||
-spec maybe_fetch(dmt_client:ref(), from() | undefined, dispatch_fun(), waiters(), dmt_client:transport_opts()) ->
|
||||
waiters().
|
||||
maybe_fetch(Reference, ReplyTo, DispatchFun, Waiters, Opts) ->
|
||||
-spec maybe_fetch(
|
||||
dmt_client:ref(),
|
||||
from() | undefined,
|
||||
timestamp(),
|
||||
dispatch_fun(),
|
||||
waiters(),
|
||||
dmt_client:transport_opts()
|
||||
) -> waiters().
|
||||
maybe_fetch(Reference, ReplyTo, Timestamp, DispatchFun, Waiters, Opts) ->
|
||||
Prev =
|
||||
case maps:find(Reference, Waiters) of
|
||||
error ->
|
||||
@ -378,7 +390,7 @@ maybe_fetch(Reference, ReplyTo, DispatchFun, Waiters, Opts) ->
|
||||
{ok, List} ->
|
||||
List
|
||||
end,
|
||||
Waiters#{Reference => [{ReplyTo, DispatchFun} | Prev]}.
|
||||
Waiters#{Reference => [{ReplyTo, Timestamp, DispatchFun} | Prev]}.
|
||||
|
||||
-spec schedule_fetch(dmt_client:ref(), dmt_client:transport_opts()) -> pid().
|
||||
schedule_fetch(Reference, Opts) ->
|
||||
@ -489,8 +501,8 @@ cancel_timer(undefined) ->
|
||||
cancel_timer(TimerRef) ->
|
||||
erlang:cancel_timer(TimerRef).
|
||||
|
||||
unlock_version(Version) ->
|
||||
ets:delete_object(?USERS_TABLE, #user{vsn = Version, pid = self()}).
|
||||
unlock_version(Version, TS) ->
|
||||
ets:delete_object(?USERS_TABLE, #user{vsn = Version, requested_at = TS, pid = self()}).
|
||||
|
||||
-spec cleanup() -> ok.
|
||||
cleanup() ->
|
||||
@ -544,8 +556,15 @@ ets_memory(TID) ->
|
||||
-spec remove_earliest([snap()], dmt_client:vsn()) -> [snap()].
|
||||
remove_earliest([#snap{vsn = HeadVersion} | Tail], HeadVersion) ->
|
||||
Tail;
|
||||
remove_earliest([Snap = #snap{vsn = Version} | Tail], HeadVersion) ->
|
||||
case ets:select_count(?USERS_TABLE, [{#user{vsn = Version, pid = '_'}, [], ['$_']}]) of
|
||||
remove_earliest([Snap = #snap{vsn = Version, cached_at = StoredAt} | Tail], HeadVersion) ->
|
||||
MatchSpec = [
|
||||
{
|
||||
#user{vsn = Version, pid = '_', requested_at = '$1'},
|
||||
[{'<', '$1', StoredAt}],
|
||||
['$_']
|
||||
}
|
||||
],
|
||||
case ets:select_count(?USERS_TABLE, MatchSpec) of
|
||||
0 ->
|
||||
remove_snap(Snap),
|
||||
Tail;
|
||||
@ -556,7 +575,7 @@ remove_earliest([Snap = #snap{vsn = Version} | Tail], HeadVersion) ->
|
||||
-spec remove_snap(snap()) -> ok.
|
||||
remove_snap(#snap{tid = TID, vsn = Version}) ->
|
||||
true = ets:delete(?TABLE, Version),
|
||||
_ = ets:select_delete(?USERS_TABLE, [{#user{vsn = Version, pid = '_'}, [], ['$_']}]),
|
||||
_ = ets:select_delete(?USERS_TABLE, [{#user{vsn = Version, requested_at = '_', pid = '_'}, [], ['$_']}]),
|
||||
true = ets:delete(TID),
|
||||
ok.
|
||||
|
||||
|
@ -63,7 +63,9 @@ poll(_C) ->
|
||||
_ = dmt_client_cache:update(),
|
||||
#'Snapshot'{version = Version2} = dmt_client:get_snapshot(latest),
|
||||
Object = dmt_client:get(latest, Ref),
|
||||
#'VersionedObject'{version = Version2, object = Object} = dmt_client:get_versioned(latest, Ref).
|
||||
#'VersionedObject'{version = Version2, object = Object} = dmt_client:get_versioned(latest, Ref),
|
||||
timer:sleep(5000),
|
||||
erlang:display(ets:tab2list(dmt_client_cache_users)).
|
||||
|
||||
fixture_domain_object(Ref, Data) ->
|
||||
{category, #'domain_CategoryObject'{
|
||||
|
Loading…
Reference in New Issue
Block a user