Ed 180/feat/extra methods (#44)

* refactor: Switch to ensure_version variant of functions

* refactor: Fix naming and redundant API

* feat: Add get_by_type, filter and fold

* refactor: Generalize fetch_version, remove update

* feat: Add implicit head API, easier ref passing

* refactor: fix linter warning

* refactor: Fix linter for sure

* refactor: remove test-only methods

* refactor: split updating and fetching

* refactor: Fix ref and version type and API args

* fix: Fix cleanup perms issue for cleanup

* test: Fix tests for new API

* feat: deferred usage-based cleanup

* fix: PoC of fixing the stale-head issue

* perf: switch to ordered_set for snapshot tables

* feat: Switch to ets-based lock version

* fix: Fix cleanup interval options

* feat: Add time-based  snap-users table usage

* fix: Fix cleanup for locked snapshots

* fix: Add users-table cleanup

* fix: Improve update_head history check

* fix: Remove stale unlock cast

* fix: Fix cleanup_users for dead processes

* refactor: Fix dialyzer errors

* refactor: Fix recs for ets match specs for dialyze

* refactor: Simplify with_version

* refactor: switch back to checkout_* names

* Revert "refactor: Simplify with_version"

This reverts commit 8e1d925f72.

* Revert "fix: Fix cleanup_users for dead processes"

This reverts commit ff808b00a1.

* Revert "fix: Remove stale unlock cast"

This reverts commit c6f09fd640.

* Revert "fix: Add users-table cleanup"

This reverts commit 5665ee6df2.

* Revert "fix: Fix cleanup for locked snapshots"

This reverts commit c3a843887b.

* Revert "feat: Add time-based  snap-users table usage"

This reverts commit f852c7595c.

* Revert "fix: Fix cleanup interval options"

This reverts commit b3b5c30828.

* Revert "feat: Switch to ets-based lock version"

This reverts commit 8a224b81f8.

* Revert "feat: deferred usage-based cleanup"

This reverts commit cd8eef5d46.

* refactor: Fix init spec

* refactor: Fix cache API semantics

* chore: Mention proposal move to another task

* Revert "fix: Improve update_head history check"

This reverts commit 597d77a690.

* Revert "fix: PoC of fixing the stale-head issue"

This reverts commit 357c772284.
This commit is contained in:
Yaroslav Rogov 2021-06-29 14:28:57 +03:00 committed by GitHub
parent 37f376e239
commit 128f9f193b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 402 additions and 205 deletions

View File

@ -9,8 +9,21 @@
%% API
-export([checkout/1]).
-export([checkout/2]).
-export([checkout_object/1]).
-export([checkout_object/2]).
-export([checkout_object/3]).
-export([checkout_versioned_object/1]).
-export([checkout_versioned_object/2]).
-export([checkout_versioned_object/3]).
-export([checkout_objects_by_type/1]).
-export([checkout_objects_by_type/2]).
-export([checkout_objects_by_type/3]).
-export([checkout_filter_objects/1]).
-export([checkout_filter_objects/2]).
-export([checkout_filter_objects/3]).
-export([checkout_fold_objects/2]).
-export([checkout_fold_objects/3]).
-export([checkout_fold_objects/4]).
-export([commit/2]).
-export([commit/3]).
-export([get_last_version/0]).
@ -28,11 +41,15 @@
-export([stop/1]).
-export_type([ref/0]).
-export_type([vsn/0]).
-export_type([version/0]).
-export_type([limit/0]).
-export_type([snapshot/0]).
-export_type([commit/0]).
-export_type([object_ref/0]).
-export_type([object_type/0]).
-export_type([object_filter/0]).
-export_type([object_folder/1]).
-export_type([domain_object/0]).
-export_type([domain/0]).
-export_type([history/0]).
@ -41,23 +58,28 @@
-include_lib("damsel/include/dmsl_domain_config_thrift.hrl").
-type ref() :: dmsl_domain_config_thrift:'Reference'().
-type version() :: dmsl_domain_config_thrift:'Version'().
-type vsn() :: dmsl_domain_config_thrift:'Version'().
-type version() :: vsn() | latest.
-type limit() :: dmsl_domain_config_thrift:'Limit'().
-type snapshot() :: dmsl_domain_config_thrift:'Snapshot'().
-type commit() :: dmsl_domain_config_thrift:'Commit'().
-type object_ref() :: dmsl_domain_thrift:'Reference'().
-type object_type() :: atom().
-type object_filter() :: fun((object_type(), domain_object()) -> boolean()).
-type object_folder(Acc) :: fun((object_type(), domain_object(), Acc) -> Acc).
-type domain_object() :: dmsl_domain_thrift:'DomainObject'().
-type versioned_object() :: dmsl_domain_config_thrift:'VersionedObject'().
-type domain() :: dmsl_domain_thrift:'Domain'().
-type history() :: dmsl_domain_config_thrift:'History'().
-type transport_opts() :: woody_client_thrift_http_transport:transport_options() | undefined.
%%% API
-spec checkout(ref()) -> snapshot() | no_return().
-spec checkout(version()) -> snapshot() | no_return().
checkout(Reference) ->
checkout(Reference, undefined).
-spec checkout(ref(), transport_opts()) -> snapshot() | no_return().
-spec checkout(version(), transport_opts()) -> snapshot() | no_return().
checkout(Reference, Opts) ->
Version = ref_to_version(Reference),
case dmt_client_cache:get(Version, Opts) of
@ -67,22 +89,76 @@ checkout(Reference, Opts) ->
erlang:error(Error)
end.
-spec checkout_object(ref(), object_ref()) -> dmsl_domain_config_thrift:'VersionedObject'() | no_return().
-spec checkout_object(object_ref()) -> domain_object() | no_return().
checkout_object(ObjectReference) ->
checkout_object(latest, ObjectReference).
-spec checkout_object(version(), object_ref()) -> domain_object() | no_return().
checkout_object(Reference, ObjectReference) ->
checkout_object(Reference, ObjectReference, undefined).
-spec checkout_object(ref(), object_ref(), transport_opts()) ->
dmsl_domain_config_thrift:'VersionedObject'() | no_return().
-spec checkout_object(version(), object_ref(), transport_opts()) -> domain_object() | no_return().
checkout_object(Reference, ObjectReference, Opts) ->
Version = ref_to_version(Reference),
case dmt_client_cache:get_object(Version, ObjectReference, Opts) of
{ok, Object} ->
#'VersionedObject'{version = Version, object = Object};
{error, {woody_error, _} = Error} ->
erlang:error(Error);
{error, _} ->
erlang:throw(#'ObjectNotFound'{})
end.
unwrap(dmt_client_cache:get_object(Version, ObjectReference, Opts)).
-spec checkout_versioned_object(object_ref()) -> versioned_object() | no_return().
checkout_versioned_object(ObjectReference) ->
checkout_versioned_object(latest, ObjectReference).
-spec checkout_versioned_object(version(), object_ref()) -> versioned_object() | no_return().
checkout_versioned_object(Reference, ObjectReference) ->
checkout_versioned_object(Reference, ObjectReference, undefined).
-spec checkout_versioned_object(version(), object_ref(), transport_opts()) -> versioned_object() | no_return().
checkout_versioned_object(Reference, ObjectReference, Opts) ->
Version = ref_to_version(Reference),
#'VersionedObject'{version = Version, object = checkout_object(Reference, ObjectReference, Opts)}.
-spec checkout_objects_by_type(object_type()) -> [domain_object()] | no_return().
checkout_objects_by_type(ObjectType) ->
checkout_objects_by_type(latest, ObjectType).
-spec checkout_objects_by_type(version(), object_type()) -> [domain_object()] | no_return().
checkout_objects_by_type(Reference, ObjectType) ->
checkout_objects_by_type(Reference, ObjectType, undefined).
-spec checkout_objects_by_type(version(), object_type(), transport_opts()) -> [domain_object()] | no_return().
checkout_objects_by_type(Reference, ObjectType, Opts) ->
Version = ref_to_version(Reference),
unwrap(dmt_client_cache:get_objects_by_type(Version, ObjectType, Opts)).
-spec checkout_filter_objects(object_filter()) -> [{object_type(), domain_object()}] | no_return().
checkout_filter_objects(Filter) ->
checkout_filter_objects(latest, Filter).
-spec checkout_filter_objects(version(), object_filter()) -> [{object_type(), domain_object()}] | no_return().
checkout_filter_objects(Reference, Filter) ->
checkout_filter_objects(Reference, Filter, undefined).
-spec checkout_filter_objects(version(), object_filter(), transport_opts()) ->
[{object_type(), domain_object()}] | no_return().
checkout_filter_objects(Reference, Filter, Opts) ->
Folder = fun(Type, Object, Acc) ->
case Filter(Type, Object) of
true -> [{Type, Object} | Acc];
false -> Acc
end
end,
checkout_fold_objects(Reference, Folder, [], Opts).
-spec checkout_fold_objects(object_folder(Acc), Acc) -> Acc | no_return().
checkout_fold_objects(Folder, Acc) ->
checkout_fold_objects(latest, Folder, Acc).
-spec checkout_fold_objects(version(), object_folder(Acc), Acc) -> Acc | no_return().
checkout_fold_objects(Reference, Folder, Acc) ->
checkout_fold_objects(Reference, Folder, Acc, undefined).
-spec checkout_fold_objects(version(), object_folder(Acc), Acc, transport_opts()) -> Acc | no_return().
checkout_fold_objects(Reference, Folder, Acc, Opts) ->
Version = ref_to_version(Reference),
unwrap(dmt_client_cache:fold_objects(Version, Folder, Acc, Opts)).
-spec commit(version(), commit()) -> version() | no_return().
commit(Version, Commit) ->
@ -135,8 +211,13 @@ stop(_State) ->
%%% Internal functions
-spec ref_to_version(ref()) -> version().
ref_to_version({version, Version}) ->
unwrap({ok, Acc}) -> Acc;
unwrap({error, {woody_error, _} = Error}) -> erlang:error(Error);
%% DISCUSS: shouldn't version_not_found be handled some other way?
unwrap({error, _}) -> erlang:throw(#'ObjectNotFound'{}).
-spec ref_to_version(version()) -> vsn().
ref_to_version(Version) when is_integer(Version) ->
Version;
ref_to_version({head, #'Head'{}}) ->
ref_to_version(latest) ->
dmt_client_cache:get_last_version().

View File

@ -7,8 +7,7 @@
-export([pull_range/3]).
-export([checkout_object/3]).
-spec commit(dmt_client:version(), dmt_client:commit(), dmt_client:transport_opts()) ->
dmt_client:version() | no_return().
-spec commit(dmt_client:vsn(), dmt_client:commit(), dmt_client:transport_opts()) -> dmt_client:vsn() | no_return().
commit(Version, Commit, Opts) ->
call('Repository', 'Commit', {Version, Commit}, Opts).
@ -16,7 +15,7 @@ commit(Version, Commit, Opts) ->
checkout(Reference, Opts) ->
call('Repository', 'Checkout', {Reference}, Opts).
-spec pull_range(dmt_client:version(), dmt_client:limit(), dmt_client:transport_opts()) ->
-spec pull_range(dmt_client:vsn(), dmt_client:limit(), dmt_client:transport_opts()) ->
dmt_client:history() | no_return().
pull_range(After, Limit, Opts) ->
call('Repository', 'PullRange', {After, Limit}, Opts).

View File

@ -7,21 +7,19 @@
%%% Behaviour callbacks
-callback commit(dmt_client:version(), dmt_client:commit(), dmt_client:transport_opts()) ->
dmt_client:version() | no_return().
-callback commit(dmt_client:vsn(), dmt_client:commit(), dmt_client:transport_opts()) -> dmt_client:vsn() | no_return().
-callback checkout(dmt_client:ref(), dmt_client:transport_opts()) -> dmt_client:snapshot() | no_return().
-callback checkout_object(dmt_client:ref(), dmt_client:object_ref(), dmt_client:transport_opts()) ->
dmsl_domain_thrift:'DomainObject'() | no_return().
-callback pull_range(dmt_client:version(), dmt_client:limit(), dmt_client:transport_opts()) ->
-callback pull_range(dmt_client:vsn(), dmt_client:limit(), dmt_client:transport_opts()) ->
dmt_client:history() | no_return().
%%% API
-spec commit(dmt_client:version(), dmt_client:commit(), dmt_client:transport_opts()) ->
dmt_client:version() | no_return().
-spec commit(dmt_client:vsn(), dmt_client:commit(), dmt_client:transport_opts()) -> dmt_client:vsn() | no_return().
commit(Version, Commit, Opts) ->
call(commit, [Version, Commit, Opts]).
@ -34,7 +32,7 @@ checkout(Reference, Opts) ->
checkout_object(Reference, ObjectReference, Opts) ->
call(checkout_object, [Reference, ObjectReference, Opts]).
-spec pull_range(dmt_client:version(), dmt_client:limit(), dmt_client:transport_opts()) ->
-spec pull_range(dmt_client:vsn(), dmt_client:limit(), dmt_client:transport_opts()) ->
dmt_client:history() | no_return().
pull_range(After, Limit, Opts) ->
call(pull_range, [After, Limit, Opts]).

View File

@ -6,11 +6,11 @@
-export([start_link/0]).
-export([get/1]).
-export([get/2]).
-export([get_last_version/0]).
-export([get_object/2]).
-export([get_object/3]).
-export([get_objects_by_type/3]).
-export([fold_objects/4]).
-export([get_last_version/0]).
-export([update/0]).
%% gen_server callbacks
@ -41,7 +41,7 @@
]).
-define(snapshot_table_opts, [
set,
ordered_set,
protected,
{read_concurrency, true},
{write_concurrency, false},
@ -51,7 +51,7 @@
-type timestamp() :: integer().
-record(snap, {
vsn :: dmt_client:version(),
vsn :: dmt_client:vsn(),
tid :: ets:tid(),
last_access :: timestamp()
}).
@ -59,10 +59,12 @@
-type snap() :: #snap{}.
-record(object, {
ref :: dmt_client:object_ref(),
obj :: dmt_client:domain_object()
ref :: dmt_client:object_ref() | ets_match(),
obj :: dmt_client:domain_object() | ets_match()
}).
-type ets_match() :: '_' | '$1' | {atom(), ets_match()}.
-type woody_error() :: {woody_error, woody_error:system_error()}.
-type from() :: {pid(), term()}.
@ -85,38 +87,41 @@
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-spec get(dmt_client:version()) -> {ok, dmt_client:snapshot()} | {error, version_not_found | woody_error()}.
get(Version) ->
get(Version, undefined).
-spec get(dmt_client:version(), dmt_client:transport_opts()) ->
-spec get(dmt_client:vsn(), dmt_client:transport_opts()) ->
{ok, dmt_client:snapshot()} | {error, version_not_found | woody_error()}.
get(Version, Opts) ->
case get_snapshot(Version) of
{ok, _Snapshot} = Result ->
Result;
{error, version_not_found} ->
call({get_snapshot, Version, Opts})
case ensure_version(Version, Opts) of
{ok, Version} ->
do_get(Version);
{error, _} = Error ->
Error
end.
-spec get_object(dmt_client:version(), dmt_client:object_ref()) ->
{ok, dmt_client:domain_object()} | {error, version_not_found | object_not_found | woody_error()}.
get_object(Version, ObjectRef) ->
get_object(Version, ObjectRef, undefined).
-spec get_object(dmt_client:version(), dmt_client:object_ref(), dmt_client:transport_opts()) ->
-spec get_object(dmt_client:vsn(), dmt_client:object_ref(), dmt_client:transport_opts()) ->
{ok, dmt_client:domain_object()} | {error, version_not_found | object_not_found | woody_error()}.
get_object(Version, ObjectRef, Opts) ->
case do_get_object(Version, ObjectRef) of
{ok, _Object} = Result ->
Result;
{error, version_not_found} ->
call({get_object, Version, ObjectRef, Opts});
{error, object_not_found} = NotFound ->
NotFound
case ensure_version(Version, Opts) of
{ok, Version} -> do_get_object(Version, ObjectRef);
{error, _} = Error -> Error
end.
-spec get_last_version() -> dmt_client:version() | no_return().
-spec get_objects_by_type(dmt_client:vsn(), dmt_client:object_type(), dmt_client:transport_opts()) ->
{ok, [dmt_client:domain_object()]} | {error, version_not_found | woody_error()}.
get_objects_by_type(Version, ObjectType, Opts) ->
case ensure_version(Version, Opts) of
{ok, Version} -> do_get_objects_by_type(Version, ObjectType);
{error, _} = Error -> Error
end.
-spec fold_objects(dmt_client:vsn(), dmt_client:object_folder(Acc), Acc, dmt_client:transport_opts()) ->
{ok, Acc} | {error, version_not_found | woody_error()}.
fold_objects(Version, Folder, Acc, Opts) ->
case ensure_version(Version, Opts) of
{ok, Version} -> do_fold_objects(Version, Folder, Acc);
{error, _} = Error -> Error
end.
-spec get_last_version() -> dmt_client:vsn() | no_return().
get_last_version() ->
case do_get_last_version() of
{ok, Version} ->
@ -130,7 +135,7 @@ get_last_version() ->
end
end.
-spec update() -> {ok, dmt_client:version()} | {error, woody_error()}.
-spec update() -> {ok, dmt_client:vsn()} | {error, woody_error()}.
update() ->
call(update).
@ -142,47 +147,31 @@ init(_) ->
{ok, #state{}, 0}.
-spec handle_call(term(), {pid(), term()}, state()) -> {reply, term(), state()}.
handle_call({get_object, Version, ObjectRef, Opts}, From, #state{waiters = Waiters} = State) ->
case do_get_object(Version, ObjectRef) of
{ok, _Object} = Result ->
{reply, Result, State};
{error, object_not_found} = NotFound ->
{reply, NotFound, State};
{error, version_not_found} ->
DispatchFun = dispatch_object(ObjectRef),
NewWaiters = maybe_fetch({version, Version}, From, DispatchFun, Waiters, Opts),
{noreply, State#state{waiters = NewWaiters}}
handle_call({fetch_version, Version, Opts}, From, State) ->
case ets:member(?TABLE, Version) of
true ->
{reply, {ok, Version}, State};
false ->
{noreply, fetch_by_reference({version, Version}, From, Opts, State)}
end;
handle_call(update, From, State) ->
update_cache(From, State);
handle_call({get_snapshot, Version, Opts}, From, #state{waiters = Waiters} = State) ->
case get_snapshot(Version) of
{ok, _Snapshot} = Result ->
{reply, Result, State};
{error, version_not_found} ->
DispatchFun = fun dispatch_snapshot/2,
NewWaiters = maybe_fetch({version, Version}, From, DispatchFun, Waiters, Opts),
{noreply, State#state{waiters = NewWaiters}}
end;
{noreply, update(From, State)};
handle_call(_Msg, _From, State) ->
{noreply, State}.
-spec handle_cast(term(), state()) -> {noreply, state()}.
handle_cast({dispatch, Reference, Result}, #state{waiters = Waiters} = State) ->
case Result of
{ok, Snapshot} ->
put_snapshot(Snapshot);
_ ->
ok
end,
_ = [DispatchFun(From, Result) || {From, DispatchFun} <- maps:get(Reference, Waiters, [])],
{noreply, State#state{waiters = maps:remove(Reference, Waiters)}};
handle_cast(cleanup, State) ->
cleanup(),
{noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
-spec handle_info(term(), state()) -> {noreply, state()}.
handle_info(timeout, State) ->
update_cache(undefined, State);
{noreply, update(undefined, State)};
handle_info(_Msg, State) ->
{noreply, State}.
@ -219,13 +208,73 @@ call(Msg, Timeout) ->
cast(Msg) ->
gen_server:cast(?SERVER, Msg).
ensure_version(Version, Opts) ->
case ets:member(?TABLE, Version) of
true -> {ok, Version};
false -> call({fetch_version, Version, Opts})
end.
-spec do_get(dmt_client:vsn()) -> {ok, dmt_client:snapshot()} | {error, version_not_found}.
do_get(Version) ->
case get_snap(Version) of
{ok, Snap} ->
build_snapshot(Snap);
{error, version_not_found} = Error ->
Error
end.
-spec do_get_object(dmt_client:vsn(), dmt_client:object_ref()) ->
{ok, dmt_client:domain_object()} | {error, version_not_found | object_not_found}.
do_get_object(Version, ObjectRef) ->
{ok, #snap{tid = TID}} = get_snap(Version),
try ets:lookup(TID, ObjectRef) of
[#object{obj = Object}] ->
{ok, Object};
[] ->
{error, object_not_found}
catch
% table was deleted
% DISCUSS(ED-185): is it correct though? Wouldn't recuring back to original function be better?
% This way, we can fetch wiped snapshot again only for this version
error:badarg ->
{error, version_not_found}
end.
do_get_objects_by_type(Version, ObjectType) ->
{ok, #snap{tid = TID}} = get_snap(Version),
MatchSpec = [
{{object, '_', {ObjectType, '$1'}}, [], ['$1']}
],
try ets:select(TID, MatchSpec) of
Result ->
{ok, Result}
catch
%% DISCUSS: same as above for do_get_object
error:badarg ->
{error, version_not_found}
end.
do_fold_objects(Version, Folder, Acc) ->
{ok, #snap{tid = TID}} = get_snap(Version),
MappedFolder = fun({object, {Type, _Ref}, {Type, Object}}, AccIn) ->
Folder(Type, Object, AccIn)
end,
try ets:foldl(MappedFolder, Acc, TID) of
Result ->
{ok, Result}
catch
%% DISCUSS: same as above for do_get_object
error:badarg ->
{error, version_not_found}
end.
-spec put_snapshot(dmt_client:snapshot()) -> ok.
put_snapshot(#'Snapshot'{version = Version, domain = Domain}) ->
case get_snap(Version) of
{ok, _Snap} ->
ok;
{error, version_not_found} ->
TID = ets:new(?MODULE, ?snapshot_table_opts),
TID = ets:new(?MODULE, [{heir, whereis(?SERVER), ok}] ++ ?snapshot_table_opts),
true = put_domain_to_table(TID, Domain),
Snap = #snap{
vsn = Version,
@ -233,7 +282,8 @@ put_snapshot(#'Snapshot'{version = Version, domain = Domain}) ->
last_access = timestamp()
},
true = ets:insert(?TABLE, Snap),
cleanup()
cast(cleanup),
ok
end.
-spec put_domain_to_table(ets:tid(), dmt_client:domain()) -> true.
@ -246,16 +296,7 @@ put_domain_to_table(TID, Domain) ->
Domain
).
-spec get_snapshot(dmt_client:version()) -> {ok, dmt_client:snapshot()} | {error, version_not_found}.
get_snapshot(Version) ->
case get_snap(Version) of
{ok, Snap} ->
do_get_snapshot(Snap);
{error, version_not_found} = Error ->
Error
end.
-spec get_snap(dmt_client:version()) -> {ok, snap()} | {error, version_not_found}.
-spec get_snap(dmt_client:vsn()) -> {ok, snap()} | {error, version_not_found}.
get_snap(Version) ->
case ets:lookup(?TABLE, Version) of
[Snap] ->
@ -269,49 +310,17 @@ get_snap(Version) ->
get_all_snaps() ->
ets:tab2list(?TABLE).
-spec do_get_object(dmt_client:version(), dmt_client:object_ref()) ->
{ok, dmt_client:domain_object()} | {error, version_not_found | object_not_found}.
do_get_object(Version, ObjectRef) ->
case get_snap(Version) of
{ok, Snap} ->
get_object_by_snap(Snap, ObjectRef);
{error, version_not_found} = Error ->
Error
end.
update(From, State) ->
restart_timer(fetch_by_reference({head, #'Head'{}}, From, undefined, State)).
-spec get_object_by_snap(snap(), dmt_client:object_ref()) ->
{ok, dmt_client:domain_object()} | {error, version_not_found | object_not_found}.
get_object_by_snap(#snap{tid = TID}, ObjectRef) ->
try ets:lookup(TID, ObjectRef) of
[#object{obj = Object}] ->
{ok, Object};
[] ->
{error, object_not_found}
catch
% table was deleted
error:badarg ->
{error, version_not_found}
end.
-spec get_object_from_snapshot(dmt_client:object_ref(), dmt_client:snapshot()) ->
{ok, dmt_client:domain_object()} | {error, object_not_found}.
get_object_from_snapshot(ObjectRef, #'Snapshot'{domain = Domain}) ->
case dmt_domain:get_object(ObjectRef, Domain) of
{ok, _Object} = Result ->
Result;
error ->
{error, object_not_found}
end.
-spec update_cache(from() | undefined, state()) -> {noreply, state()}.
update_cache(From, #state{waiters = Waiters} = State) ->
DispatchFun = fun dispatch_update/2,
NewWaiters = maybe_fetch({head, #'Head'{}}, From, DispatchFun, Waiters, undefined),
{noreply, restart_timer(State#state{waiters = NewWaiters})}.
fetch_by_reference(Reference, From, Opts, #state{waiters = Waiters} = State) ->
DispatchFun = fun dispatch_reply/2,
NewWaiters = maybe_fetch(Reference, From, DispatchFun, Waiters, Opts),
State#state{waiters = NewWaiters}.
-spec maybe_fetch(dmt_client:ref(), from() | undefined, dispatch_fun(), waiters(), dmt_client:transport_opts()) ->
waiters().
maybe_fetch(Reference, From, DispatchFun, Waiters, Opts) ->
maybe_fetch(Reference, ReplyTo, DispatchFun, Waiters, Opts) ->
Prev =
case maps:find(Reference, Waiters) of
error ->
@ -320,13 +329,20 @@ maybe_fetch(Reference, From, DispatchFun, Waiters, Opts) ->
{ok, List} ->
List
end,
Waiters#{Reference => [{From, DispatchFun} | Prev]}.
Waiters#{Reference => [{ReplyTo, DispatchFun} | Prev]}.
-spec schedule_fetch(dmt_client:ref(), dmt_client:transport_opts()) -> pid().
schedule_fetch(Reference, Opts) ->
proc_lib:spawn_link(
fun() ->
Result = fetch(Reference, Opts),
Result =
case fetch(Reference, Opts) of
{ok, Snapshot} ->
put_snapshot(Snapshot),
{ok, Snapshot#'Snapshot'.version};
{error, _} = Error ->
Error
end,
cast({dispatch, Reference, Result})
end
).
@ -357,38 +373,16 @@ do_fetch({head, #'Head'{}}, Opts) ->
do_fetch(Reference, Opts) ->
dmt_client_backend:checkout(Reference, Opts).
-spec dispatch_snapshot(from() | undefined, fetch_result()) -> ok.
dispatch_snapshot(undefined, _Result) ->
-spec dispatch_reply(from() | undefined, fetch_result()) -> _.
dispatch_reply(undefined, _Result) ->
ok;
dispatch_snapshot(From, Result) ->
_ = gen_server:reply(From, Result),
ok.
dispatch_reply(From, {ok, Version}) ->
gen_server:reply(From, {ok, Version});
dispatch_reply(From, Error) ->
gen_server:reply(From, Error).
-spec dispatch_object(dmt_client:object_ref()) -> dispatch_fun().
dispatch_object(ObjectRef) ->
fun(From, Result) ->
Reply =
case Result of
{ok, Snapshot} ->
get_object_from_snapshot(ObjectRef, Snapshot);
Error ->
Error
end,
gen_server:reply(From, Reply)
end.
-spec dispatch_update(from() | undefined, fetch_result()) -> ok.
dispatch_update(undefined, _Result) ->
ok;
dispatch_update(From, {ok, #'Snapshot'{version = Version}}) ->
_ = gen_server:reply(From, {ok, Version}),
ok;
dispatch_update(From, Error) ->
_ = gen_server:reply(From, Error),
ok.
-spec do_get_snapshot(snap()) -> {ok, dmt_client:snapshot()} | {error, version_not_found}.
do_get_snapshot(#snap{vsn = Version, tid = TID}) ->
-spec build_snapshot(snap()) -> {ok, dmt_client:snapshot()} | {error, version_not_found}.
build_snapshot(#snap{vsn = Version, tid = TID}) ->
try
Domain = ets:foldl(
fun(#object{obj = Object}, Domain) ->
@ -409,12 +403,12 @@ do_get_snapshot(#snap{vsn = Version, tid = TID}) ->
latest_snapshot() ->
case do_get_last_version() of
{ok, Version} ->
get_snapshot(Version);
do_get(Version);
{error, version_not_found} = Error ->
Error
end.
-spec do_get_last_version() -> {ok, dmt_client:version()} | {error, version_not_found}.
-spec do_get_last_version() -> {ok, dmt_client:vsn()} | {error, version_not_found}.
do_get_last_version() ->
case ets:last(?TABLE) of
'$end_of_table' ->
@ -442,7 +436,7 @@ cleanup() ->
{ok, HeadVersion} = do_get_last_version(),
cleanup(Sorted, HeadVersion).
-spec cleanup([snap()], dmt_client:version()) -> ok.
-spec cleanup([snap()], dmt_client:vsn()) -> ok.
cleanup([], _HeadVersion) ->
ok;
cleanup(Snaps, HeadVersion) ->
@ -481,7 +475,7 @@ ets_memory(TID) ->
Info = ets:info(TID),
proplists:get_value(memory, Info).
-spec remove_earliest([snap()], dmt_client:version()) -> [snap()].
-spec remove_earliest([snap()], dmt_client:vsn()) -> [snap()].
remove_earliest([#snap{vsn = HeadVersion} | Tail], HeadVersion) ->
Tail;
remove_earliest([Snap | Tail], _HeadVersion) ->
@ -494,7 +488,7 @@ remove_snap(#snap{tid = TID, vsn = Version}) ->
true = ets:delete(TID),
ok.
-spec update_last_access(dmt_client:version()) -> boolean().
-spec update_last_access(dmt_client:vsn()) -> boolean().
update_last_access(Version) ->
ets:update_element(?TABLE, Version, {#snap.last_access, timestamp()}).
@ -506,47 +500,172 @@ timestamp() ->
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-include_lib("damsel/include/dmsl_domain_thrift.hrl").
% dirty hack for warn_missing_spec
-spec test() -> any().
-include_lib("eunit/include/eunit.hrl").
-spec all_test_() -> ok.
-spec cleanup_test() -> ok.
cleanup_test() ->
application:set_env(dmt_client, max_cache_size, #{elements => 2, memory => 52428800}),
ok = create_tables(),
all_test_() ->
{setup,
fun() ->
create_tables(),
%% So that put_snapshot works correctly
register(?SERVER, self())
end,
[
fun test_cleanup/0,
fun test_last_access/0,
fun test_get_object/0,
fun test_get_object_by_type/0,
fun test_fold/0
]}.
set_cache_limits(Elements) ->
set_cache_limits(Elements, 52428800).
set_cache_limits(Elements, Memory) ->
application:set_env(dmt_client, max_cache_size, #{elements => Elements, memory => Memory}).
-spec test_cleanup() -> _.
test_cleanup() ->
set_cache_limits(2),
ok = put_snapshot(#'Snapshot'{version = 4, domain = dmt_domain:new()}),
ok = timer:sleep(1),
ok = put_snapshot(#'Snapshot'{version = 3, domain = dmt_domain:new()}),
ok = timer:sleep(1),
ok = put_snapshot(#'Snapshot'{version = 2, domain = dmt_domain:new()}),
ok = timer:sleep(1),
ok = put_snapshot(#'Snapshot'{version = 1, domain = dmt_domain:new()}),
cleanup(),
[
#snap{vsn = 1, _ = _},
#snap{vsn = 4, _ = _}
] = get_all_snaps(),
ok.
] = get_all_snaps().
-spec last_access_test() -> ok.
last_access_test() ->
application:set_env(dmt_client, max_cache_size, #{elements => 3, memory => 52428800}),
% Tables already created in cleanup_test/0
-spec test_last_access() -> _.
test_last_access() ->
set_cache_limits(3),
% Tables already created in test_cleanup/0
ok = put_snapshot(#'Snapshot'{version = 4, domain = dmt_domain:new()}),
ok = timer:sleep(1),
ok = put_snapshot(#'Snapshot'{version = 3, domain = dmt_domain:new()}),
ok = timer:sleep(1),
ok = put_snapshot(#'Snapshot'{version = 2, domain = dmt_domain:new()}),
ok = timer:sleep(1),
Ref = {category, #'domain_CategoryRef'{id = 1}},
{error, object_not_found} = get_object(3, Ref),
{error, object_not_found} = get_object(3, Ref, undefined),
ok = put_snapshot(#'Snapshot'{version = 1, domain = dmt_domain:new()}),
cleanup(),
[
#snap{vsn = 1, _ = _},
#snap{vsn = 3, _ = _},
#snap{vsn = 4, _ = _}
] = get_all_snaps(),
ok.
] = get_all_snaps().
-spec test_get_object() -> _.
test_get_object() ->
set_cache_limits(1),
Version = 5,
Cat = {_, Ref, _} = fixture(category),
Domain = dmt_insert_many(
[{category, Cat}]
),
ok = put_snapshot(#'Snapshot'{version = Version, domain = Domain}),
{ok, {category, Cat}} = get_object(Version, {category, Ref}, undefined).
-spec test_get_object_by_type() -> _.
test_get_object_by_type() ->
set_cache_limits(1),
Version = 6,
Cat1 = fixture(category),
Cat2 = fixture(category_2),
Domain = domain_with_all_fixtures(),
ok = put_snapshot(#'Snapshot'{version = Version, domain = Domain}),
{ok, Objects} = get_objects_by_type(Version, category, undefined),
[Cat1, Cat2] = lists:sort(Objects).
-spec test_fold() -> _.
test_fold() ->
set_cache_limits(1),
Version = 7,
Domain = domain_with_all_fixtures(),
ok = put_snapshot(#'Snapshot'{version = Version, domain = Domain}),
{ok, OrdSet} = fold_objects(
Version,
fun
(
category,
#'domain_CategoryObject'{
ref = #'domain_CategoryRef'{id = ID}
},
Acc
) ->
ordsets:add_element(ID, Acc);
(_Type, _Obj, Acc) ->
Acc
end,
ordsets:new(),
undefined
),
[1, 2] = ordsets:to_list(OrdSet).
domain_with_all_fixtures() ->
dmt_insert_many(
[
{category, fixture(category)},
{category, fixture(category_2)},
{currency, fixture(currency)}
]
).
dmt_insert_many(Objects) ->
dmt_insert_many(Objects, dmt_domain:new()).
dmt_insert_many(Objects, Domain) ->
lists:foldl(
fun(Object, DomainIn) ->
{ok, DomainOut} = dmt_domain:insert(Object, DomainIn),
DomainOut
end,
Domain,
Objects
).
fixture(ID) ->
maps:get(
ID,
#{
category => #'domain_CategoryObject'{
ref = #'domain_CategoryRef'{id = 1},
data = #'domain_Category'{
name = <<"cat">>,
description = <<"Sample category">>
}
},
category_2 => #'domain_CategoryObject'{
ref = #'domain_CategoryRef'{id = 2},
data = #'domain_Category'{
name = <<"dog">>,
description = <<"Sample category">>
}
},
currency => #'domain_CurrencyObject'{
ref = #'domain_CurrencyRef'{symbolic_code = <<"USD">>},
data = #'domain_Currency'{
name = <<"dog">>,
symbolic_code = <<"USD">>,
numeric_code = 840,
exponent = 2
}
}
}
).
% TEST
-endif.

View File

@ -83,8 +83,7 @@ end_per_suite(C) ->
%%% Dummy API
-spec commit(dmt_client:version(), dmt_client:commit(), dmt_client:transport_opts()) ->
dmt_client:version() | no_return().
-spec commit(dmt_client:vsn(), dmt_client:commit(), dmt_client:transport_opts()) -> dmt_client:vsn() | no_return().
commit(Version, _Commit, _Opts) ->
Version.
@ -105,7 +104,7 @@ checkout({head, #'Head'{}}, _Opts) ->
checkout_object(_Reference, _ObjectReference, _Opts) ->
erlang:throw(#'ObjectNotFound'{}).
-spec pull_range(dmt_client:version(), dmt_client:limit(), dmt_client:transport_opts()) ->
-spec pull_range(dmt_client:vsn(), dmt_client:limit(), dmt_client:transport_opts()) ->
dmt_client:history() | no_return().
pull_range(_Version, _Limit, _Opts) ->
timer:sleep(5000),
@ -115,18 +114,18 @@ pull_range(_Version, _Limit, _Opts) ->
-spec get_snapshot_success(config()) -> any().
get_snapshot_success(_C) ->
{ok, #'Snapshot'{}} = dmt_client_cache:get(?existing_version).
{ok, #'Snapshot'{}} = dmt_client_cache:get(?existing_version, undefined).
-spec snapshot_not_found(config()) -> any().
snapshot_not_found(_C) ->
{error, version_not_found} = dmt_client_cache:get(1).
{error, version_not_found} = dmt_client_cache:get(?notfound_version, undefined).
-spec woody_error(config()) -> any().
woody_error(_C) ->
{error, {woody_error, _}} = dmt_client_cache:get(?unavailable_version).
{error, {woody_error, _}} = dmt_client_cache:get(?unavailable_version, undefined).
-spec object_not_found(config()) -> any().
object_not_found(_C) ->
Ref = {category, #'domain_CategoryRef'{id = 1}},
{error, version_not_found} = dmt_client_cache:get_object(?notfound_version, Ref),
{error, object_not_found} = dmt_client_cache:get_object(?existing_version, Ref).
{error, version_not_found} = dmt_client_cache:get_object(?notfound_version, Ref, undefined),
{error, object_not_found} = dmt_client_cache:get_object(?existing_version, Ref, undefined).

View File

@ -56,13 +56,14 @@ end_per_suite(C) ->
poll(_C) ->
Object = fixture_domain_object(1, <<"InsertFixture">>),
Ref = fixture_object_ref(1),
#'ObjectNotFound'{} = (catch dmt_client:checkout_object({head, #'Head'{}}, Ref)),
#'Snapshot'{version = Version1} = dmt_client:checkout({head, #'Head'{}}),
#'ObjectNotFound'{} = (catch dmt_client:checkout_object(latest, Ref)),
#'Snapshot'{version = Version1} = dmt_client:checkout(latest),
Version2 = dmt_client_api:commit(Version1, #'Commit'{ops = [{insert, #'InsertOp'{object = Object}}]}, undefined),
true = Version1 < Version2,
_ = dmt_client_cache:update(),
#'Snapshot'{version = Version2} = dmt_client:checkout({head, #'Head'{}}),
#'VersionedObject'{object = Object} = dmt_client:checkout_object({head, #'Head'{}}, Ref).
#'Snapshot'{version = Version2} = dmt_client:checkout(latest),
Object = dmt_client:checkout_object(latest, Ref),
#'VersionedObject'{version = Version2, object = Object} = dmt_client:checkout_versioned_object(latest, Ref).
fixture_domain_object(Ref, Data) ->
{category, #'domain_CategoryObject'{