* Use valitydev/thrift compiler 0.14.2.3
* Bump to valitydev/dmt-core@7584133
* Bump to valitydev/machinegun-proto@a411c7d
* Drop repository v4
* Update to valitydev/dmt-client@ce6678a in tests
* Keep v5 migration code and tests as examples only
* Add testcase on `RepositoryClient`
* Add testcases on `Repository` functions exceptions
* Move unused msgpack protocol out to thrift runtime library
This commit is contained in:
Andrew Mayorov 2022-07-15 11:30:18 +03:00 committed by GitHub
parent 50ff8393dd
commit 4b71be54bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 306 additions and 864 deletions

4
.env
View File

@ -2,6 +2,6 @@
# You SHOULD specify point releases here so that build time and run time Erlang/OTPs
# are the same. See: https://github.com/erlware/relx/pull/902
SERVICE_NAME=dominant
OTP_VERSION=24.2.0
OTP_VERSION=24.3.4
REBAR_VERSION=3.18
THRIFT_VERSION=0.14.2.2
THRIFT_VERSION=0.14.2.3

View File

@ -2,7 +2,8 @@
-behaviour(dmt_api_repository).
-include_lib("damsel/include/dmsl_domain_config_thrift.hrl").
-include_lib("damsel/include/dmsl_domain_conf_thrift.hrl").
-include_lib("damsel/include/dmsl_domain_thrift.hrl").
-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl").
-define(NS, <<"domain-config">>).
@ -35,7 +36,7 @@
-type context() :: woody_context:ctx().
-type machine() :: mg_proto_state_processing_thrift:'Machine'().
-type ref() :: dmsl_domain_config_thrift:'Reference'().
-type ref() :: dmsl_domain_conf_thrift:'Reference'().
-type snapshot() :: dmt_api_repository:snapshot().
-type commit() :: dmt_api_repository:commit().
@ -163,9 +164,13 @@ try_migrate_history(Version, History, Context) ->
%% TODO abstraction leak
NextVersion = Version + 1,
case maps:get(NextVersion, History, undefined) of
#'Commit'{} = Commit ->
#domain_conf_Commit{} = Commit ->
MigratedCommit = migrate_commit(Commit),
{ok, #'Snapshot'{version = NextVersion}} = dmt_api_repository_v5:commit(Version, MigratedCommit, Context),
{ok, #domain_conf_Snapshot{version = NextVersion}} = dmt_api_repository_v5:commit(
Version,
MigratedCommit,
Context
),
%% continue history traversing
try_migrate_history(NextVersion, History, Context);
undefined ->
@ -205,20 +210,20 @@ decode_aux_state(
) ->
#{version => Version, is_finished => IsFinished}.
migrate_commit(#'Commit'{ops = Ops} = Commit) ->
migrate_commit(#domain_conf_Commit{ops = Ops} = Commit) ->
UpdatedOps = lists:map(fun rewrite_op/1, Ops),
NewOps = lists:flatmap(fun add_ops/1, Ops),
Commit#'Commit'{ops = UpdatedOps ++ NewOps}.
Commit#domain_conf_Commit{ops = UpdatedOps ++ NewOps}.
rewrite_op({insert, #'InsertOp'{object = Object} = Op}) ->
{insert, Op#'InsertOp'{object = rewrite_object(Object)}};
rewrite_op({update, #'UpdateOp'{old_object = OldObject, new_object = NewObject} = Op}) ->
{update, Op#'UpdateOp'{
rewrite_op({insert, #domain_conf_InsertOp{object = Object} = Op}) ->
{insert, Op#domain_conf_InsertOp{object = rewrite_object(Object)}};
rewrite_op({update, #domain_conf_UpdateOp{old_object = OldObject, new_object = NewObject} = Op}) ->
{update, Op#domain_conf_UpdateOp{
old_object = rewrite_object(OldObject),
new_object = rewrite_object(NewObject)
}};
rewrite_op({remove, #'RemoveOp'{object = Object} = Op}) ->
{remove, Op#'RemoveOp'{object = rewrite_object(Object)}}.
rewrite_op({remove, #domain_conf_RemoveOp{object = Object} = Op}) ->
{remove, Op#domain_conf_RemoveOp{object = rewrite_object(Object)}}.
rewrite_object({provider, #domain_ProviderObject{data = Data} = Object}) ->
NewData = Data#domain_Provider{
@ -255,25 +260,25 @@ rewrite_provider_decision_name(domain_P2PProviderDecision) ->
rewrite_provider_decision_name(Name) ->
Name.
add_ops({insert, #'InsertOp'{object = Object0} = Op}) ->
add_ops({insert, #domain_conf_InsertOp{object = Object0} = Op}) ->
case maybe_clone_object(Object0) of
{add, Object1} ->
[{insert, Op#'InsertOp'{object = Object1}}];
[{insert, Op#domain_conf_InsertOp{object = Object1}}];
ignore ->
[]
end;
add_ops({update, #'UpdateOp'{old_object = OldObject0, new_object = NewObject0} = Op}) ->
add_ops({update, #domain_conf_UpdateOp{old_object = OldObject0, new_object = NewObject0} = Op}) ->
case maybe_clone_object(OldObject0) of
{add, OldObject1} ->
{add, NewObject1} = maybe_clone_object(NewObject0),
[{update, Op#'UpdateOp'{old_object = OldObject1, new_object = NewObject1}}];
[{update, Op#domain_conf_UpdateOp{old_object = OldObject1, new_object = NewObject1}}];
ignore ->
[]
end;
add_ops({remove, #'RemoveOp'{object = Object0} = Op}) ->
add_ops({remove, #domain_conf_RemoveOp{object = Object0} = Op}) ->
case maybe_clone_object(Object0) of
{add, Object1} ->
[{remove, Op#'RemoveOp'{object = Object1}}];
[{remove, Op#domain_conf_RemoveOp{object = Object1}}];
ignore ->
[]
end.

View File

@ -1,9 +1,9 @@
-module(dmt_migration_v5_SUITE).
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
-include_lib("damsel/include/dmsl_domain_config_thrift.hrl").
-include_lib("damsel/include/dmsl_domain_conf_thrift.hrl").
-include_lib("damsel/include/dmsl_domain_thrift.hrl").
-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl").
-export([all/0]).
@ -410,24 +410,24 @@ next_id() ->
erlang:system_time(micro_seconds) band 16#7FFFFFFF.
insert(Object) ->
dmt_client:commit(#'Commit'{ops = [{insert, #'InsertOp'{object = Object}}]}).
dmt_client:commit(#domain_conf_Commit{ops = [{insert, #domain_conf_InsertOp{object = Object}}]}).
update(Object0, Object1) ->
dmt_client:commit(#'Commit'{
dmt_client:commit(#domain_conf_Commit{
ops = [
{update, #'UpdateOp'{old_object = Object0, new_object = Object1}}
{update, #domain_conf_UpdateOp{old_object = Object0, new_object = Object1}}
]
}).
remove(Object) ->
dmt_client:commit(#'Commit'{ops = [{remove, #'RemoveOp'{object = Object}}]}).
dmt_client:commit(#domain_conf_Commit{ops = [{remove, #domain_conf_RemoveOp{object = Object}}]}).
checkout(Ref, Version) ->
try dmt_client:checkout_object(Version, Ref) of
{_Tag, Object} ->
Object
catch
throw:#'ObjectNotFound'{} ->
throw:#domain_conf_ObjectNotFound{} ->
not_found;
throw:Reason ->
erlang:error(Reason)
@ -484,7 +484,7 @@ wait_for_migration(V, TriesLeft, SleepInterval) when TriesLeft > 0 ->
description = <<"MigrationCommitFixture">>
}
}},
Commit = #'Commit'{ops = [{insert, #'InsertOp'{object = Object}}]},
Commit = #domain_conf_Commit{ops = [{insert, #domain_conf_InsertOp{object = Object}}]},
try
dmt_client:commit(V, Commit)
catch

View File

@ -29,8 +29,8 @@
{genlib, {git, "https://github.com/valitydev/genlib.git", {branch, "master"}}},
{woody, {git, "https://github.com/valitydev/woody_erlang.git", {branch, "master"}}},
{damsel, {git, "https://github.com/valitydev/damsel.git", {branch, "master"}}},
{mg_proto, {git, "https://github.com/valitydev/machinegun_proto.git", {branch, "master"}}},
{dmt_core, {git, "https://github.com/valitydev/dmt_core.git", {branch, "master"}}},
{mg_proto, {git, "https://github.com/valitydev/machinegun-proto.git", {branch, "master"}}},
{dmt_core, {git, "https://github.com/valitydev/dmt-core.git", {branch, "master"}}},
{scoper, {git, "https://github.com/valitydev/scoper.git", {branch, "master"}}},
{erl_health, {git, "https://github.com/valitydev/erlang-health.git", {branch, master}}}
]}.
@ -55,8 +55,8 @@
% mandatory
unmatched_returns,
error_handling,
race_conditions
% unknown %% need fix
race_conditions,
unknown
]},
{plt_apps, all_deps}
]}.
@ -64,7 +64,10 @@
{profiles, [
{test, [
{deps, [
{dmt_client, {git, "https://github.com/valitydev/dmt_client.git", {ref, "3f664028"}}}
{dmt_client, {git, "https://github.com/valitydev/dmt-client.git", {ref, "ce6678a"}}}
]},
{dialyzer, [
{plt_extra_apps, [dmt_client]}
]}
]},
{prod, [

View File

@ -10,11 +10,11 @@
{<<"cowlib">>,{pkg,<<"cowlib">>,<<"2.11.0">>},2},
{<<"damsel">>,
{git,"https://github.com/valitydev/damsel.git",
{ref,"b25d3365e1f2b075ffea30b3a2e1c41eb3f6145b"}},
{ref,"9362c08657d1681240d70f923fc04642bbfecc0a"}},
0},
{<<"dmt_core">>,
{git,"https://github.com/valitydev/dmt_core.git",
{ref,"910e20edbe03ae4645aa3923baea8054003753b5"}},
{git,"https://github.com/valitydev/dmt-core.git",
{ref,"75841332fe0b40a77da0c12ea8d5dbb994da8e82"}},
0},
{<<"erl_health">>,
{git,"https://github.com/valitydev/erlang-health.git",
@ -38,8 +38,8 @@
{<<"jsx">>,{pkg,<<"jsx">>,<<"3.1.0">>},1},
{<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},2},
{<<"mg_proto">>,
{git,"https://github.com/rbkmoney/machinegun_proto.git",
{ref,"f77367a05c89162bf1e6c3928d611343aba9717a"}},
{git,"https://github.com/valitydev/machinegun-proto.git",
{ref,"a411c7d5d779389c70d2594eb4a28a916dce1721"}},
0},
{<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.2.0">>},2},
{<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.3.1">>},2},

View File

@ -62,12 +62,12 @@ get_repository_handlers() ->
{Path :: iodata(), {woody:service(), woody:handler(woody:options())}}.
get_handler_spec(repository, Options) ->
{"/v1/domain/repository", {
{dmsl_domain_config_thrift, 'Repository'},
{dmsl_domain_conf_thrift, 'Repository'},
{dmt_api_repository_handler, Options}
}};
get_handler_spec(repository_client, Options) ->
{"/v1/domain/repository_client", {
{dmsl_domain_config_thrift, 'RepositoryClient'},
{dmsl_domain_conf_thrift, 'RepositoryClient'},
{dmt_api_repository_client_handler, Options}
}};
get_handler_spec(state_processor, Options) ->

View File

@ -2,9 +2,7 @@
-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl").
-export([call/4]).
-export([call/5]).
-export([get_machine/3]).
-export([get_history/4]).
-export([start/3]).
@ -17,17 +15,10 @@
-type descriptor() :: mg_proto_state_processing_thrift:'MachineDescriptor'().
-type history_range() :: mg_proto_state_processing_thrift:'HistoryRange'().
-type history() :: mg_proto_state_processing_thrift:'History'().
-type machine() :: mg_proto_state_processing_thrift:'Machine'().
-type context() :: woody_context:ctx().
%%
-spec call(ns(), id(), args(), context()) ->
response()
| no_return().
call(NS, ID, Args, Context) ->
call(NS, ID, #'mg_stateproc_HistoryRange'{}, Args, Context).
-spec call(ns(), id(), history_range(), args(), context()) ->
response()
| no_return().
@ -38,20 +29,7 @@ call(NS, ID, HistoryRange, Args, Context) ->
Result;
{error, #'mg_stateproc_MachineNotFound'{}} ->
ok = start(NS, ID, Context),
call(NS, ID, Args, Context)
end.
-spec get_machine(ns(), id(), context()) ->
{ok, machine()}
| {error, mg_proto_state_processing_thrift:'MachineNotFound'()}
| no_return().
get_machine(NS, ID, Context) ->
Descriptor = construct_descriptor(NS, ID, #'mg_stateproc_HistoryRange'{}),
case issue_rpc('GetMachine', {Descriptor}, Context) of
{ok, #'mg_stateproc_Machine'{} = Machine} ->
{ok, Machine};
{error, _} = Error ->
Error
call(NS, ID, HistoryRange, Args, Context)
end.
-spec get_history(ns(), id(), history_range(), context()) ->

View File

@ -24,7 +24,7 @@
-define(TABLE, ?MODULE).
-define(SERVER, ?MODULE).
-include_lib("damsel/include/dmsl_domain_config_thrift.hrl").
-include_lib("damsel/include/dmsl_domain_conf_thrift.hrl").
%%
@ -67,7 +67,7 @@ init(_) ->
ordered_set,
protected,
{read_concurrency, true},
{keypos, #'Snapshot'.version}
{keypos, #domain_conf_Snapshot.version}
],
?TABLE = ets:new(?TABLE, EtsOpts),
{ok, #state{}}.

View File

@ -1,6 +1,6 @@
-module(dmt_api_repository).
-include_lib("damsel/include/dmsl_domain_config_thrift.hrl").
-include_lib("damsel/include/dmsl_domain_conf_thrift.hrl").
%% API
@ -15,13 +15,13 @@
-export_type([commit/0]).
-export_type([history/0]).
-type version() :: dmsl_domain_config_thrift:'Version'().
-type limit() :: dmsl_domain_config_thrift:'Limit'() | undefined.
-type snapshot() :: dmsl_domain_config_thrift:'Snapshot'().
-type commit() :: dmsl_domain_config_thrift:'Commit'().
-type history() :: dmsl_domain_config_thrift:'History'().
-type version() :: dmsl_domain_conf_thrift:'Version'().
-type limit() :: dmsl_domain_conf_thrift:'Limit'() | undefined.
-type snapshot() :: dmsl_domain_conf_thrift:'Snapshot'().
-type commit() :: dmsl_domain_conf_thrift:'Commit'().
-type history() :: dmsl_domain_conf_thrift:'History'().
-type ref() :: dmsl_domain_config_thrift:'Reference'().
-type ref() :: dmsl_domain_conf_thrift:'Reference'().
-type object_ref() :: dmsl_domain_thrift:'Reference'().
-type repository() :: module().
-type context() :: woody_context:ctx().
@ -44,7 +44,7 @@
%%
-spec checkout(ref(), repository(), context()) -> {ok, snapshot()} | {error, version_not_found}.
checkout({head, #'Head'{}} = V, Repository, Context) ->
checkout({head, #domain_conf_Head{}} = V, Repository, Context) ->
case Repository:checkout(V, Context) of
{ok, Snapshot} ->
{ok, dmt_api_cache:put(Snapshot)};
@ -65,7 +65,7 @@ checkout({version, Version} = V, Repository, Context) ->
end.
-spec checkout_object(ref(), object_ref(), repository(), context()) ->
{ok, dmsl_domain_config_thrift:'VersionedObject'()} | {error, version_not_found | object_not_found}.
{ok, dmsl_domain_conf_thrift:'VersionedObject'()} | {error, version_not_found | object_not_found}.
checkout_object(Reference, ObjectReference, Repository, Context) ->
case checkout(Reference, Repository, Context) of
{ok, Snapshot} ->
@ -84,7 +84,7 @@ pull(Version, Limit, Repository, Context) ->
commit(Version, Commit, Repository, Context) ->
case Repository:commit(Version, Commit, Context) of
{ok, Snapshot} ->
#'Snapshot'{version = VersionNext} = dmt_api_cache:put(Snapshot),
#domain_conf_Snapshot{version = VersionNext} = dmt_api_cache:put(Snapshot),
{ok, VersionNext};
{error, _} = Error ->
Error
@ -93,11 +93,11 @@ commit(Version, Commit, Repository, Context) ->
%% Internal
-spec try_get_object(object_ref(), snapshot()) ->
{ok, dmsl_domain_config_thrift:'VersionedObject'()} | {error, object_not_found}.
try_get_object(ObjectReference, #'Snapshot'{version = Version, domain = Domain}) ->
{ok, dmsl_domain_conf_thrift:'VersionedObject'()} | {error, object_not_found}.
try_get_object(ObjectReference, #domain_conf_Snapshot{version = Version, domain = Domain}) ->
case dmt_domain:get_object(ObjectReference, Domain) of
{ok, Object} ->
{ok, #'VersionedObject'{version = Version, object = Object}};
{ok, #domain_conf_VersionedObject{version = Version, object = Object}};
error ->
{error, object_not_found}
end.

View File

@ -2,7 +2,7 @@
-behaviour(woody_server_thrift_handler).
-include_lib("damsel/include/dmsl_domain_config_thrift.hrl").
-include_lib("damsel/include/dmsl_domain_conf_thrift.hrl").
-export([handle_function/4]).
@ -18,7 +18,7 @@
-type context() :: woody_context:ctx().
-spec handle_function('checkoutObject', woody:args(), context(), options()) ->
{ok, dmsl_domain_config_thrift:'VersionedObject'()} | no_return().
{ok, dmsl_domain_conf_thrift:'VersionedObject'()} | no_return().
handle_function('checkoutObject', {Reference, ObjectReference}, Context0, Options) ->
DefaultDeadline = woody_deadline:from_timeout(default_handling_timeout(Options)),
Context = dmt_api_woody_utils:ensure_woody_deadline_set(Context0, DefaultDeadline),
@ -26,9 +26,9 @@ handle_function('checkoutObject', {Reference, ObjectReference}, Context0, Option
{ok, Object} ->
{ok, Object};
{error, object_not_found} ->
woody_error:raise(business, #'ObjectNotFound'{});
woody_error:raise(business, #domain_conf_ObjectNotFound{});
{error, version_not_found} ->
woody_error:raise(business, #'VersionNotFound'{})
woody_error:raise(business, #domain_conf_VersionNotFound{})
end.
%% Internals

View File

@ -2,7 +2,7 @@
-behaviour(woody_server_thrift_handler).
-include_lib("damsel/include/dmsl_domain_config_thrift.hrl").
-include_lib("damsel/include/dmsl_domain_conf_thrift.hrl").
-export([handle_function/4]).
@ -39,9 +39,9 @@ do_handle_function('Commit', {Version, Commit}, Context, Options) ->
{error, {operation_error, Error}} ->
woody_error:raise(business, handle_operation_error(Error));
{error, version_not_found} ->
woody_error:raise(business, #'VersionNotFound'{});
woody_error:raise(business, #domain_conf_VersionNotFound{});
{error, head_mismatch} ->
woody_error:raise(business, #'ObsoleteCommitVersion'{});
woody_error:raise(business, #domain_conf_ObsoleteCommitVersion{});
{error, migration_in_progress} ->
woody_error:raise(system, {internal, resource_unavailable, <<"Migration in progress. Please, stand by.">>})
end;
@ -50,14 +50,14 @@ do_handle_function('Checkout', {Reference}, Context, Options) ->
{ok, Snapshot} ->
{ok, Snapshot};
{error, version_not_found} ->
woody_error:raise(business, #'VersionNotFound'{})
woody_error:raise(business, #domain_conf_VersionNotFound{})
end;
do_handle_function('PullRange', {After, Limit}, Context, Options) ->
case dmt_api_repository:pull(After, Limit, repository(Options), Context) of
{ok, History} ->
{ok, History};
{error, version_not_found} ->
woody_error:raise(business, #'VersionNotFound'{})
woody_error:raise(business, #domain_conf_VersionNotFound{})
end;
%% depreceted, will be removed soon
do_handle_function('Pull', {Version}, Context, Options) ->
@ -65,34 +65,34 @@ do_handle_function('Pull', {Version}, Context, Options) ->
{ok, History} ->
{ok, History};
{error, version_not_found} ->
woody_error:raise(business, #'VersionNotFound'{})
woody_error:raise(business, #domain_conf_VersionNotFound{})
end.
%%
handle_operation_error({conflict, Conflict}) ->
#'OperationConflict'{
#domain_conf_OperationConflict{
conflict = handle_operation_conflict(Conflict)
};
handle_operation_error({invalid, Invalid}) ->
#'OperationInvalid'{
#domain_conf_OperationInvalid{
errors = handle_operation_invalid(Invalid)
}.
handle_operation_conflict(Conflict) ->
case Conflict of
{object_already_exists, Ref} ->
{object_already_exists, #'ObjectAlreadyExistsConflict'{object_ref = Ref}};
{object_already_exists, #domain_conf_ObjectAlreadyExistsConflict{object_ref = Ref}};
{object_not_found, Ref} ->
{object_not_found, #'ObjectNotFoundConflict'{object_ref = Ref}};
{object_not_found, #domain_conf_ObjectNotFoundConflict{object_ref = Ref}};
{object_reference_mismatch, Ref} ->
{object_reference_mismatch, #'ObjectReferenceMismatchConflict'{object_ref = Ref}}
{object_reference_mismatch, #domain_conf_ObjectReferenceMismatchConflict{object_ref = Ref}}
end.
handle_operation_invalid(Invalid) ->
case Invalid of
{objects_not_exist, Refs} ->
[
{object_not_exists, #'NonexistantObject'{
{object_not_exists, #domain_conf_NonexistantObject{
object_ref = Ref,
referenced_by = ReferencedBy
}}
@ -100,7 +100,7 @@ handle_operation_invalid(Invalid) ->
];
{object_reference_cycles, Cycles} ->
[
{object_reference_cycle, #'ObjectReferenceCycle'{cycle = Cycle}}
{object_reference_cycle, #domain_conf_ObjectReferenceCycle{cycle = Cycle}}
|| Cycle <- Cycles
]
end.

View File

@ -1,292 +0,0 @@
-module(dmt_api_repository_v4).
-behaviour(dmt_api_repository).
-include_lib("damsel/include/dmsl_domain_config_thrift.hrl").
-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl").
-define(NS, <<"domain-config">>).
-define(ID, <<"primary/v4">>).
-define(BASE, 10).
%% API
-export([checkout/2]).
-export([pull/2]).
-export([pull/3]).
-export([commit/3]).
%% State processor
-behaviour(dmt_api_automaton_handler).
-export([process_call/3]).
-export([process_signal/3]).
%%
-record(st, {
snapshot = #'Snapshot'{version = 0, domain = dmt_domain:new()} :: snapshot(),
history = #{} :: dmt_api_repository:history()
}).
-type st() :: #st{}.
-type context() :: woody_context:ctx().
-type history_range() :: mg_proto_state_processing_thrift:'HistoryRange'().
-type machine() :: mg_proto_state_processing_thrift:'Machine'().
-type history() :: mg_proto_state_processing_thrift:'History'().
-type ref() :: dmsl_domain_config_thrift:'Reference'().
-type snapshot() :: dmt_api_repository:snapshot().
-type commit() :: dmt_api_repository:commit().
-spec checkout(ref(), context()) ->
{ok, snapshot()}
| {error, version_not_found}.
checkout({head, #'Head'{}}, Context) ->
HistoryRange = #mg_stateproc_HistoryRange{
'after' = undefined,
'limit' = ?BASE,
'direction' = backward
},
case get_history_by_range(HistoryRange, Context) of
#st{} = St ->
squash_state(St);
{error, version_not_found} ->
{error, version_not_found}
end;
checkout({version, V}, Context) ->
BaseV = get_base_version(V),
HistoryRange = #mg_stateproc_HistoryRange{
'after' = get_event_id(BaseV),
'limit' = V - BaseV,
'direction' = forward
},
case get_history_by_range(HistoryRange, Context) of
#st{} = St ->
case squash_state(St) of
{ok, #'Snapshot'{version = V}} = Result ->
Result;
{ok, _} ->
{error, version_not_found}
end;
{error, version_not_found} ->
{error, version_not_found}
end.
-spec pull(dmt_api_repository:version(), context()) ->
{ok, dmt_api_repository:history()}
| {error, version_not_found}.
pull(Version, Context) ->
pull(Version, undefined, Context).
-spec pull(dmt_api_repository:version(), dmt_api_repository:limit(), context()) ->
{ok, dmt_api_repository:history()}
| {error, version_not_found}.
pull(Version, Limit, Context) ->
After = get_event_id(Version),
case get_history_by_range(#mg_stateproc_HistoryRange{'after' = After, 'limit' = Limit}, Context) of
#st{history = History} ->
{ok, History};
{error, version_not_found} ->
{error, version_not_found}
end.
-spec commit(dmt_api_repository:version(), commit(), context()) ->
{ok, snapshot()}
| {error, version_not_found | {operation_error, dmt_domain:operation_error()}}.
commit(Version, Commit, Context) ->
BaseID = get_event_id(get_base_version(Version)),
decode_call_result(
dmt_api_automaton_client:call(
?NS,
?ID,
%% TODO in theory, it's enought ?BASE + 1 events here,
%% but it's complicated and needs to be covered by tests
#mg_stateproc_HistoryRange{'after' = BaseID},
encode_call({commit, Version, Commit}),
Context
)
).
%%
-spec get_history_by_range(history_range(), context()) -> st() | {error, version_not_found}.
get_history_by_range(HistoryRange, Context) ->
case dmt_api_automaton_client:get_history(?NS, ?ID, HistoryRange, Context) of
{ok, History} ->
read_history(History);
{error, #mg_stateproc_MachineNotFound{}} ->
ok = dmt_api_automaton_client:start(?NS, ?ID, Context),
get_history_by_range(HistoryRange, Context);
{error, #mg_stateproc_EventNotFound{}} ->
{error, version_not_found}
end.
%%
-spec process_call(dmt_api_automaton_handler:call(), machine(), context()) ->
{dmt_api_automaton_handler:response(), dmt_api_automaton_handler:events()} | no_return().
process_call(Call, #mg_stateproc_Machine{ns = ?NS, id = ?ID} = Machine, Context) ->
Args = decode_call(Call),
{Result, Events} = handle_call(Args, read_history(Machine), Context),
{encode_call_result(Result), encode_events(Events)};
process_call(_Call, #mg_stateproc_Machine{ns = NS, id = ID}, _Context) ->
Message = <<"Unknown machine '", NS/binary, "' '", ID/binary, "'">>,
woody_error:raise(system, {internal, resource_unavailable, Message}).
-spec process_signal(dmt_api_automaton_handler:signal(), machine(), context()) ->
{dmt_api_automaton_handler:action(), dmt_api_automaton_handler:events()} | no_return().
process_signal({init, #mg_stateproc_InitSignal{}}, #mg_stateproc_Machine{ns = ?NS, id = ?ID}, _Context) ->
{#mg_stateproc_ComplexAction{}, []};
process_signal({timeout, #mg_stateproc_TimeoutSignal{}}, #mg_stateproc_Machine{ns = ?NS, id = ?ID}, _Context) ->
{#mg_stateproc_ComplexAction{}, []};
process_signal(_Signal, #mg_stateproc_Machine{ns = NS, id = ID}, _Context) ->
Message = <<"Unknown machine '", NS/binary, "' '", ID/binary, "'">>,
woody_error:raise(system, {internal, resource_unavailable, Message}).
%%
handle_call({commit, Version, Commit}, St, _Context) ->
case squash_state(St) of
{ok, #'Snapshot'{version = Version} = Snapshot} ->
apply_commit(Snapshot, Commit);
{ok, #'Snapshot'{version = V}} when V > Version ->
% Is this retry? Maybe we already applied this commit.
check_commit(Version, Commit, St);
{ok, _} ->
{{error, head_mismatch}, []}
end.
apply_commit(#'Snapshot'{version = VersionWas, domain = DomainWas}, #'Commit'{ops = Ops} = Commit) ->
case dmt_domain:apply_operations(Ops, DomainWas) of
{ok, Domain} ->
Snapshot = #'Snapshot'{version = VersionWas + 1, domain = Domain},
{{ok, Snapshot}, [make_event(Snapshot, Commit)]};
{error, Reason} ->
{{error, {operation_error, Reason}}, []}
end.
check_commit(Version, Commit, #st{snapshot = BaseSnapshot, history = History}) ->
case maps:get(Version + 1, History) of
Commit ->
% it's ok, commit alredy applied, lets return this snapshot
{dmt_history:travel(Version + 1, History, BaseSnapshot), []};
_ ->
{{error, head_mismatch}, []}
end.
-spec read_history(machine() | history()) -> st().
read_history(#mg_stateproc_Machine{history = Events}) ->
read_history(Events);
read_history(Events) ->
read_history(Events, #st{}).
-spec read_history([mg_proto_state_processing_thrift:'Event'()], st()) -> st().
read_history([], St) ->
St;
read_history(
[#mg_stateproc_Event{id = Id, data = EventData, format_version = FmtVsn} | Rest],
#st{history = History} = St
) ->
{commit, Commit, Meta} = decode_event(FmtVsn, EventData),
case Meta of
#{snapshot := Snapshot} ->
read_history(
Rest,
St#st{snapshot = Snapshot, history = History#{Id => Commit}}
);
#{} ->
read_history(
Rest,
St#st{history = History#{Id => Commit}}
)
end.
squash_state(#st{snapshot = BaseSnapshot, history = History}) ->
case dmt_history:head(History, BaseSnapshot) of
{ok, Snapshot} ->
{ok, Snapshot};
{error, Error} ->
error(Error)
end.
%%
make_event(Snapshot, Commit) ->
Meta =
case (Snapshot#'Snapshot'.version) rem ?BASE of
0 ->
#{snapshot => Snapshot};
_ ->
#{}
end,
{commit, Commit, Meta}.
encode_events(Events) ->
FmtVsn = 1,
encode_events(FmtVsn, Events).
encode_events(FmtVsn, Events) ->
[encode_event(FmtVsn, E) || E <- Events].
encode_event(FmtVsn, Data) ->
#mg_stateproc_Content{format_version = FmtVsn, data = encode_event_data(FmtVsn, Data)}.
encode_event_data(1 = FmtVsn, {commit, Commit, Meta}) ->
{arr, [{str, <<"commit">>}, encode(commit, Commit), encode_commit_meta(FmtVsn, Meta)]}.
encode_commit_meta(1, #{snapshot := Snapshot}) ->
{obj, #{{str, <<"snapshot">>} => encode(snapshot, Snapshot)}};
encode_commit_meta(1, #{}) ->
{obj, #{}}.
decode_event(undefined, Data) ->
decode_event(1, Data);
decode_event(1 = FmtVsn, {arr, [{str, <<"commit">>}, Commit, Meta]}) ->
{commit, decode(commit, Commit), decode_commit_meta(FmtVsn, Meta)}.
decode_commit_meta(1, {obj, #{{str, <<"snapshot">>} := Snapshot}}) ->
#{snapshot => decode(snapshot, Snapshot)};
decode_commit_meta(1, {obj, #{}}) ->
#{}.
%%
encode_call({commit, Version, Commit}) ->
{arr, [{str, <<"commit">>}, {i, Version}, encode(commit, Commit)]}.
decode_call({arr, [{str, <<"commit">>}, {i, Version}, Commit]}) ->
{commit, Version, decode(commit, Commit)}.
encode_call_result({ok, Snapshot}) ->
{arr, [{str, <<"ok">>}, encode(snapshot, Snapshot)]};
encode_call_result({error, Reason}) ->
{arr, [{str, <<"err">>}, {bin, term_to_binary(Reason)}]}.
decode_call_result({arr, [{str, <<"ok">>}, Snapshot]}) ->
{ok, decode(snapshot, Snapshot)};
decode_call_result({arr, [{str, <<"err">>}, {bin, Reason}]}) ->
{error, binary_to_term(Reason)}.
%%
encode(T, V) ->
{bin, dmt_api_thrift_utils:encode(binary, get_type_info(T), V)}.
decode(T, {bin, V}) ->
dmt_api_thrift_utils:decode(binary, get_type_info(T), V).
get_type_info(commit) ->
{struct, struct, {dmsl_domain_config_thrift, 'Commit'}};
get_type_info(snapshot) ->
{struct, struct, {dmsl_domain_config_thrift, 'Snapshot'}}.
get_base_version(V) when is_integer(V) andalso V >= ?BASE ->
(V div ?BASE) * ?BASE - 1;
get_base_version(V) when is_integer(V) ->
0.
get_event_id(ID) when is_integer(ID) andalso ID > 0 ->
ID;
get_event_id(0) ->
undefined.

View File

@ -2,7 +2,7 @@
-behaviour(dmt_api_repository).
-include_lib("damsel/include/dmsl_domain_config_thrift.hrl").
-include_lib("damsel/include/dmsl_domain_conf_thrift.hrl").
-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl").
-define(NS, <<"domain-config">>).
@ -25,7 +25,7 @@
%%
-record(st, {
snapshot = #'Snapshot'{version = 0, domain = dmt_domain:new()} :: snapshot(),
snapshot = #domain_conf_Snapshot{version = 0, domain = dmt_domain:new()} :: snapshot(),
history = #{} :: dmt_api_repository:history()
}).
@ -35,14 +35,14 @@
-type machine() :: mg_proto_state_processing_thrift:'Machine'().
-type history() :: mg_proto_state_processing_thrift:'History'().
-type ref() :: dmsl_domain_config_thrift:'Reference'().
-type ref() :: dmsl_domain_conf_thrift:'Reference'().
-type snapshot() :: dmt_api_repository:snapshot().
-type commit() :: dmt_api_repository:commit().
-spec checkout(ref(), context()) ->
{ok, snapshot()}
| {error, version_not_found}.
checkout({head, #'Head'{}}, Context) ->
checkout({head, #domain_conf_Head{}}, Context) ->
HistoryRange = #mg_stateproc_HistoryRange{
'after' = undefined,
'limit' = ?BASE,
@ -64,7 +64,7 @@ checkout({version, V}, Context) ->
case get_history_by_range(HistoryRange, Context) of
#st{} = St ->
case squash_state(St) of
{ok, #'Snapshot'{version = V}} = Result ->
{ok, #domain_conf_Snapshot{version = V}} = Result ->
Result;
{ok, _} ->
{error, version_not_found}
@ -148,19 +148,22 @@ process_signal(_Signal, #mg_stateproc_Machine{ns = NS, id = ID}, _Context) ->
handle_call({commit, Version, Commit}, St, _Context) ->
case squash_state(St) of
{ok, #'Snapshot'{version = Version} = Snapshot} ->
{ok, #domain_conf_Snapshot{version = Version} = Snapshot} ->
apply_commit(Snapshot, Commit);
{ok, #'Snapshot'{version = V}} when V > Version ->
{ok, #domain_conf_Snapshot{version = V}} when V > Version ->
% Is this retry? Maybe we already applied this commit.
check_commit(Version, Commit, St);
{ok, _} ->
{{error, head_mismatch}, []}
{{error, version_not_found}, []}
end.
apply_commit(#'Snapshot'{version = VersionWas, domain = DomainWas}, #'Commit'{ops = Ops} = Commit) ->
apply_commit(
#domain_conf_Snapshot{version = VersionWas, domain = DomainWas},
#domain_conf_Commit{ops = Ops} = Commit
) ->
case dmt_domain:apply_operations(Ops, DomainWas) of
{ok, Domain} ->
Snapshot = #'Snapshot'{version = VersionWas + 1, domain = Domain},
Snapshot = #domain_conf_Snapshot{version = VersionWas + 1, domain = Domain},
{{ok, Snapshot}, [make_event(Snapshot, Commit)]};
{error, Reason} ->
{{error, {operation_error, Reason}}, []}
@ -214,7 +217,7 @@ squash_state(#st{snapshot = BaseSnapshot, history = History}) ->
make_event(Snapshot, Commit) ->
Meta =
case (Snapshot#'Snapshot'.version) rem ?BASE of
case (Snapshot#domain_conf_Snapshot.version) rem ?BASE of
0 ->
#{snapshot => Snapshot};
_ ->
@ -275,9 +278,9 @@ decode(T, {bin, V}) ->
dmt_api_thrift_utils:decode(binary, get_type_info(T), V).
get_type_info(commit) ->
{struct, struct, {dmsl_domain_config_thrift, 'Commit'}};
{struct, struct, {dmsl_domain_conf_thrift, 'Commit'}};
get_type_info(snapshot) ->
{struct, struct, {dmsl_domain_config_thrift, 'Snapshot'}}.
{struct, struct, {dmsl_domain_conf_thrift, 'Snapshot'}}.
get_base_version(V) when is_integer(V) andalso V >= ?BASE ->
(V div ?BASE) * ?BASE - 1;

View File

@ -1,334 +0,0 @@
-module(dmt_api_thrift_msgpack_protocol).
%%% Thrift library uses legacy behaviour definition that makes dialyzer angry.
%%% -behaviour(thrift_protocol).
-include_lib("thrift/include/thrift_constants.hrl").
-include_lib("thrift/include/thrift_protocol.hrl").
-export([
new/0,
new/1,
read/2,
write/2,
flush_transport/1,
close_transport/1
]).
-record(msgpack_protocol, {
production :: production()
}).
-type production() ::
[]
| value()
| {array, [production()], value()}
| {map, none | value(), [{value(), value()}] | #{value() => value()}, value()}.
-type value() :: dmsl_msgpack_thrift:'Value'().
-type state() :: #msgpack_protocol{}.
-include_lib("thrift/include/thrift_protocol_behaviour.hrl").
% FIXME
-spec new() -> term().
new() ->
new([]).
% FIXME
-spec new(production()) -> term().
new(Production) ->
State = #msgpack_protocol{production = Production},
thrift_protocol:new(?MODULE, State).
flush_transport(This) ->
{This, ok}.
close_transport(This = #msgpack_protocol{production = Production}) ->
{This, {ok, Production}}.
%%%
%%% instance methods
%%%
typeid_to_string(?tType_BOOL) -> <<"bool">>;
typeid_to_string(?tType_DOUBLE) -> <<"dbl">>;
typeid_to_string(?tType_I8) -> <<"i8">>;
typeid_to_string(?tType_I16) -> <<"i16">>;
typeid_to_string(?tType_I32) -> <<"i32">>;
typeid_to_string(?tType_I64) -> <<"i64">>;
typeid_to_string(?tType_STRING) -> <<"str">>;
typeid_to_string(?tType_STRUCT) -> <<"struct">>;
typeid_to_string(?tType_MAP) -> <<"map">>;
typeid_to_string(?tType_SET) -> <<"set">>;
typeid_to_string(?tType_LIST) -> <<"list">>.
-define(str(V), {str, V}).
-define(int(V), {i, V}).
-define(flt(V), {flt, V}).
-define(bool(V), {b, V}).
write(This, #protocol_message_begin{name = Name, type = Type, seqid = Seqid}) ->
do_write_many(This, [
{enter, array},
?str(Name),
?int(Type),
?int(Seqid)
]);
write(This, message_end) ->
do_write(This, {exit, array});
write(This, #protocol_struct_begin{}) ->
do_write(This, {enter, map});
write(This, struct_end) ->
{This, ok};
write(This, #protocol_field_begin{name = Name, type = Type, id = Id}) ->
do_write_many(This, [
?str(genlib:to_binary(Name)),
{enter, array},
?int(Id),
?str(typeid_to_string(Type))
]);
write(This, field_end) ->
do_write(This, {exit, array});
write(This, field_stop) ->
do_write(This, {exit, map});
write(This, #protocol_map_begin{ktype = Ktype, vtype = Vtype, size = Size}) ->
do_write_many(This, [
{enter, array},
{enter, map},
?str(typeid_to_string(Ktype)),
?str(typeid_to_string(Vtype)),
{exit, map},
?int(Size),
{enter, map}
]);
write(This, map_end) ->
do_write_many(This, [
{exit, map},
{exit, array}
]);
write(This, #protocol_list_begin{etype = Etype, size = Size}) ->
do_write_many(This, [
{enter, array},
?str(typeid_to_string(Etype)),
?int(Size)
]);
write(This, list_end) ->
do_write(This, {exit, array});
write(This, #protocol_set_begin{etype = Etype, size = Size}) ->
write(This, #protocol_list_begin{etype = Etype, size = Size});
write(This, set_end) ->
write(This, list_end);
write(This, {bool, V}) ->
do_write(This, ?bool(V));
write(This, {byte, Byte}) ->
do_write(This, ?int(Byte));
write(This, {i16, I16}) ->
do_write(This, ?int(I16));
write(This, {i32, I32}) ->
do_write(This, ?int(I32));
write(This, {i64, I64}) ->
do_write(This, ?int(I64));
write(This, {double, Double}) ->
do_write(This, ?flt(Double));
write(This, {string, Bin}) when is_binary(Bin) ->
% FIXME nonprintable?
do_write(This, ?str(Bin)).
%%
do_write_many(#msgpack_protocol{production = Production}, Vs) ->
{#msgpack_protocol{production = write_vals(Production, Vs)}, ok}.
do_write(#msgpack_protocol{production = Production}, V) ->
{#msgpack_protocol{production = write_val(Production, V)}, ok}.
write_vals(P0, [V | Vs]) ->
write_vals(write_val(P0, V), Vs);
write_vals(P0, []) ->
P0.
write_val(Production, {enter, array}) ->
{array, [], Production};
write_val({array, Vs, Production}, {exit, array}) ->
write_val(Production, {arr, lists:reverse(Vs)});
write_val(Production, {enter, map}) ->
{map, none, #{}, Production};
write_val({map, none, Vs, Production}, {exit, map}) ->
write_val(Production, {obj, Vs});
write_val({array, Vs, Production}, V) ->
{array, [V | Vs], Production};
write_val({map, none, Vs, Production}, K) ->
{map, K, Vs, Production};
write_val({map, K, Vs, Production}, V) ->
{map, none, Vs#{K => V}, Production};
write_val([], V) ->
V.
%%
string_to_typeid(<<"bool">>) -> ?tType_BOOL;
string_to_typeid(<<"dbl">>) -> ?tType_DOUBLE;
string_to_typeid(<<"i8">>) -> ?tType_I8;
string_to_typeid(<<"i16">>) -> ?tType_I16;
string_to_typeid(<<"i32">>) -> ?tType_I32;
string_to_typeid(<<"i64">>) -> ?tType_I64;
string_to_typeid(<<"str">>) -> ?tType_STRING;
string_to_typeid(<<"struct">>) -> ?tType_STRUCT;
string_to_typeid(<<"map">>) -> ?tType_MAP;
string_to_typeid(<<"set">>) -> ?tType_SET;
string_to_typeid(<<"list">>) -> ?tType_LIST.
read(This, message_begin) ->
{This1, {ok, [Name, Type, SeqId]}} = do_read_many(This, [
{enter, array},
str,
int,
int
]),
{This1, #protocol_message_begin{name = binary_to_list(Name), type = Type, seqid = SeqId}};
read(This, message_end) ->
do_read(This, {exit, array});
read(This, struct_begin) ->
do_read(This, {enter, map});
read(This, struct_end) ->
do_read(This, {exit, map});
read(This0, field_begin) ->
case do_read(This0, str) of
{This1, {ok, Name}} ->
{This2, {ok, [Id, Type]}} = do_read_many(This1, [{enter, array}, int, str]),
{This2, #protocol_field_begin{type = string_to_typeid(Type), id = Id, name = Name}};
{This1, {error, object_exhausted}} ->
{This1, #protocol_field_begin{type = ?tType_STOP}}
end;
read(This0, field_end) ->
do_read(This0, {exit, array});
read(This0, map_begin) ->
{This1, {ok, [Ktype, Vtype, Size]}} = do_read_many(This0, [
{enter, array},
{enter, map},
str,
str,
{exit, map},
int,
{enter, map}
]),
{This1, #protocol_map_begin{
ktype = string_to_typeid(Ktype),
vtype = string_to_typeid(Vtype),
size = Size
}};
read(This, map_end) ->
do_read_many(This, [{exit, map}, {exit, array}]);
read(This0, list_begin) ->
{This1, {ok, [Etype, Size]}} = do_read_many(This0, [
{enter, array},
str,
int
]),
{This1, #protocol_list_begin{
etype = string_to_typeid(Etype),
size = Size
}};
read(This0, list_end) ->
do_read(This0, {exit, array});
read(This0, set_begin) ->
{This1, #protocol_list_begin{etype = Etype, size = Size}} = read(This0, list_begin),
{This1, #protocol_set_begin{etype = Etype, size = Size}};
read(This0, set_end) ->
read(This0, list_end);
read(This0, field_stop) ->
{This0, ok};
read(This0, bool) ->
do_read(This0, bool);
read(This0, byte) ->
do_read(This0, int);
read(This0, i16) ->
do_read(This0, int);
read(This0, i32) ->
do_read(This0, int);
read(This0, i64) ->
do_read(This0, int);
read(This0, double) ->
do_read(This0, flt);
read(This0, string) ->
do_read(This0, str).
%%
do_read_many(This = #msgpack_protocol{production = P0}, Ts) ->
{P1, Result} = do_read_many(P0, Ts, []),
{This#msgpack_protocol{production = P1}, Result}.
do_read_many(P0, [T | Ts], Vs) ->
case read_val(P0, T) of
{P1, ok} ->
do_read_many(P1, Ts, Vs);
{P1, {ok, V}} ->
do_read_many(P1, Ts, [V | Vs]);
{_P, {error, _}} = Result ->
Result
end;
do_read_many(P0, [], []) ->
{P0, ok};
do_read_many(P0, [], Vs) ->
{P0, {ok, lists:reverse(Vs)}}.
do_read(#msgpack_protocol{production = P0}, T) ->
{P1, R} = read_val(P0, T),
{#msgpack_protocol{production = P1}, R}.
read_val(Production, {enter, array}) ->
case read_next(Production) of
{Rest, {arr, Vs}} ->
{{array, Vs, Rest}, ok};
{_Rest, {error, _} = Error} ->
{Production, Error};
{_Rest, {T, _V}} ->
{Production, {error, {array_expected, T}}}
end;
read_val({array, [], Rest}, {exit, array}) ->
{Rest, ok};
read_val(Production, {enter, map}) ->
case read_next(Production) of
{Rest, {obj, Vs}} ->
{{map, none, maps:to_list(Vs), Rest}, ok};
{_Rest, {error, _} = Error} ->
{Production, Error};
{_Rest, {T, _V}} ->
{Production, {error, {object_expected, T}}}
end;
read_val({map, none, [], Rest}, {exit, map}) ->
{Rest, ok};
read_val(P0, T) ->
case read_next(P0) of
{P1, {error, _} = Error} ->
{P1, Error};
{P1, V} ->
{P1, read_scalar(V, T)}
end.
read_next({array, [V | Vs], Rest}) ->
{{array, Vs, Rest}, V};
read_next({array, [], _} = Production) ->
{Production, {error, array_exhausted}};
read_next({map, none, [{K, V} | Vs], Rest}) ->
{{map, V, Vs, Rest}, K};
read_next({map, none, [], _} = Production) ->
{Production, {error, object_exhausted}};
read_next({map, V, Vs, Rest}) ->
{{map, none, Vs, Rest}, V};
read_next(V) ->
{[], V}.
read_scalar(?bool(V), bool) ->
{ok, V};
read_scalar(?str(V), str) ->
{ok, V};
read_scalar(?int(V), int) ->
{ok, V};
read_scalar(?flt(V), flt) ->
{ok, V};
read_scalar(V, T) ->
{error, {type_mismatch, V, T}}.

View File

@ -6,43 +6,15 @@
-type thrift_type() :: term().
-type thrift_value() :: term().
-spec encode
(binary, thrift_type(), thrift_value()) -> binary();
(msgpack, thrift_type(), thrift_value()) -> dmsl_msgpack_thrift:'Value'().
-spec encode(binary, thrift_type(), thrift_value()) -> binary().
encode(binary, Type, Value) ->
Codec0 = thrift_strict_binary_codec:new(),
{ok, Codec} = thrift_strict_binary_codec:write(Codec0, Type, Value),
Data = thrift_strict_binary_codec:close(Codec),
Data;
encode(Proto, Type, Value) ->
{ok, Proto0} = new_protocol(Proto),
{Proto1, ok} = thrift_protocol:write(Proto0, {Type, Value}),
{_Proto, {ok, Data}} = thrift_protocol:close_transport(Proto1),
Data.
thrift_strict_binary_codec:close(Codec).
-spec decode
(binary, thrift_type(), binary()) -> thrift_value();
(msgpack, thrift_type(), dmsl_msgpack_thrift:'Value'()) -> thrift_value().
-spec decode(binary, thrift_type(), binary()) -> thrift_value().
decode(binary, Type, Data) ->
Codec = thrift_strict_binary_codec:new(Data),
{ok, Value, Leftovers} = thrift_strict_binary_codec:read(Codec, Type),
<<>> = thrift_strict_binary_codec:close(Leftovers),
Value;
decode(Proto, Type, Data) ->
{ok, Proto0} = new_protocol(Proto, Data),
{_Proto, {ok, Value}} = thrift_protocol:read(Proto0, Type),
Value.
%%
new_protocol(binary) ->
{ok, Trans} = thrift_membuffer_transport:new(),
thrift_binary_protocol:new(Trans, [{strict_read, true}, {strict_write, true}]);
new_protocol(msgpack) ->
dmt_api_thrift_msgpack_protocol:new().
new_protocol(binary, Data) ->
{ok, Trans} = thrift_membuffer_transport:new(Data),
thrift_binary_protocol:new(Trans);
new_protocol(msgpack, Data) ->
dmt_api_thrift_msgpack_protocol:new(Data).

View File

@ -1,6 +1,5 @@
-module(dmt_api_tests_SUITE).
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
-export([all/0]).
@ -17,12 +16,18 @@
-export([insert/1]).
-export([update/1]).
-export([delete/1]).
-export([migration_success/1]).
-export([conflict/1]).
-export([missing_version/1]).
-export([obsolete/1]).
-export([conflict_notfound/1]).
-export([conflict_exists/1]).
-export([conflict_mismatch/1]).
-export([nonexistent/1]).
-export([reference_cycles/1]).
-include_lib("damsel/include/dmsl_domain_config_thrift.hrl").
-export([checkout_object/1]).
-include_lib("damsel/include/dmsl_domain_conf_thrift.hrl").
-include_lib("damsel/include/dmsl_domain_thrift.hrl").
%% tests descriptions
@ -38,20 +43,13 @@
-spec all() -> [{group, group_name()}].
all() ->
[
{group, basic_lifecycle_v4},
{group, migration_to_v5},
{group, basic_lifecycle_v5}
{group, basic_lifecycle_v5},
{group, repository_client}
].
-spec groups() -> [{group_name(), list(), [test_case_name()]}].
groups() ->
[
{basic_lifecycle_v4, [sequence], [
pull_commit,
{group, basic_lifecycle},
{group, error_mapping},
retry_commit
]},
{basic_lifecycle_v5, [sequence], [
pull_commit,
{group, basic_lifecycle},
@ -63,13 +61,17 @@ groups() ->
update,
delete
]},
{migration_to_v5, [sequence], [
migration_success
]},
{error_mapping, [parallel], [
conflict,
{error_mapping, [], [
missing_version,
obsolete,
conflict_notfound,
conflict_exists,
conflict_mismatch,
nonexistent,
reference_cycles
]},
{repository_client, [parallel], [
checkout_object
]}
].
@ -87,26 +89,10 @@ end_per_suite(C) ->
genlib_app:stop_unload_applications(?config(suite_apps, C)).
-spec init_per_group(group_name(), config()) -> config().
init_per_group(basic_lifecycle_v4, C) ->
[{group_apps, start_with_repository(dmt_api_repository_v4) ++ start_client()} | C];
init_per_group(basic_lifecycle_v5, C) ->
[{group_apps, start_with_repository(dmt_api_repository_v5) ++ start_client()} | C];
init_per_group(migration_to_v5, C) ->
ApiApps = genlib_app:start_application_with(dmt_api, [
{repository, dmt_api_repository_migration},
{migration, #{
timeout => 360,
limit => 20
}},
{services, #{
automaton => #{
url => "http://machinegun:8022/v1/automaton"
}
}},
% 2Kb
{max_cache_size, 2048}
]),
[{group_apps, ApiApps ++ start_client()} | C];
init_per_group(repository_client, C) ->
[{group_apps, start_with_repository(dmt_api_repository_v5) ++ start_client()} | C];
init_per_group(_, C) ->
C.
@ -118,8 +104,8 @@ start_with_repository(Repository) ->
url => "http://machinegun:8022/v1/automaton"
}
}},
% 50Mb
{max_cache_size, 52428800}
% 100Kb
{max_cache_size, 102400}
]).
start_client() ->
@ -139,10 +125,9 @@ start_client() ->
]).
-spec end_per_group(group_name(), config()) -> term().
end_per_group(Group, C) when
Group =:= basic_lifecycle_v4 orelse
Group =:= basic_lifecycle_v5 orelse
Group =:= migration_to_v5
end_per_group(GroupName, C) when
GroupName == basic_lifecycle_v5;
GroupName == repository_client
->
genlib_app:stop_unload_applications(?config(group_apps, C));
end_per_group(_, _C) ->
@ -151,7 +136,7 @@ end_per_group(_, _C) ->
-spec init_per_testcase(test_case_name(), config()) -> config().
init_per_testcase(_, C) ->
%% added because dmt_client:checkout(latest)
%% could return old version from cache overwise
%% could return old version from cache otherwise
{ok, _Version} = dmt_client_cache:update(),
C.
@ -167,12 +152,15 @@ insert(_C) ->
ID = next_id(),
Object = fixture_domain_object(ID, <<"InsertFixture">>),
Ref = fixture_object_ref(ID),
#'ObjectNotFound'{} = (catch dmt_client:checkout_object(Ref)),
#'Snapshot'{version = Version1} = dmt_client:checkout(latest),
Version2 = dmt_client:commit(Version1, #'Commit'{ops = [{insert, #'InsertOp'{object = Object}}]}),
#domain_conf_ObjectNotFound{} = (catch dmt_client:checkout_object(Ref)),
#domain_conf_Snapshot{version = Version1} = dmt_client:checkout(latest),
Version2 = dmt_client:commit(
Version1,
#domain_conf_Commit{ops = [{insert, #domain_conf_InsertOp{object = Object}}]}
),
_ = dmt_client_cache:update(),
Object = dmt_client:checkout_object(Ref),
#'ObjectNotFound'{} = (catch dmt_client:checkout_object(Version1, Ref)),
#domain_conf_ObjectNotFound{} = (catch dmt_client:checkout_object(Version1, Ref)),
Object = dmt_client:checkout_object(Version2, Ref).
-spec update(term()) -> term().
@ -181,11 +169,14 @@ update(_C) ->
Object1 = fixture_domain_object(ID, <<"UpdateFixture1">>),
Object2 = fixture_domain_object(ID, <<"UpdateFixture2">>),
Ref = fixture_object_ref(ID),
#'Snapshot'{version = Version0} = dmt_client:checkout(latest),
Version1 = dmt_client:commit(Version0, #'Commit'{ops = [{insert, #'InsertOp'{object = Object1}}]}),
#domain_conf_Snapshot{version = Version0} = dmt_client:checkout(latest),
Version1 = dmt_client:commit(
Version0,
#domain_conf_Commit{ops = [{insert, #domain_conf_InsertOp{object = Object1}}]}
),
Version2 = dmt_client:commit(
Version1,
#'Commit'{ops = [{update, #'UpdateOp'{old_object = Object1, new_object = Object2}}]}
#domain_conf_Commit{ops = [{update, #domain_conf_UpdateOp{old_object = Object1, new_object = Object2}}]}
),
_ = dmt_client_cache:update(),
Object1 = dmt_client:checkout_object(Version1, Ref),
@ -196,11 +187,17 @@ delete(_C) ->
ID = next_id(),
Object = fixture_domain_object(ID, <<"DeleteFixture">>),
Ref = fixture_object_ref(ID),
#'Snapshot'{version = Version0} = dmt_client:checkout(latest),
Version1 = dmt_client:commit(Version0, #'Commit'{ops = [{insert, #'InsertOp'{object = Object}}]}),
Version2 = dmt_client:commit(Version1, #'Commit'{ops = [{remove, #'RemoveOp'{object = Object}}]}),
#domain_conf_Snapshot{version = Version0} = dmt_client:checkout(latest),
Version1 = dmt_client:commit(
Version0,
#domain_conf_Commit{ops = [{insert, #domain_conf_InsertOp{object = Object}}]}
),
Version2 = dmt_client:commit(
Version1,
#domain_conf_Commit{ops = [{remove, #domain_conf_RemoveOp{object = Object}}]}
),
Object = dmt_client:checkout_object(Version1, Ref),
#'ObjectNotFound'{} = (catch dmt_client:checkout_object(Version2, Ref)).
#domain_conf_ObjectNotFound{} = (catch dmt_client:checkout_object(Version2, Ref)).
-spec pull_commit(term()) -> term().
pull_commit(_C) ->
@ -208,27 +205,27 @@ pull_commit(_C) ->
History1 = #{} = dmt_client:pull_range(0, ?DEFAULT_LIMIT),
Version1 = lists:max([0 | maps:keys(History1)]),
Object = fixture_domain_object(ID, <<"PullFixture">>),
Commit = #'Commit'{ops = [{insert, #'InsertOp'{object = Object}}]},
Commit = #domain_conf_Commit{ops = [{insert, #domain_conf_InsertOp{object = Object}}]},
Version2 = dmt_client:commit(Version1, Commit),
#{Version2 := Commit} = dmt_client:pull_range(Version1, ?DEFAULT_LIMIT).
-spec retry_commit(term()) -> term().
retry_commit(_C) ->
Commit1 = #'Commit'{
Commit1 = #domain_conf_Commit{
ops = [
{insert, #'InsertOp'{
{insert, #domain_conf_InsertOp{
object = fixture_domain_object(next_id(), <<"RetryCommitFixture">>)
}}
]
},
#'Snapshot'{version = Version1} = dmt_client:checkout(latest),
#domain_conf_Snapshot{version = Version1} = dmt_client:checkout(latest),
Version2 = dmt_client:commit(Version1, Commit1),
Version2 = Version1 + 1,
Version2 = dmt_client:commit(Version1, Commit1),
#'Snapshot'{version = Version2} = dmt_client:checkout(latest),
Commit2 = #'Commit'{
#domain_conf_Snapshot{version = Version2} = dmt_client:checkout(latest),
Commit2 = #domain_conf_Commit{
ops = [
{insert, #'InsertOp'{
{insert, #domain_conf_InsertOp{
object = fixture_domain_object(next_id(), <<"RetryCommitFixture">>)
}}
]
@ -236,42 +233,61 @@ retry_commit(_C) ->
Version3 = dmt_client:commit(Version2, Commit2),
Version3 = Version2 + 1,
Version2 = dmt_client:commit(Version1, Commit1),
#'Snapshot'{version = Version3} = dmt_client:checkout(latest).
#domain_conf_Snapshot{version = Version3} = dmt_client:checkout(latest).
-spec migration_success(term()) -> term().
migration_success(_C) ->
#'Snapshot'{version = VersionV3} = dmt_client:checkout(latest),
true = VersionV3 > 0,
VersionV4 = wait_for_migration(VersionV3, 20, 1000),
VersionV4 = VersionV3 + 1.
wait_for_migration(V, TriesLeft, SleepInterval) when TriesLeft > 0 ->
ID = next_id(),
Object = fixture_domain_object(ID, <<"MigrationCommitFixture">>),
Commit = #'Commit'{ops = [{insert, #'InsertOp'{object = Object}}]},
try
dmt_client:commit(V, Commit)
catch
_Class:_Reason ->
timer:sleep(SleepInterval),
wait_for_migration(V, TriesLeft - 1, SleepInterval)
end;
wait_for_migration(_, _, _) ->
error(wait_for_migration_failed).
-spec conflict(term()) -> term().
conflict(_C) ->
#'Snapshot'{version = Version1} = dmt_client:checkout(latest),
-spec missing_version(term()) -> term().
missing_version(_C) ->
#domain_conf_Snapshot{version = Version1} = dmt_client:checkout(latest),
_ = ?assertThrow(
#'OperationConflict'{
#domain_conf_VersionNotFound{},
dmt_client:commit(
Version1 + 42,
#domain_conf_Commit{
ops = [
{insert, #domain_conf_InsertOp{
object = fixture_domain_object(next_id(), <<"MissingVersionFixture">>)
}}
]
}
)
).
-spec obsolete(term()) -> term().
obsolete(_C) ->
Commit1 = #domain_conf_Commit{
ops = [
{insert, #domain_conf_InsertOp{
object = fixture_domain_object(next_id(), <<"InitialFixture">>)
}}
]
},
Commit2 = #domain_conf_Commit{
ops = [
{insert, #domain_conf_InsertOp{
object = fixture_domain_object(next_id(), <<"ObsoleteFixture">>)
}}
]
},
#domain_conf_Snapshot{version = Version1} = dmt_client:checkout(latest),
_Version2 = dmt_client:commit(Version1, Commit1),
_ = ?assertThrow(
#domain_conf_ObsoleteCommitVersion{},
dmt_client:commit(Version1, Commit2)
).
-spec conflict_notfound(term()) -> term().
conflict_notfound(_C) ->
#domain_conf_Snapshot{version = Version1} = dmt_client:checkout(latest),
_ = ?assertThrow(
#domain_conf_OperationConflict{
conflict =
{object_not_found, #'ObjectNotFoundConflict'{
{object_not_found, #domain_conf_ObjectNotFoundConflict{
object_ref = {criterion, #domain_CriterionRef{id = 42}}
}}
},
dmt_client:commit(Version1, #'Commit'{
dmt_client:commit(Version1, #domain_conf_Commit{
ops = [
{update, #'UpdateOp'{
{update, #domain_conf_UpdateOp{
old_object = criterion_w_refs(42, []),
new_object = criterion_w_refs(42, [43, 44, 45])
}}
@ -279,13 +295,61 @@ conflict(_C) ->
})
).
-spec conflict_exists(term()) -> term().
conflict_exists(_C) ->
ID = next_id(),
Ref = fixture_object_ref(ID),
Commit = #domain_conf_Commit{
ops = [
{insert, #domain_conf_InsertOp{
object = fixture_domain_object(ID, <<"ExistingObjectFixture">>)
}}
]
},
#domain_conf_Snapshot{version = Version1} = dmt_client:checkout(latest),
Version2 = dmt_client:commit(Version1, Commit),
_ = ?assertThrow(
#domain_conf_OperationConflict{
conflict = {object_already_exists, #domain_conf_ObjectAlreadyExistsConflict{object_ref = Ref}}
},
dmt_client:commit(Version2, Commit)
).
-spec conflict_mismatch(term()) -> term().
conflict_mismatch(_C) ->
ID1 = next_id(),
ID2 = next_id(),
Object1 = fixture_domain_object(ID1, <<"Original">>),
Ref2 = fixture_object_ref(ID2),
#domain_conf_Snapshot{version = Version1} = dmt_client:checkout(latest),
Version2 = dmt_client:commit(
Version1,
#domain_conf_Commit{ops = [{insert, #domain_conf_InsertOp{object = Object1}}]}
),
_ = ?assertThrow(
#domain_conf_OperationConflict{
conflict = {object_reference_mismatch, #domain_conf_ObjectReferenceMismatchConflict{object_ref = Ref2}}
},
dmt_client:commit(
Version2,
#domain_conf_Commit{
ops = [
{update, #domain_conf_UpdateOp{
old_object = Object1,
new_object = fixture_domain_object(ID2, <<"Mismatch">>)
}}
]
}
)
).
-spec nonexistent(term()) -> term().
nonexistent(_C) ->
#'Snapshot'{version = Version1} = dmt_client:checkout(latest),
#domain_conf_Snapshot{version = Version1} = dmt_client:checkout(latest),
_ = ?assertThrow(
#'OperationInvalid'{
#domain_conf_OperationInvalid{
errors = [
{object_not_exists, #'NonexistantObject'{
{object_not_exists, #domain_conf_NonexistantObject{
object_ref = {criterion, #domain_CriterionRef{}},
referenced_by = [{criterion, #domain_CriterionRef{id = 42}}]
}}
@ -297,18 +361,18 @@ nonexistent(_C) ->
-spec reference_cycles(term()) -> term().
reference_cycles(_C) ->
#'Snapshot'{version = Version1} = dmt_client:checkout(latest),
#domain_conf_Snapshot{version = Version1} = dmt_client:checkout(latest),
_ = ?assertThrow(
#'OperationInvalid'{
#domain_conf_OperationInvalid{
errors = [
%% we expect 3 cycles to be found
{object_reference_cycle, #'ObjectReferenceCycle'{
{object_reference_cycle, #domain_conf_ObjectReferenceCycle{
cycle = [{criterion, #domain_CriterionRef{}} | _]
}},
{object_reference_cycle, #'ObjectReferenceCycle'{
{object_reference_cycle, #domain_conf_ObjectReferenceCycle{
cycle = [{criterion, #domain_CriterionRef{}} | _]
}},
{object_reference_cycle, #'ObjectReferenceCycle'{
{object_reference_cycle, #domain_conf_ObjectReferenceCycle{
cycle = [{criterion, #domain_CriterionRef{}} | _]
}}
]
@ -324,8 +388,37 @@ reference_cycles(_C) ->
)
).
-spec checkout_object(term()) -> term().
checkout_object(_C) ->
ID = next_id(),
Object = fixture_domain_object(ID, <<"InsertFixture">>),
Ref = fixture_object_ref(ID),
#domain_conf_Snapshot{version = Version1} = dmt_client:checkout(latest),
Version2 = dmt_client:commit(
Version1,
#domain_conf_Commit{ops = [{insert, #domain_conf_InsertOp{object = Object}}]}
),
?assertEqual(
{ok, #domain_conf_VersionedObject{version = Version2, object = Object}},
call_checkout_object({head, #domain_conf_Head{}}, Ref)
),
?assertEqual(
{exception, #domain_conf_ObjectNotFound{}},
call_checkout_object({version, Version1}, Ref)
),
?assertEqual(
{ok, #domain_conf_VersionedObject{version = Version2, object = Object}},
call_checkout_object({version, Version2}, Ref)
),
?assertEqual(
{exception, #domain_conf_VersionNotFound{}},
call_checkout_object({version, Version2 + 1}, Ref)
).
next_id() ->
erlang:system_time(micro_seconds) band 16#7FFFFFFF.
16#7FFFFFFF band
(erlang:system_time(millisecond) * 1000 +
erlang:unique_integer([positive, monotonic])).
fixture_domain_object(Ref, Data) ->
{category, #domain_CategoryObject{
@ -344,3 +437,17 @@ criterion_w_refs(ID, Refs) ->
predicate = {any_of, ordsets:from_list([{criterion, #domain_CriterionRef{id = Ref}} || Ref <- Refs])}
}
}}.
%%
call_checkout_object(Version, ObjectReference) ->
call('RepositoryClient', 'checkoutObject', {Version, ObjectReference}).
call(ServiceName, Function, Args) ->
Url = <<"http://dominant:8022/v1/domain/repository_client">>,
Call = {{dmsl_domain_conf_thrift, ServiceName}, Function, Args},
CallOpts = #{
url => Url,
event_handler => [scoper_woody_event_handler]
},
woody_client:call(Call, CallOpts, woody_context:new()).