DC-61: first implementation of snapshot storage (#57)

Idea: store some of snapshots in MG events to lower memory usage and state squashing time.
Naive legacy events migration included.
Lots of ad-hocs included too.
Also:
* remove ability to set cash size in elements
* remove unnecessary calls to old repository
* remove obvious lager messages
* fix strange dialyzer warning
* rework events
* remove redundant cache checks
* remove redundant cache put
This commit is contained in:
Evgeny Levenets 2017-11-16 19:41:03 +03:00 committed by GitHub
parent 1007632f37
commit 45bea4fbf5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 489 additions and 336 deletions

View File

@ -7,10 +7,7 @@
% Bump keepalive timeout up to a minute
{timeout, 60000}
]},
{max_cache_size, #{
elements => 20,
memory => 52428800 % 50Mb
}}
{max_cache_size, 52428800} % 50Mb
]},
{lager, [
{error_logger_redirect, true},

View File

@ -4,11 +4,11 @@
{<<"cowlib">>,{pkg,<<"cowlib">>,<<"1.0.2">>},2},
{<<"dmsl">>,
{git,"git@github.com:rbkmoney/damsel.git",
{ref,"c7708e0f3146535858268c508a302d500a69a6a6"}},
{ref,"fcee71c90e84512755b3cec9870e79a22a04a32a"}},
0},
{<<"dmt_client">>,
{git,"git@github.com:rbkmoney/dmt_client.git",
{ref,"93577c536d6d76018e383bd0fd5e02dec6f84508"}},
{ref,"52183b5006e5459939727f99f085d3c680a245ec"}},
0},
{<<"dmt_core">>,
{git,"git@github.com:rbkmoney/dmt_core.git",
@ -30,11 +30,15 @@
{<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},2},
{<<"mg_proto">>,
{git,"git@github.com:rbkmoney/machinegun_proto.git",
{ref,"35a23af91ee4245b6faffda4ed66a926df087bdf"}},
{ref,"5c07c579014f9900357f7a72f9d10a03008b9da1"}},
0},
{<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.0.2">>},2},
{<<"ranch">>,{pkg,<<"ranch">>,<<"1.4.0">>},2},
{<<"rfc3339">>,{pkg,<<"rfc3339">>,<<"0.9.0">>},1},
{<<"scoper">>,
{git,"git@github.com:rbkmoney/scoper.git",
{ref,"802057089bac258f45e35263eb2223961618468d"}},
1},
{<<"snowflake">>,
{git,"https://github.com/rbkmoney/snowflake.git",
{ref,"0a598108f6582affe3b4ae550fc5b9f2062e318a"}},
@ -46,7 +50,7 @@
1},
{<<"woody">>,
{git,"git@github.com:rbkmoney/woody_erlang.git",
{ref,"2d00bda10454534e230d452b7338debafaf0a869"}},
{ref,"ad1e91050c36d8de15f1c7d8dd8a2c682d2d158c"}},
0}]}.
[
{pkg_hash,[

View File

@ -63,4 +63,4 @@ get_handler_spec(state_processor) ->
}}.
get_repository_mod() ->
genlib_app:env(?MODULE, repository, dmt_api_repository_v2).
genlib_app:env(?MODULE, repository, dmt_api_repository_v3).

View File

@ -23,7 +23,7 @@
response() |
no_return().
call(NS, ID, Args, Context) ->
call(NS, ID, #'HistoryRange'{}, Args, Context).
call(NS, ID, #'mg_stateproc_HistoryRange'{}, Args, Context).
-spec call(ns(), id(), history_range(), args(), context()) ->
response() |
@ -33,7 +33,7 @@ call(NS, ID, HistoryRange, Args, Context) ->
case issue_rpc('Call', [Descriptor, Args], Context) of
{ok, Result} ->
Result;
{error, #'MachineNotFound'{}} ->
{error, #'mg_stateproc_MachineNotFound'{}} ->
ok = start(NS, ID, Context),
call(NS, ID, Args, Context)
end.
@ -48,7 +48,7 @@ call(NS, ID, HistoryRange, Args, Context) ->
get_history(NS, ID, HistoryRange, Context) ->
Descriptor = construct_descriptor(NS, ID, HistoryRange),
case issue_rpc('GetMachine', [Descriptor], Context) of
{ok, #'Machine'{history = History}} ->
{ok, #'mg_stateproc_Machine'{history = History}} ->
{ok, History};
{error, _} = Error ->
Error
@ -57,17 +57,17 @@ get_history(NS, ID, HistoryRange, Context) ->
-spec start(ns(), id(), context()) ->
ok | no_return().
start(NS, ID, Context) ->
case issue_rpc('Start', [NS, ID, {nl, #msgpack_Nil{}}], Context) of
case issue_rpc('Start', [NS, ID, {nl, #mg_msgpack_Nil{}}], Context) of
{ok, _} ->
ok;
{error, #'MachineAlreadyExists'{}} ->
{error, #'mg_stateproc_MachineAlreadyExists'{}} ->
ok
end.
-spec construct_descriptor(ns(), id(), history_range()) ->
descriptor().
construct_descriptor(NS, ID, HistoryRange) ->
#'MachineDescriptor'{
#'mg_stateproc_MachineDescriptor'{
ns = NS,
ref = {id, ID},
range = HistoryRange
@ -82,9 +82,9 @@ issue_rpc(Method, Args, Context) ->
case woody_client:call(Request, Opts, Context) of
{ok, _} = Ok ->
Ok;
{exception, #'NamespaceNotFound'{}} ->
{exception, #'mg_stateproc_NamespaceNotFound'{}} ->
error(namespace_not_found);
{exception, #'MachineFailed'{}} ->
{exception, #'mg_stateproc_MachineFailed'{}} ->
error(machine_failed);
{exception, Exception} ->
{error, Exception}

View File

@ -157,11 +157,9 @@ get_closest_snapshot(Prev, Next, Version) ->
end.
cleanup() ->
{Elements, Memory} = get_cache_size(),
CacheLimits = genlib_app:env(dmt_api, max_cache_size),
MaxElements = genlib_map:get(elements, CacheLimits, 20),
MaxMemory = genlib_map:get(memory, CacheLimits, 52428800), % 50Mb by default
case Elements > MaxElements orelse Memory > MaxMemory of
CacheSize = get_cache_size(),
CacheLimit = genlib_app:env(dmt_api, max_cache_size, 52428800), % 50Mb by default
case CacheSize > CacheLimit of
true ->
ok = remove_earliest(),
cleanup();
@ -170,9 +168,7 @@ cleanup() ->
end.
get_cache_size() ->
WordSize = erlang:system_info(wordsize),
Info = ets:info(?TABLE),
{proplists:get_value(size, Info), WordSize * proplists:get_value(memory, Info)}.
erlang:system_info(wordsize) * ets:info(?TABLE, memory).
remove_earliest() ->
% Naive implementation, but probably good enough

View File

@ -8,7 +8,6 @@
-export([checkout_object/4]).
-export([pull/3]).
-export([commit/4]).
-export([apply_commit/5]).
-export_type([version/0]).
-export_type([snapshot/0]).
@ -32,10 +31,13 @@
{object_reference_mismatch, object_ref()} |
{objects_not_exist, [{object_ref(), [object_ref()]}]}.
-callback get_history(pos_integer() | undefined, context()) ->
history().
-callback get_history(version(), pos_integer() | undefined, context()) ->
{ok, history()} | {error, version_not_found}.
-callback checkout(ref(), context()) ->
% TODO this was made due to dialyzer warning, can't find the way to fix it
{ok, term()} |
{error, version_not_found}.
-callback pull(version(), context()) ->
{ok, history()} |
{error, version_not_found}.
-callback commit(version(), commit(), context()) ->
{ok, snapshot()} |
{error, version_not_found | {operation_conflict, operation_conflict()}}.
@ -45,19 +47,20 @@
-spec checkout(ref(), repository(), context()) ->
{ok, snapshot()} | {error, version_not_found}.
checkout({head, #'Head'{}}, Repository, Context) ->
ClosestSnapshot = ensure_snapshot(dmt_api_cache:get_latest()),
{ok, History} = get_history(Repository, ClosestSnapshot#'Snapshot'.version, undefined, Context),
%% TO DO: Need to fix dmt_history:head. It can return {error, ...}
{ok, Snapshot} = dmt_history:head(History, ClosestSnapshot),
checkout({head, #'Head'{}} = V, Repository, Context) ->
case Repository:checkout(V, Context) of
{ok, Snapshot} ->
{ok, dmt_api_cache:put(Snapshot)};
{error, version_not_found} ->
{error, version_not_found}
end;
checkout({version, Version}, Repository, Context) ->
checkout({version, Version} = V, Repository, Context) ->
case dmt_api_cache:get(Version) of
{ok, Snapshot} ->
{ok, Snapshot};
{error, version_not_found} ->
case try_get_snapshot(Version, Repository, Context) of
case Repository:checkout(V, Context) of
{ok, Snapshot} ->
{ok, dmt_api_cache:put(Snapshot)};
{error, version_not_found} ->
@ -80,7 +83,7 @@ checkout_object(Reference, ObjectReference, Repository, Context) ->
{ok, history()} | {error, version_not_found}.
pull(Version, Repository, Context) ->
get_history(Repository, Version, undefined, Context).
Repository:pull(Version, Context).
-spec commit(version(), commit(), repository(), context()) ->
{ok, version()} |
@ -90,8 +93,8 @@ commit(Version, Commit, Repository, Context) ->
case ensure_snapshot(dmt_api_cache:get_latest()) of
#'Snapshot'{version = CachedVersion} when Version >= CachedVersion ->
case Repository:commit(Version, Commit, Context) of
{ok, Snapshot = #'Snapshot'{version = VersionNext}} ->
_ = dmt_api_cache:put(Snapshot),
{ok, Snapshot} ->
#'Snapshot'{version = VersionNext} = dmt_api_cache:put(Snapshot),
{ok, VersionNext};
{error, _} = Error ->
Error
@ -100,41 +103,8 @@ commit(Version, Commit, Repository, Context) ->
{error, head_mismatch}
end.
-spec apply_commit(version(), commit(), history(), repository(), context()) ->
{ok, snapshot()} | {error, term()}.
% FIXME map_size(History) should be in dmt_history.
apply_commit(VersionWas, #'Commit'{ops = Ops}, History, Repository, Context) when map_size(History) =:= 0 ->
case checkout({version, VersionWas}, Repository, Context) of
{ok, BaseSnapshot} ->
try_apply_commit(BaseSnapshot, Ops);
{error, version_not_found} ->
{error, version_not_found}
end;
apply_commit(_, _, _, _, _) ->
{error, version_not_found}. % FIXME move to dmt_history
%% Internal
-spec try_get_snapshot(version(), repository(), context()) ->
{ok, snapshot()} | {error, version_not_found}.
try_get_snapshot(Version, Repository, Context) ->
ClosestSnapshot = ensure_snapshot(dmt_api_cache:get_closest(Version)),
From = min(Version, ClosestSnapshot#'Snapshot'.version),
Limit = abs(Version - ClosestSnapshot#'Snapshot'.version),
case get_history(Repository, From, Limit, Context) of
{ok, History} when map_size(History) =:= Limit ->
%% TO DO: Need to fix dmt_history:travel. It can return {error, ...}
{ok, Snapshot} = dmt_history:travel(Version, History, ClosestSnapshot),
{ok, Snapshot};
{ok, #{}} ->
{error, version_not_found};
{error, version_not_found} ->
{error, version_not_found}
end.
-spec try_get_object(object_ref(), snapshot()) ->
{ok, dmsl_domain_config_thrift:'VersionedObject'()} | {error, object_not_found}.
@ -146,14 +116,6 @@ try_get_object(ObjectReference, #'Snapshot'{version = Version, domain = Domain})
{error, object_not_found}
end.
try_apply_commit(#'Snapshot'{version = VersionWas, domain = DomainWas}, Ops) ->
case dmt_domain:apply_operations(Ops, DomainWas) of
{ok, Domain} ->
{ok, #'Snapshot'{version = VersionWas + 1, domain = Domain}};
{error, _} = Error ->
Error
end.
-spec ensure_snapshot({ok, snapshot()} | {error, version_not_found}) ->
snapshot().
@ -161,9 +123,3 @@ ensure_snapshot({ok, Snapshot}) ->
Snapshot;
ensure_snapshot({error, version_not_found}) ->
#'Snapshot'{version = 0, domain = dmt_domain:new()}.
-spec get_history(repository(), version(), pos_integer() | undefined, context()) ->
{ok, history()} | {error, version_not_found}.
get_history(Mod, Version, Limit, Context) ->
Mod:get_history(Version, Limit, Context).

View File

@ -1,155 +0,0 @@
-module(dmt_api_repository_v1).
-behaviour(dmt_api_repository).
-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl").
-define(NS , <<"domain-config">>).
-define(ID , <<"primary">>).
%% API
-export([get_history/2]).
-export([get_history/3]).
-export([commit/3]).
%% State processor
-behaviour(woody_server_thrift_handler).
-export([handle_function/4]).
%%
-type context() :: woody_context:ctx().
-spec get_history(pos_integer() | undefined, context()) ->
dmt_api_repository:history().
get_history(Limit, Context) ->
get_history_by_range(#'HistoryRange'{'after' = undefined, 'limit' = Limit}, Context).
-spec get_history(dmt_api_repository:version(), pos_integer() | undefined, context()) ->
{ok, dmt_api_repository:history()} | {error, version_not_found}.
get_history(Version, Limit, Context) ->
After = get_event_id(Version),
case get_history_by_range(#'HistoryRange'{'after' = After, 'limit' = Limit}, Context) of
History when is_map(History) ->
{ok, History};
Error ->
Error
end.
get_event_id(ID) when is_integer(ID) andalso ID > 0 ->
ID;
get_event_id(0) ->
undefined.
%%
-type history_range() :: mg_proto_state_processing_thrift:'HistoryRange'().
-type machine() :: mg_proto_state_processing_thrift:'Machine'().
-type history() :: mg_proto_state_processing_thrift:'History'().
-spec get_history_by_range(history_range(), context()) ->
dmt_api_repository:history() | {error, version_not_found}.
get_history_by_range(HistoryRange, Context) ->
case dmt_api_automaton_client:get_history(?NS, ?ID, HistoryRange, Context) of
{ok, History} ->
read_history(History);
{error, #'MachineNotFound'{}} ->
#{};
{error, #'EventNotFound'{}} ->
{error, version_not_found}
end.
%%
-spec commit(dmt_api_repository:version(), dmt_api_repository:commit(), context()) ->
{ok, dmt_api_repository:snapshot()} |
{error, version_not_found | {operation_conflict, dmt_api_repository:operation_conflict()}}.
commit(Version, Commit, Context) ->
decode_call_result(dmt_api_automaton_client:call(
?NS,
?ID,
#'HistoryRange'{'after' = get_event_id(Version)},
encode_call({commit, Version, Commit}),
Context
)).
%%
-define(NIL, {nl, #msgpack_Nil{}}).
-spec handle_function(woody:func(), woody:args(), context(), woody:options()) ->
{ok, woody:result()} | no_return().
handle_function('ProcessCall', [#'CallArgs'{arg = Payload, machine = Machine}], Context, _Opts) ->
Call = decode_call(Payload),
{Result, Events} = handle_call(Call, read_history(Machine), Context),
Response = encode_call_result(Result),
{ok, construct_call_result(Response, [encode_event(E) || E <- Events])};
handle_function('ProcessSignal', [#'SignalArgs'{signal = {init, #'InitSignal'{}}}], _Context, _Opts) ->
{ok, construct_signal_result([])}.
construct_call_result(Response, Events) ->
#'CallResult'{
response = Response,
change = #'MachineStateChange'{aux_state = ?NIL, events = Events},
action = #'ComplexAction'{}
}.
construct_signal_result(Events) ->
#'SignalResult'{
change = #'MachineStateChange'{aux_state = ?NIL, events = Events},
action = #'ComplexAction'{}
}.
%%
handle_call({commit, Version, Commit}, History, Context) ->
case dmt_api_repository:apply_commit(Version, Commit, History, ?MODULE, Context) of
{ok, _} = Ok ->
{Ok, [{commit, Commit}]};
{error, version_not_found} ->
{{error, version_not_found}, []};
{error, Reason} ->
_ = lager:info("commit failed: ~p", [Reason]),
{{error, {operation_conflict, Reason}}, []}
end.
%%
-spec read_history(history() | machine()) ->
dmt_api_repository:history().
read_history(#'Machine'{history = Events}) ->
read_history(Events);
read_history(Events) ->
read_history(Events, #{}).
-spec read_history(history(), dmt_api_repository:history()) ->
dmt_api_repository:history().
read_history([], History) ->
History;
read_history([#'Event'{id = Id, event_payload = EventData} | Rest], History) ->
{commit, Commit} = decode_event(EventData),
read_history(Rest, History#{Id => Commit}).
%%
encode_event({commit, Commit}) ->
{bin, term_to_binary(Commit)}.
decode_event({bin, CommitData}) ->
{commit, binary_to_term(CommitData)}.
%%
encode_call(Call) ->
{bin, term_to_binary(Call)}.
decode_call({bin, CallData}) ->
binary_to_term(CallData).
encode_call_result(Result) ->
{bin, term_to_binary(Result)}.
decode_call_result({bin, ResultData}) ->
binary_to_term(ResultData).

View File

@ -1,19 +1,26 @@
-module(dmt_api_repository_v2).
-behaviour(dmt_api_repository).
%%
%% This is old, depricated version of domain repository.
%%
-include_lib("dmsl/include/dmsl_domain_config_thrift.hrl").
-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl").
-define(NS , <<"domain-config">>).
-define(ID , <<"primary/v2">>).
%% API
-behaviour(dmt_api_repository).
-export([checkout/2]).
-export([pull/2]).
-export([commit/3]).
-export([get_history/2]).
-export([get_history/3]).
-export([commit/3]).
%% State processor
-behaviour(woody_server_thrift_handler).
-export([handle_function/4]).
@ -21,17 +28,136 @@
%%
-type context() :: woody_context:ctx().
-type history_range() :: mg_proto_state_processing_thrift:'HistoryRange'().
-type machine() :: mg_proto_state_processing_thrift:'Machine'().
-type history() :: mg_proto_state_processing_thrift:'History'().
-type ref() :: dmsl_domain_config_thrift:'Reference'().
-type snapshot() :: dmt_api_repository:snapshot().
-type commit() :: dmt_api_repository:commit().
%%
-spec checkout(ref(), context()) ->
{ok, snapshot()} |
{error, version_not_found}.
checkout({head, #'Head'{}}, Context) ->
Snapshot = ensure_snapshot(dmt_api_cache:get_latest()),
case get_history(Snapshot#'Snapshot'.version, undefined, Context) of
{ok, History} ->
dmt_history:head(History, Snapshot);
{error, version_not_found} ->
{error, version_not_found}
end;
checkout({version, Version}, Context) ->
ClosestSnapshot = ensure_snapshot(dmt_api_cache:get_closest(Version)),
From = min(Version, ClosestSnapshot#'Snapshot'.version),
Limit = abs(Version - ClosestSnapshot#'Snapshot'.version),
case get_history(From, Limit, Context) of
{ok, History} when map_size(History) =:= Limit ->
%% TO DO: Need to fix dmt_history:travel. It can return {error, ...}
{ok, Snapshot} = dmt_history:travel(Version, History, ClosestSnapshot),
{ok, Snapshot};
{ok, #{}} ->
{error, version_not_found};
{error, version_not_found} ->
{error, version_not_found}
end.
-spec pull(dmt_api_repository:version(), context()) ->
{ok, dmt_api_repository:history()} |
{error, version_not_found}.
pull(Version, Context) ->
After = get_event_id(Version),
case get_history_by_range(#mg_stateproc_HistoryRange{'after' = After}, Context) of
History when is_map(History) ->
{ok, History};
Error ->
Error
end.
-spec commit(dmt_api_repository:version(), commit(), context()) ->
{ok, snapshot()} |
{error, version_not_found | {operation_conflict, dmt_api_repository:operation_conflict()}}.
commit(Version, Commit, Context) ->
decode_call_result(dmt_api_automaton_client:call(
?NS,
?ID,
#mg_stateproc_HistoryRange{'after' = undefined},
encode_call({commit, Version, Commit}),
Context
)).
%%
-define(NIL, {nl, #mg_msgpack_Nil{}}).
-spec handle_function(woody:func(), woody:args(), context(), woody:options()) ->
{ok, woody:result()} | no_return().
handle_function('ProcessCall', [#mg_stateproc_CallArgs{arg = Payload, machine = Machine}], Context, _Opts) ->
Call = decode_call(Payload),
{Result, Events} = handle_call(Call, read_history(Machine), Context),
{ok, construct_call_result(Result, Events)};
handle_function(
'ProcessSignal',
[#mg_stateproc_SignalArgs{
signal = {init, #mg_stateproc_InitSignal{}}
}],
_Context,
_Opts
) ->
% No migration here, just start empty machine
{ok, #mg_stateproc_SignalResult{
change = #mg_stateproc_MachineStateChange{aux_state = ?NIL, events = []},
action = #mg_stateproc_ComplexAction{}
}}.
construct_call_result(Response, Events) ->
#mg_stateproc_CallResult{
response = encode_call_result(Response),
change = #mg_stateproc_MachineStateChange{aux_state = ?NIL, events = encode_events(Events)},
action = #mg_stateproc_ComplexAction{}
}.
encode_events(Events) ->
[encode_event(E) || E <- Events].
%%
handle_call({commit, Version, Commit}, History, _Context) ->
Snapshot0 = ensure_snapshot(dmt_api_cache:get_latest()),
case dmt_history:head(History, Snapshot0) of
{ok, #'Snapshot'{version = Version} = Snapshot} ->
apply_commit(Snapshot, Commit);
{ok, _} ->
{{error, head_mismatch}, []};
{error, _} = Error ->
{Error, []}
end.
apply_commit(#'Snapshot'{version = VersionWas, domain = DomainWas}, #'Commit'{ops = Ops} = Commit) ->
case dmt_domain:apply_operations(Ops, DomainWas) of
{ok, Domain} ->
Snapshot = #'Snapshot'{version = VersionWas + 1, domain = Domain},
{{ok, Snapshot}, [{commit, Commit}]};
{error, Reason} ->
{{error, {operation_conflict, Reason}}, []}
end.
%%
-spec get_history(pos_integer() | undefined, context()) ->
dmt_api_repository:history().
get_history(Limit, Context) ->
get_history_by_range(#'HistoryRange'{'after' = undefined, 'limit' = Limit}, Context).
get_history_by_range(#'mg_stateproc_HistoryRange'{'after' = undefined, 'limit' = Limit}, Context).
-spec get_history(dmt_api_repository:version(), pos_integer() | undefined, context()) ->
{ok, dmt_api_repository:history()} | {error, version_not_found}.
get_history(Version, Limit, Context) ->
After = get_event_id(Version),
case get_history_by_range(#'HistoryRange'{'after' = After, 'limit' = Limit}, Context) of
case get_history_by_range(#'mg_stateproc_HistoryRange'{'after' = After, 'limit' = Limit}, Context) of
History when is_map(History) ->
{ok, History};
Error ->
@ -43,94 +169,27 @@ get_event_id(ID) when is_integer(ID) andalso ID > 0 ->
get_event_id(0) ->
undefined.
%%
-type history_range() :: mg_proto_state_processing_thrift:'HistoryRange'().
-type machine() :: mg_proto_state_processing_thrift:'Machine'().
-type history() :: mg_proto_state_processing_thrift:'History'().
-spec get_history_by_range(history_range(), context()) ->
dmt_api_repository:history() | {error, version_not_found}.
get_history_by_range(HistoryRange, Context) ->
case dmt_api_automaton_client:get_history(?NS, ?ID, HistoryRange, Context) of
{ok, History} ->
read_history(History);
{error, #'MachineNotFound'{}} ->
{error, #'mg_stateproc_MachineNotFound'{}} ->
ok = dmt_api_automaton_client:start(?NS, ?ID, Context),
get_history_by_range(HistoryRange, Context);
{error, #'EventNotFound'{}} ->
{error, #'mg_stateproc_EventNotFound'{}} ->
{error, version_not_found}
end.
%%
-spec commit(dmt_api_repository:version(), dmt_api_repository:commit(), context()) ->
{ok, dmt_api_repository:snapshot()} |
{error, version_not_found | {operation_conflict, dmt_api_repository:operation_conflict()}}.
commit(Version, Commit, Context) ->
decode_call_result(dmt_api_automaton_client:call(
?NS,
?ID,
#'HistoryRange'{'after' = get_event_id(Version)},
encode_call({commit, Version, Commit}),
Context
)).
%%
-define(NIL, {nl, #msgpack_Nil{}}).
-spec handle_function(woody:func(), woody:args(), context(), woody:options()) ->
{ok, woody:result()} | no_return().
handle_function('ProcessCall', [#'CallArgs'{arg = Payload, machine = Machine}], Context, _Opts) ->
Call = decode_call(Payload),
{Result, Events} = handle_call(Call, read_history(Machine), Context),
{ok, construct_call_result(Result, Events)};
handle_function('ProcessSignal', [#'SignalArgs'{signal = {init, #'InitSignal'{}}}], Context, _Opts) ->
%%% TODO It's generally prettier to make up a _migrating_ repository which is the special repository
%%% module designed to facilitate migrations between some preconfigured 'old' repository backend
%%% and some 'new' one. The migration process could be triggered by the very first mutating
%%% operation (e.g. commit) going into this backend for example.
LegacyHistory = dmt_api_repository_v1:get_history(undefined, Context),
{ok, construct_signal_result(get_events_from_history(LegacyHistory))}.
get_events_from_history(History) ->
[{commit, Commit} || {_Version, Commit} <- lists:keysort(1, maps:to_list(History))].
construct_call_result(Response, Events) ->
#'CallResult'{
response = encode_call_result(Response),
change = #'MachineStateChange'{aux_state = ?NIL, events = encode_events(Events)},
action = #'ComplexAction'{}
}.
construct_signal_result(Events) ->
#'SignalResult'{
change = #'MachineStateChange'{aux_state = ?NIL, events = encode_events(Events)},
action = #'ComplexAction'{}
}.
encode_events(Events) ->
[encode_event(E) || E <- Events].
%%
handle_call({commit, Version, Commit}, History, Context) ->
case dmt_api_repository:apply_commit(Version, Commit, History, ?MODULE, Context) of
{ok, _} = Ok ->
{Ok, [{commit, Commit}]};
{error, version_not_found} ->
{{error, version_not_found}, []};
{error, Reason} ->
_ = lager:info("commit failed: ~p", [Reason]),
{{error, {operation_conflict, Reason}}, []}
end.
%%
ensure_snapshot({ok, Snapshot}) ->
Snapshot;
ensure_snapshot({error, version_not_found}) ->
#'Snapshot'{version = 0, domain = dmt_domain:new()}.
-spec read_history(machine() | history()) ->
dmt_api_repository:history().
read_history(#'Machine'{history = Events}) ->
read_history(#'mg_stateproc_Machine'{history = Events}) ->
read_history(Events);
read_history(Events) ->
read_history(Events, #{}).
@ -139,7 +198,7 @@ read_history(Events) ->
dmt_api_repository:history().
read_history([], History) ->
History;
read_history([#'Event'{id = Id, event_payload = EventData} | Rest], History) ->
read_history([#'mg_stateproc_Event'{id = Id, event_payload = EventData} | Rest], History) ->
{commit, Commit} = decode_event(EventData),
read_history(Rest, History#{Id => Commit}).

View File

@ -0,0 +1,299 @@
-module(dmt_api_repository_v3).
-behaviour(dmt_api_repository).
-include_lib("dmsl/include/dmsl_domain_config_thrift.hrl").
-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl").
-define(NS , <<"domain-config">>).
-define(ID , <<"primary/v3">>).
-define(BASE, 10).
%% API
-export([checkout/2]).
-export([pull/2]).
-export([commit/3]).
%% State processor
-behaviour(woody_server_thrift_handler).
-export([handle_function/4]).
%%
-record(st, {
snapshot = #'Snapshot'{version = 0, domain = dmt_domain:new()} :: snapshot(),
history = #{} :: dmt_api_repository:history()
}).
-type st() :: #st{}.
-type context() :: woody_context:ctx().
-type history_range() :: mg_proto_state_processing_thrift:'HistoryRange'().
-type machine() :: mg_proto_state_processing_thrift:'Machine'().
-type history() :: mg_proto_state_processing_thrift:'History'().
-type ref() :: dmsl_domain_config_thrift:'Reference'().
-type snapshot() :: dmt_api_repository:snapshot().
-type commit() :: dmt_api_repository:commit().
-spec checkout(ref(), context()) ->
{ok, snapshot()} |
{error, version_not_found}.
checkout({head, #'Head'{}}, Context) ->
HistoryRange = #mg_stateproc_HistoryRange{
'after' = undefined,
'limit' = ?BASE,
'direction' = backward
},
case get_history_by_range(HistoryRange, Context) of
#st{} = St ->
squash_state(St);
{error, version_not_found} ->
{error, version_not_found}
end;
checkout({version, V}, Context) ->
BaseV = get_base_version(V),
HistoryRange = #mg_stateproc_HistoryRange{
'after' = get_event_id(BaseV),
'limit' = V - BaseV,
'direction' = forward
},
case get_history_by_range(HistoryRange, Context) of
#st{} = St ->
case squash_state(St) of
{ok, #'Snapshot'{version = V}} = Result ->
Result;
{ok, _} ->
{error, version_not_found}
end;
{error, version_not_found} ->
{error, version_not_found}
end.
-spec pull(dmt_api_repository:version(), context()) ->
{ok, dmt_api_repository:history()} |
{error, version_not_found}.
pull(Version, Context) ->
After = get_event_id(Version),
case get_history_by_range(#mg_stateproc_HistoryRange{'after' = After}, Context) of
#st{history = History} ->
{ok, History};
{error, version_not_found} ->
{error, version_not_found}
end.
-spec commit(dmt_api_repository:version(), commit(), context()) ->
{ok, snapshot()} |
{error, version_not_found | {operation_conflict, dmt_api_repository:operation_conflict()}}.
commit(Version, Commit, Context) ->
BaseID = get_event_id(get_base_version(Version)),
decode_call_result(dmt_api_automaton_client:call(
?NS,
?ID,
#mg_stateproc_HistoryRange{'after' = BaseID},
encode_call({commit, Version, Commit}),
Context
)).
%%
-spec get_history_by_range(history_range(), context()) ->
st() | {error, version_not_found}.
get_history_by_range(HistoryRange, Context) ->
case dmt_api_automaton_client:get_history(?NS, ?ID, HistoryRange, Context) of
{ok, History} ->
read_history(History);
{error, #mg_stateproc_MachineNotFound{}} ->
ok = dmt_api_automaton_client:start(?NS, ?ID, Context),
get_history_by_range(HistoryRange, Context);
{error, #mg_stateproc_EventNotFound{}} ->
{error, version_not_found}
end.
%%
-define(NIL, {nl, #mg_msgpack_Nil{}}).
-spec handle_function(woody:func(), woody:args(), context(), woody:options()) ->
{ok, woody:result()} | no_return().
handle_function('ProcessCall', [#mg_stateproc_CallArgs{arg = Payload, machine = Machine}], Context, _Opts) ->
Call = decode_call(Payload),
{Result, Events} = handle_call(Call, read_history(Machine), Context),
{ok, construct_call_result(Result, Events)};
handle_function(
'ProcessSignal',
[#mg_stateproc_SignalArgs{
signal = {init, #mg_stateproc_InitSignal{}}
}],
Context,
_Opts
) ->
%%% TODO It's generally prettier to make up a _migrating_ repository which is the special repository
%%% module designed to facilitate migrations between some preconfigured 'old' repository backend
%%% and some 'new' one. The migration process could be triggered by the very first mutating
%%% operation (e.g. commit) going into this backend for example.
Events = get_events_from_legacy(Context),
{ok, construct_signal_result(Events)}.
construct_call_result(Response, Events) ->
#mg_stateproc_CallResult{
response = encode_call_result(Response),
change = #mg_stateproc_MachineStateChange{aux_state = ?NIL, events = encode_events(Events)},
action = #mg_stateproc_ComplexAction{}
}.
construct_signal_result(Events) ->
#mg_stateproc_SignalResult{
change = #mg_stateproc_MachineStateChange{aux_state = ?NIL, events = encode_events(Events)},
action = #mg_stateproc_ComplexAction{}
}.
encode_events(Events) ->
[encode_event(E) || E <- Events].
%%
handle_call({commit, Version, Commit}, St, _Context) ->
case squash_state(St) of
{ok, #'Snapshot'{version = Version} = Snapshot} ->
apply_commit(Snapshot, Commit);
{ok, _} ->
{{error, head_mismatch}, []}
end.
apply_commit(#'Snapshot'{version = VersionWas, domain = DomainWas}, #'Commit'{ops = Ops} = Commit) ->
case dmt_domain:apply_operations(Ops, DomainWas) of
{ok, Domain} ->
Snapshot = #'Snapshot'{version = VersionWas + 1, domain = Domain},
{{ok, Snapshot}, [make_event(Snapshot, Commit)]};
{error, Reason} ->
{{error, {operation_conflict, Reason}}, []}
end.
%%
-spec read_history(machine() | history()) ->
st().
read_history(#mg_stateproc_Machine{history = Events}) ->
read_history(Events);
read_history(Events) ->
read_history(Events, #st{}).
-spec read_history([mg_proto_state_processing_thrift:'Event'()], st()) ->
st().
read_history([], St) ->
St;
read_history([#mg_stateproc_Event{id = Id, event_payload = EventData} | Rest], #st{history = History} = St) ->
{commit, Commit, Meta} = decode_event(EventData),
case Meta of
#{snapshot := Snapshot} ->
read_history(
Rest,
St#st{snapshot = Snapshot, history = History#{Id => Commit}}
);
#{} ->
read_history(
Rest,
St#st{history = History#{Id => Commit}}
)
end.
squash_state(#st{snapshot = BaseSnapshot, history = History}) ->
case dmt_history:head(History, BaseSnapshot) of
{ok, Snapshot} ->
{ok, Snapshot};
{error, Error} ->
error(Error)
end.
%%
get_events_from_legacy(Context) ->
LegacyHistory = dmt_api_repository_v2:get_history(undefined, Context),
History = lists:keysort(1, maps:to_list(LegacyHistory)),
convert_v2_events(History, #'Snapshot'{version = 0, domain = dmt_domain:new()}, []).
convert_v2_events([{Version1, Commit} | Others], #'Snapshot'{version = Version0} = Snapshot0, Events)
when Version0 + 1 == Version1 % paranoidal check :)
->
case apply_commit(Snapshot0, Commit) of
{{ok, Snapshot}, [Event]} ->
convert_v2_events(Others, Snapshot, [Event | Events]);
{{error, Error}, []} ->
error(Error)
end;
convert_v2_events([], _, Events) ->
lists:reverse(Events).
make_event(Snapshot, Commit) ->
Meta = case (Snapshot#'Snapshot'.version) rem ?BASE of
0 ->
#{snapshot => Snapshot};
_ ->
#{}
end,
{commit, Commit, Meta}.
encode_event({commit, Commit, Meta}) ->
{arr, [{str, <<"commit">>}, encode(commit, Commit), encode_commit_meta(Meta)]}.
encode_commit_meta(#{snapshot := Snapshot}) ->
{obj, #{{str, <<"snapshot">>} => encode(snapshot, Snapshot)}};
encode_commit_meta(#{}) ->
{obj, #{}}.
decode_event({arr, [{str, <<"commit">>}, Commit, Meta]}) ->
{commit, decode(commit, Commit), decode_commit_meta(Meta)}.
decode_commit_meta({obj, #{{str, <<"snapshot">>} := Snapshot}}) ->
#{snapshot => decode(snapshot, Snapshot)};
decode_commit_meta({obj, #{}}) ->
#{}.
%%
encode_call({commit, Version, Commit}) ->
{arr, [{str, <<"commit">>}, {i, Version}, encode(commit, Commit)]}.
decode_call({arr, [{str, <<"commit">>}, {i, Version}, Commit]}) ->
{commit, Version, decode(commit, Commit)}.
encode_call_result({ok, Snapshot}) ->
{arr, [{str, <<"ok">> }, encode(snapshot, Snapshot)]};
encode_call_result({error, Reason}) ->
{arr, [{str, <<"err">>}, {bin, term_to_binary(Reason)}]}.
decode_call_result({arr, [{str, <<"ok">> }, Snapshot]}) ->
{ok, decode(snapshot, Snapshot)};
decode_call_result({arr, [{str, <<"err">>}, {bin, Reason}]}) ->
{error, binary_to_term(Reason)}.
%%
decode(T, V) ->
dmt_api_thrift_utils:decode(msgpack, get_type_info(T), V).
encode(T, V) ->
dmt_api_thrift_utils:encode(msgpack, get_type_info(T), V).
get_type_info(commit) ->
{struct, struct, {dmsl_domain_config_thrift, 'Commit'}};
get_type_info(snapshot) ->
{struct, struct, {dmsl_domain_config_thrift, 'Snapshot'}}.
get_base_version(V) when is_integer(V) andalso V >= ?BASE ->
(V div ?BASE) * ?BASE - 1;
get_base_version(V) when is_integer(V) ->
0.
get_event_id(ID) when is_integer(ID) andalso ID > 0 ->
ID;
get_event_id(0) ->
undefined.

View File

@ -27,18 +27,18 @@
-spec all() -> [{group, group_name()}].
all() ->
[
{group, basic_lifecycle_v1},
{group, basic_lifecycle_v2}
{group, basic_lifecycle_v2},
{group, basic_lifecycle_v3}
].
-spec groups() -> [{group_name(), list(), [test_case_name()]}].
groups() ->
[
{basic_lifecycle_v1, [sequence], [
{basic_lifecycle_v2, [sequence], [
pull_commit,
{group, basic_lifecycle}
]},
{basic_lifecycle_v2, [sequence], [
{basic_lifecycle_v3, [sequence], [
pull_commit,
{group, basic_lifecycle}
]},
@ -79,10 +79,10 @@ end_per_suite(C) ->
genlib_app:stop_unload_applications(?config(suite_apps, C)).
-spec init_per_group(group_name(), config()) -> config().
init_per_group(basic_lifecycle_v1, C) ->
[{group_apps, start_with_repository(dmt_api_repository_v1)} | C];
init_per_group(basic_lifecycle_v2, C) ->
[{group_apps, start_with_repository(dmt_api_repository_v2)} | C];
init_per_group(basic_lifecycle_v3, C) ->
[{group_apps, start_with_repository(dmt_api_repository_v3)} | C];
init_per_group(_, C) ->
C.
@ -90,17 +90,14 @@ start_with_repository(Repository) ->
genlib_app:start_application_with(dmt_api, [
{repository, Repository},
{automaton_service_url, "http://machinegun:8022/v1/automaton"},
{max_cache_size, #{
elements => 1,
memory => 2048 % 2Kb
}}
{max_cache_size, 2048} % 2Kb
]).
-spec end_per_group(group_name(), config()) -> term().
end_per_group(basic_lifecycle_v1, C) ->
genlib_app:stop_unload_applications(?config(group_apps, C));
end_per_group(basic_lifecycle_v2, C) ->
genlib_app:stop_unload_applications(?config(group_apps, C));
end_per_group(basic_lifecycle_v3, C) ->
genlib_app:stop_unload_applications(?config(group_apps, C));
end_per_group(_, _C) ->
ok.