HG-155: Switch to the msgpack-aware machinegun interface (#32)

* HG-155: Switch to msgpack-like machinegun interface

* HG-155: Bump deps up and move onto Erlang 19

* HG-155: Make the release rebar-runnable

* HG-155: Fix marshalling to and from msgpack

* HG-155: Bump to rbkmoney/woody_erlang@2f264f9

* HG-155: Add forgotten msgpack-thrift protocol

* HG-155: Make dialyzer happy again

* HG-155: Make repository machine versioned

 * Migrate legacy history upon machine init
 * Introduce automaton client to reduce boilerplate
 * Remove `get_commit` facility as it was unused

* HG-155: Add repository dispatcher + add migration tests

* HG-155: Move in-memory thrift encoding into dedicated module

* HG-155: Use plain thrift binary as it is much simpler to work with

* HG-155: Switch back to msgpack-based commit representaion

* HG-155: Remove meaningless repair signal handling in favor of simple repair

* HG-155: Add forgotten behaviour annotations

* HG-155: Mention better approach with a migrating repository backend

* HG-155: Make dialyzer happy again

* HG-155: Bump to rbkmoney/damsel@d0c4a06, rbkmoney/machinegun@e04e529

* HG-155: Make elvis happy

* HG-155: Fix make target and Jenkinsfile
This commit is contained in:
Andrew Mayorov 2017-03-29 18:27:11 +03:00 committed by GitHub
parent 6d5a843270
commit 61320c4320
18 changed files with 998 additions and 284 deletions

2
Jenkinsfile vendored
View File

@ -36,7 +36,7 @@ build('dominant', 'docker-host', finalHook) {
sh 'make wc_xref'
}
runStage('dialyze') {
withWsCache("_build/default/rebar3_18.3_plt") {
withWsCache("_build/default/rebar3_19.1_plt") {
sh 'make wc_dialyze'
}
}

View File

@ -19,7 +19,7 @@ BASE_IMAGE_TAG := 13454a94990acb72f753623ec13599a9f6f4f852
## Variables required for utils_container.mk
# Build image tag to be used
BUILD_IMAGE_TAG := 7f6c3f231c0cffbf11e67f5a5e38366bef1c798f
BUILD_IMAGE_TAG := 3750c129119b83ea399dc4aa0ed923fb0e3bf0f0
BASE_IMAGE := "$(ORG_NAME)/build:latest"
RELNAME := dominant
@ -54,7 +54,7 @@ compile: submodules rebar-update
xref: submodules
$(REBAR) xref
lint: compile
lint:
elvis rock
dialyze:
@ -66,7 +66,7 @@ start: submodules
devrel: submodules
$(REBAR) release
release: distclean
release: submodules distclean
$(REBAR) as prod release
clean:
@ -74,7 +74,7 @@ clean:
distclean:
$(REBAR) clean -a
rm -rfv _build _builds _cache _steps _temp
rm -rfv _build
# CALL_W_CONTAINER
test: submodules

View File

@ -14,7 +14,7 @@ services:
- machinegun
machinegun:
image: dr.rbkmoney.com/rbkmoney/machinegun:2c956c1172cf8f7b4a09512cd1571bdd4c57f1c1
image: dr.rbkmoney.com/rbkmoney/machinegun:e04e529f4c5682b527d12d73a13a3cf9eb296d4d
volumes:
- ./test/machinegun/sys.config:/opt/machinegun/releases/0.1.0/sys.config
environment:

View File

@ -9,7 +9,6 @@
{elvis_style, line_length, #{limit => 120, skip_comments => false}},
{elvis_style, no_tabs},
{elvis_style, no_trailing_whitespace},
{elvis_style, macro_names},
{elvis_style, macro_module_names},
{elvis_style, operator_spaces, #{rules => [{right, ","}, {right, "++"}, {left, "++"}]}},
{elvis_style, nesting_level, #{level => 3}},

View File

@ -32,7 +32,7 @@
{woody , {git, "git@github.com:rbkmoney/woody_erlang.git", {branch, "master"}}},
{dmsl , {git, "git@github.com:rbkmoney/damsel_erlang.git", {branch, "master"}}},
{dmt , {git, "git@github.com:rbkmoney/dmt_core.git", {branch, "master"}}},
{lager , "3.0.2"},
{lager , "3.2.1"},
{lager_logstash_formatter, {git, "git@github.com:rbkmoney/lager_logstash_formatter.git", {branch, "master"}}},
% TODO move to the test profile as soon as compose quirks get fixed
@ -57,8 +57,7 @@
%% Relx configuration
{relx, [
{release, {dominant, "0.1"}, [
dmt_api,
sasl
dmt_api
]},
{sys_config, "./config/sys.config"},
{vm_args, "./config/vm.args"},
@ -87,7 +86,18 @@
]},
{prod, [
{deps, [
% for introspection on production
{recon, "2.3.2"}
]},
{relx, [
{release, {dominant, "0.1"}, [
{recon , load }, % tools for introspection
{runtime_tools, load }, % debugger
{tools , load }, % profiler
sasl,
dmt_api
]},
{dev_mode, false},
{include_erts, true}
]}

View File

@ -1,9 +1,10 @@
{"1.1.0",
[{<<"certifi">>,{pkg,<<"certifi">>,<<"0.7.0">>},2},
{<<"cowboy">>,{pkg,<<"cowboy">>,<<"1.0.4">>},1},
{<<"cowlib">>,{pkg,<<"cowlib">>,<<"1.0.2">>},2},
{<<"dmsl">>,
{git,"git@github.com:rbkmoney/damsel_erlang.git",
{ref,"c4eb97b153bcb99c9d728f86ded250c32700fc9e"}},
{ref,"52018a4684f48bae501860113790d4f535c4213c"}},
0},
{<<"dmt">>,
{git,"git@github.com:rbkmoney/dmt_core.git",
@ -17,29 +18,45 @@
{git,"https://github.com/rbkmoney/genlib.git",
{ref,"82ff16f4314fc406dd90752467a08fe401b009ef"}},
0},
{<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.7">>},1},
{<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.8">>},1},
{<<"hackney">>,{pkg,<<"hackney">>,<<"1.6.2">>},1},
{<<"idna">>,{pkg,<<"idna">>,<<"1.2.0">>},2},
{<<"jsx">>,{pkg,<<"jsx">>,<<"2.8.0">>},1},
{<<"lager">>,{pkg,<<"lager">>,<<"3.0.2">>},0},
{<<"jsx">>,{pkg,<<"jsx">>,<<"2.8.2">>},1},
{<<"lager">>,{pkg,<<"lager">>,<<"3.2.1">>},0},
{<<"lager_logstash_formatter">>,
{git,"git@github.com:rbkmoney/lager_logstash_formatter.git",
{ref,"562ec7a42020e2fed4bd80fee9bb585eb634befb"}},
{ref,"d7370337d4d55b37915a2c3202f5c39047674bb3"}},
0},
{<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},2},
{<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.0.2">>},2},
{<<"ranch">>,{pkg,<<"ranch">>,<<"1.2.1">>},2},
{<<"rfc3339">>,{pkg,<<"rfc3339">>,<<"0.2.0">>},1},
{<<"ranch">>,{pkg,<<"ranch">>,<<"1.3.2">>},2},
{<<"rfc3339">>,{pkg,<<"rfc3339">>,<<"0.9.0">>},1},
{<<"snowflake">>,
{git,"https://github.com/rbkmoney/snowflake.git",
{ref,"36b978a3ad711c9d9349b799a24c5499a95ae29a"}},
{ref,"0a598108f6582affe3b4ae550fc5b9f2062e318a"}},
1},
{<<"ssl_verify_fun">>,{pkg,<<"ssl_verify_fun">>,<<"1.1.1">>},2},
{<<"thrift">>,
{git,"https://github.com/rbkmoney/thrift_erlang.git",
{ref,"aca7fca9f1a7161a1324bf5b92f8402c90d0519e"}},
{ref,"240bbc842f6e9b90d01bd07838778cf48752b510"}},
1},
{<<"woody">>,
{git,"git@github.com:rbkmoney/woody_erlang.git",
{ref,"b5ae9ae3abc8da470f68d2e2ca15ccc606801225"}},
0}].
{ref,"2f264f97c6434806ac6a6c8405824fdf00432dbe"}},
0}]}.
[
{pkg_hash,[
{<<"certifi">>, <<"861A57F3808F7EB0C2D1802AFEAAE0FA5DE813B0DF0979153CBAFCD853ABABAF">>},
{<<"cowboy">>, <<"A324A8DF9F2316C833A470D918AAF73AE894278B8AA6226CE7A9BF699388F878">>},
{<<"cowlib">>, <<"9D769A1D062C9C3AC753096F868CA121E2730B9A377DE23DEC0F7E08B1DF84EE">>},
{<<"goldrush">>, <<"2024BA375CEEA47E27EA70E14D2C483B2D8610101B4E852EF7F89163CDB6E649">>},
{<<"hackney">>, <<"96A0A5E7E65B7ACAD8031D231965718CC70A9B4131A8B033B7543BBD673B8210">>},
{<<"idna">>, <<"AC62EE99DA068F43C50DC69ACF700E03A62A348360126260E87F2B54ECED86B2">>},
{<<"jsx">>, <<"7ACC7D785B5ABE8A6E9ADBDE926A24E481F29956DD8B4DF49E3E4E7BCC92A018">>},
{<<"lager">>, <<"EEF4E18B39E4195D37606D9088EA05BF1B745986CF8EC84F01D332456FE88D17">>},
{<<"metrics">>, <<"25F094DEA2CDA98213CECC3AEFF09E940299D950904393B2A29D191C346A8486">>},
{<<"mimerl">>, <<"993F9B0E084083405ED8252B99460C4F0563E41729AB42D9074FD5E52439BE88">>},
{<<"ranch">>, <<"E4965A144DC9FBE70E5C077C65E73C57165416A901BD02EA899CFD95AA890986">>},
{<<"rfc3339">>, <<"2075653DC9407541C84B1E15F8BDA2ABE95FB17C9694025E079583F2D19C1060">>},
{<<"ssl_verify_fun">>, <<"28A4D65B7F59893BC2C7DE786DEC1E1555BD742D336043FE644AE956C3497FBE">>}]}
].

View File

@ -3,10 +3,10 @@
-behaviour(supervisor).
%% API
-export([checkout/2]).
-export([checkout_object/3]).
-export([pull/2]).
-export([commit/3]).
-export([checkout/3]).
-export([checkout_object/4]).
-export([pull/3]).
-export([commit/4]).
-export([apply_commit/3]).
%% behaviours
@ -19,29 +19,32 @@
%% API
-type context() :: woody_client:context().
-type repository() :: module().
-spec checkout(dmt:ref(), context()) -> dmt:snapshot() | {error, version_not_found}.
checkout(Reference, Context) ->
-spec checkout(dmt:ref(), repository(), context()) ->
{ok, dmt:snapshot()} | {error, version_not_found}.
checkout(Reference, Repository, Context) ->
try
dmt_cache:checkout(Reference)
{ok, dmt_cache:checkout(Reference)}
catch
version_not_found ->
case try_get_snapshot(Reference, Context) of
Snapshot = #'Snapshot'{} ->
dmt_cache:cache_snapshot(Snapshot);
case try_get_snapshot(Reference, Repository, Context) of
{ok, Snapshot} ->
{ok, dmt_cache:cache_snapshot(Snapshot)};
{error, version_not_found} ->
{error, version_not_found}
end
end.
-spec try_get_snapshot(dmt:ref(), context()) -> dmt:snapshot() | {error, version_not_found}.
try_get_snapshot(Reference, Context) ->
History = dmt_api_mg:get_history(undefined, reference_to_limit(Reference), Context),
-spec try_get_snapshot(dmt:ref(), repository(), context()) ->
{ok, dmt:snapshot()} | {error, version_not_found}.
try_get_snapshot(Reference, Repository, Context) ->
History = dmt_api_repository:get_history(Repository, reference_to_limit(Reference), Context),
case {Reference, dmt_history:head(History)} of
{{head, #'Head'{}}, Snapshot} ->
Snapshot;
{ok, Snapshot};
{{version, V}, Snapshot = #'Snapshot'{version = V}} ->
Snapshot;
{ok, Snapshot};
{{version, V1}, #'Snapshot'{version = V2}} when V1 > V2 ->
{error, version_not_found}
end.
@ -52,39 +55,38 @@ reference_to_limit({head, #'Head'{}}) ->
reference_to_limit({version, Version}) ->
Version.
-spec checkout_object(dmt:ref(), dmt:object_ref(), context()) ->
dmsl_domain_config_thrift:'VersionedObject'() | {error, version_not_found | object_not_found}.
checkout_object(Reference, ObjectReference, Context) ->
Snapshot = checkout(Reference, Context),
case Snapshot of
#'Snapshot'{} ->
-spec checkout_object(dmt:ref(), dmt:object_ref(), repository(), context()) ->
{ok, dmsl_domain_config_thrift:'VersionedObject'()} | {error, version_not_found | object_not_found}.
checkout_object(Reference, ObjectReference, Repository, Context) ->
case checkout(Reference, Repository, Context) of
{ok, Snapshot} ->
try_get_object(ObjectReference, Snapshot);
{error, _} ->
Snapshot
{error, _} = Error ->
Error
end.
try_get_object(ObjectReference, #'Snapshot'{version = Version, domain = Domain}) ->
case dmt_domain:get_object(ObjectReference, Domain) of
{ok, Object} ->
#'VersionedObject'{version = Version, object = Object};
{ok, #'VersionedObject'{version = Version, object = Object}};
error ->
{error, object_not_found}
end.
-spec pull(dmt:version(), context()) -> dmt:history() | {error, version_not_found}.
pull(Version, Context) ->
dmt_api_mg:get_history(Version, undefined, Context).
-spec pull(dmt:version(), repository(), context()) ->
{ok, dmt:history()} | {error, version_not_found}.
pull(Version, Repository, Context) ->
dmt_api_repository:get_history_since(Repository, Version, Context).
-spec commit(dmt:version(), dmt:commit(), context()) ->
dmt:version() | {error, version_not_found | operation_conflict}.
commit(Version, Commit, Context) ->
Snapshot = dmt_api_mg:commit(Version, Commit, Context),
case Snapshot of
#'Snapshot'{version = VersionNext} ->
-spec commit(dmt:version(), dmt:commit(), repository(), context()) ->
{ok, dmt:version()} | {error, version_not_found | operation_conflict}.
commit(Version, Commit, Repository, Context) ->
case dmt_api_repository:commit(Repository, Version, Commit, Context) of
{ok, Snapshot = #'Snapshot'{version = VersionNext}} ->
_ = dmt_cache:cache_snapshot(Snapshot),
VersionNext;
{error, _} ->
Snapshot
{ok, VersionNext};
{error, _} = Error ->
Error
end.
-spec apply_commit(dmt:version(), dmt:commit(), dmt:history()) ->
@ -124,11 +126,10 @@ init([]) ->
API = woody_server:child_spec(
?MODULE,
#{
ip => IP,
port => genlib_app:env(?MODULE, port, 8022),
net_opts => #{},
ip => IP,
port => genlib_app:env(?MODULE, port, 8022),
event_handler => woody_event_handler_default,
handlers => [
handlers => [
get_handler_spec(repository),
get_handler_spec(repository_client),
get_handler_spec(state_processor)
@ -138,22 +139,25 @@ init([]) ->
Children = [API],
{ok, {#{strategy => one_for_one, intensity => 10, period => 60}, Children}}.
-spec get_handler_spec(Which) -> {Path, {woody:service(), module()}} when
-spec get_handler_spec(Which) -> {Path, {woody:service(), woody:handler(module())}} when
Which :: repository | repository_client | state_processor,
Path :: iodata().
get_handler_spec(repository) ->
{"/v1/domain/repository", {
{dmsl_domain_config_thrift, 'Repository'},
dmt_api_repository_handler
{dmt_api_repository_handler, get_repository_mod()}
}};
get_handler_spec(repository_client) ->
{"/v1/domain/repository_client", {
{dmsl_domain_config_thrift, 'RepositoryClient'},
dmt_api_repository_client_handler
{dmt_api_repository_client_handler, get_repository_mod()}
}};
get_handler_spec(state_processor) ->
{"/v1/stateproc", {
{dmsl_state_processing_thrift, 'Processor'},
dmt_api_state_processor
get_repository_mod()
}}.
get_repository_mod() ->
genlib_app:env(?MODULE, repository, dmt_api_repository_v2).

View File

@ -0,0 +1,84 @@
-module(dmt_api_automaton_client).
-include_lib("dmsl/include/dmsl_state_processing_thrift.hrl").
-export([call/4]).
-export([get_history/4]).
-export([start/3]).
%%
-type ns() :: dmsl_base_thrift:'Namespace'().
-type id() :: dmsl_base_thrift:'ID'().
-type args() :: dmsl_state_processing_thrift:'Args'().
-type response() :: dmsl_state_processing_thrift:'CallResponse'().
-type descriptor() :: dmsl_state_processing_thrift:'MachineDescriptor'().
-type history_range() :: dmsl_state_processing_thrift:'HistoryRange'().
-type history() :: dmsl_state_processing_thrift:'History'().
-type context() :: woody_context:ctx().
%%
-spec call(ns(), id(), args(), context()) ->
response() |
no_return().
call(NS, ID, Args, Context) ->
Descriptor = construct_descriptor(NS, ID, #'HistoryRange'{}),
case issue_rpc('Call', [Descriptor, Args], Context) of
{ok, Result} ->
Result;
{error, #'MachineNotFound'{}} ->
ok = start(NS, ID, Context),
call(NS, ID, Args, Context)
end.
-spec get_history(ns(), id(), history_range(), context()) ->
{ok, history()} |
{error,
dmsl_state_processing_thrift:'EventNotFound'() |
dmsl_state_processing_thrift:'MachineNotFound'()
} |
no_return().
get_history(NS, ID, HistoryRange, Context) ->
Descriptor = construct_descriptor(NS, ID, HistoryRange),
case issue_rpc('GetMachine', [Descriptor], Context) of
{ok, #'Machine'{history = History}} ->
{ok, History};
{error, _} = Error ->
Error
end.
-spec start(ns(), id(), context()) ->
ok | no_return().
start(NS, ID, Context) ->
case issue_rpc('Start', [NS, ID, {nl, #msgpack_Nil{}}], Context) of
{ok, _} ->
ok;
{error, #'MachineAlreadyExists'{}} ->
ok
end.
-spec construct_descriptor(ns(), id(), history_range()) ->
descriptor().
construct_descriptor(NS, ID, HistoryRange) ->
#'MachineDescriptor'{
ns = NS,
ref = {id, ID},
range = HistoryRange
}.
-spec issue_rpc(atom(), list(term()), context()) ->
term() | no_return().
issue_rpc(Method, Args, Context) ->
Request = {{dmsl_state_processing_thrift, 'Automaton'}, Method, Args},
{ok, URL} = application:get_env(dmt_api, automaton_service_url),
Opts = #{url => URL, event_handler => {woody_event_handler_default, undefined}},
case woody_client:call(Request, Opts, Context) of
{ok, _} = Ok ->
Ok;
{exception, #'NamespaceNotFound'{}} ->
error(namespace_not_found);
{exception, #'MachineFailed'{}} ->
error(machine_failed);
{exception, Exception} ->
{error, Exception}
end.

View File

@ -1,124 +0,0 @@
-module(dmt_api_mg).
-export([start/1]).
-export([get_commit/2]).
-export([get_history/1]).
-export([get_history/3]).
-export([commit/3]).
-export([read_history/1]).
-export([call/3]).
-include_lib("dmsl/include/dmsl_state_processing_thrift.hrl").
-include_lib("dmsl/include/dmsl_base_thrift.hrl").
-define(NS , <<"domain-config">>).
-define(ID , <<"primary">>).
-define(REF , {id, ?ID}).
-type context() :: woody_client:context().
-type ref() :: dmsl_state_processing_thrift:'Reference'().
-type ns() :: dmsl_base_thrift:'Namespace'().
-type history_range() :: dmsl_state_processing_thrift:'HistoryRange'().
-type descriptor() :: dmsl_state_processing_thrift:'MachineDescriptor'().
%%
-spec start(context()) ->
ok | no_return().
start(Context) ->
try call('Start', [?NS, ?ID, <<>>], Context) catch
error:#'MachineAlreadyExists'{} ->
ok
end.
-spec get_commit(dmt:version(), context()) ->
dmt:commit() | {error, version_not_found} | no_return().
get_commit(ID, Context) ->
case get_history(get_prev_commit(ID), 1, Context) of
#{ID := Commit} ->
Commit;
#{} ->
{error, version_not_found};
Error ->
Error
end.
get_prev_commit(1) ->
undefined;
get_prev_commit(N) ->
N - 1.
%% TODO: add range requests after they are fixed in mg
-spec get_history(context()) ->
dmt:history().
get_history(Context) ->
get_history(undefined, undefined, Context).
%% TODO: change this interface to accept dmt:version only
-spec get_history(dmt:version() | undefined, pos_integer() | undefined, context()) ->
dmt:history() | {error, version_not_found}.
get_history(After, Limit, Context) ->
Range = #'HistoryRange'{'after' = prepare_event_id(After), 'limit' = Limit},
Descriptor = prepare_descriptor(?NS, ?REF, Range),
try read_history(call('GetMachine', [Descriptor], Context)) catch
error:#'EventNotFound'{} ->
{error, version_not_found}
end.
-spec commit(dmt:version(), dmt:commit(), context()) ->
dmt:snapshot() | {error, version_not_found | operation_conflict}.
commit(Version, Commit, Context) ->
Descriptor = prepare_descriptor(?NS, ?REF, #'HistoryRange'{}),
Call = term_to_binary({commit, Version, Commit}),
binary_to_term(call('Call', [Descriptor, Call], Context)).
%%
-spec call(atom(), list(term()), context()) -> term() | no_return().
call(Method, Args, Context) ->
Request = {{dmsl_state_processing_thrift, 'Automaton'}, Method, Args},
{ok, URL} = application:get_env(dmt_api, automaton_service_url),
Opts = #{url => URL, event_handler => {woody_event_handler_default, undefined}},
case woody_client:call(Request, Opts, Context) of
{ok, Result} ->
Result;
{exception, #'MachineNotFound'{}} ->
ok = start(Context),
case woody_client:call(Request, Opts, Context) of
{ok, Result} ->
Result;
{exception, Exception} ->
error(Exception)
end;
{exception, Exception} ->
error(Exception)
end.
%% utils
-spec read_history(dmsl_state_processing_thrift:'Machine'()) -> dmt:history().
read_history(#'Machine'{history = Events}) ->
read_history(Events, #{}).
-spec read_history([dmsl_state_processing_thrift:'Event'()], dmt:history()) ->
dmt:history().
read_history([], History) ->
History;
read_history([#'Event'{id = Id, event_payload = BinaryPayload} | Rest], History) ->
read_history(Rest, History#{Id => binary_to_term(BinaryPayload)}).
prepare_event_id(ID) when is_integer(ID) andalso ID > 0 ->
ID;
prepare_event_id(_) ->
undefined.
-spec prepare_descriptor(ns(), ref(), history_range()) -> descriptor().
prepare_descriptor(NS, Ref, Range) ->
#'MachineDescriptor'{
ns = NS,
ref = Ref,
range = Range
}.

View File

@ -0,0 +1,31 @@
-module(dmt_api_repository).
%% API
-export([get_history/3]).
-export([get_history_since/3]).
-export([commit/4]).
-type context() :: woody_context:ctx().
-callback get_history(pos_integer() | undefined, context()) ->
dmt:history().
-callback get_history_since(dmt:version(), context()) ->
{ok, dmt:history()} | {error, version_not_found}.
-callback commit(dmt:version(), dmt:commit(), context()) ->
{ok, dmt:snapshot()} | {error, version_not_found | operation_conflict}.
-spec get_history(module(), pos_integer() | undefined, context()) ->
dmt:history().
get_history(Mod, Limit, Context) ->
Mod:get_history(Limit, Context).
-spec get_history_since(module(), dmt:version(), context()) ->
{ok, dmt:history()} | {error, version_not_found}.
get_history_since(Mod, Version, Context) ->
Mod:get_history_since(Version, Context).
-spec commit(module(), dmt:version(), dmt:commit(), context()) ->
{ok, dmt:snapshot()} | {error, version_not_found | operation_conflict}.
commit(Mod, Version, Commit, Context) ->
Mod:commit(Version, Commit, Context).

View File

@ -7,15 +7,14 @@
-include_lib("dmsl/include/dmsl_domain_config_thrift.hrl").
-spec handle_function(
woody:func(),
woody_server_thrift_handler:args(),
woody_client:context(),
woody_server_thrift_handler:handler_opts()
) -> {ok, woody_server_thrift_handler:result()} | no_return().
handle_function('checkoutObject', [Reference, ObjectReference], Context, _Opts) ->
case dmt_api:checkout_object(Reference, ObjectReference, Context) of
Object = #'VersionedObject'{} ->
-type context() :: woody_context:ctx().
-spec handle_function
('checkoutObject', woody:args(), context(), woody:options()) ->
{ok, dmsl_domain_config_thrift:'VersionedObject'()} | no_return().
handle_function('checkoutObject', [Reference, ObjectReference], Context, Repository) ->
case dmt_api:checkout_object(Reference, ObjectReference, Repository, Context) of
{ok, Object} ->
{ok, Object};
{error, object_not_found} ->
woody_error:raise(business, #'ObjectNotFound'{});

View File

@ -7,31 +7,34 @@
-include_lib("dmsl/include/dmsl_domain_config_thrift.hrl").
-spec handle_function(
woody:func(),
woody_server_thrift_handler:args(),
woody_client:context(),
woody_server_thrift_handler:handler_opts()
) -> {ok, woody_server_thrift_handler:result()} | no_return().
handle_function('Commit', [Version, Commit], Context, _Opts) ->
case dmt_api:commit(Version, Commit, Context) of
VersionNext when is_integer(VersionNext) ->
-type context() :: woody_context:ctx().
-spec handle_function
('Commit', woody:args(), context(), woody:options()) ->
{ok, dmt:version()} | no_return();
('Checkout', woody:args(), context(), woody:options()) ->
{ok, dmt:snapshot()} | no_return();
('Pull', woody:args(), context(), woody:options()) ->
{ok, dmt:history()} | no_return().
handle_function('Commit', [Version, Commit], Context, Repository) ->
case dmt_api:commit(Version, Commit, Repository, Context) of
{ok, VersionNext} ->
{ok, VersionNext};
{error, operation_conflict} ->
woody_error:raise(business, #'OperationConflict'{});
{error, version_not_found} ->
woody_error:raise(business, #'VersionNotFound'{})
end;
handle_function('Checkout', [Reference], Context, _Opts) ->
case dmt_api:checkout(Reference, Context) of
Snapshot = #'Snapshot'{} ->
handle_function('Checkout', [Reference], Context, Repository) ->
case dmt_api:checkout(Reference, Repository, Context) of
{ok, Snapshot} ->
{ok, Snapshot};
{error, version_not_found} ->
woody_error:raise(business, #'VersionNotFound'{})
end;
handle_function('Pull', [Version], Context, _Opts) ->
case dmt_api:pull(Version, Context) of
History = #{} ->
handle_function('Pull', [Version], Context, Repository) ->
case dmt_api:pull(Version, Repository, Context) of
{ok, History} ->
{ok, History};
{error, version_not_found} ->
woody_error:raise(business, #'VersionNotFound'{})

View File

@ -0,0 +1,152 @@
-module(dmt_api_repository_v1).
-behaviour(dmt_api_repository).
-include_lib("dmsl/include/dmsl_state_processing_thrift.hrl").
-define(NS , <<"domain-config">>).
-define(ID , <<"primary">>).
%% API
-export([get_history/2]).
-export([get_history_since/2]).
-export([commit/3]).
%% State processor
-behaviour(woody_server_thrift_handler).
-export([handle_function/4]).
%%
-type context() :: woody_context:ctx().
-spec get_history(pos_integer() | undefined, context()) ->
dmt:history().
get_history(Limit, Context) ->
get_history_by_range(#'HistoryRange'{'after' = undefined, 'limit' = Limit}, Context).
-spec get_history_since(dmt:version(), context()) ->
{ok, dmt:history()} | {error, version_not_found}.
get_history_since(Version, Context) ->
case get_history_by_range(#'HistoryRange'{'after' = Version, 'limit' = undefined}, Context) of
History when is_map(History) ->
{ok, History};
Error ->
Error
end.
%%
-type history_range() :: dmsl_state_processing_thrift:'HistoryRange'().
-type machine() :: dmsl_state_processing_thrift:'Machine'().
-type history() :: dmsl_state_processing_thrift:'History'().
-spec get_history_by_range(history_range(), context()) ->
dmt:history() | {error, version_not_found}.
get_history_by_range(HistoryRange, Context) ->
case dmt_api_automaton_client:get_history(?NS, ?ID, HistoryRange, Context) of
{ok, History} ->
read_history(History);
{error, #'MachineNotFound'{}} ->
#{};
{error, #'EventNotFound'{}} ->
{error, version_not_found}
end.
%%
-spec commit(dmt:version(), dmt:commit(), context()) ->
{ok, dmt:snapshot()} | {error, version_not_found | operation_conflict}.
commit(Version, Commit, Context) ->
call({commit, Version, Commit}, Context).
%%
-define(NIL, {nl, #msgpack_Nil{}}).
-type commit_call() :: {commit, dmt:version(), dmt:commit()}.
-type commit_result() :: {ok, dmt:snapshot()} | {error, version_not_found | operation_conflict}.
-spec call(commit_call(), context()) ->
commit_result() | no_return().
call(Call, Context) ->
decode_call_result(dmt_api_automaton_client:call(?NS, ?ID, encode_call(Call), Context)).
%%
-spec handle_function(woody:func(), woody:args(), context(), woody:options()) ->
{ok, woody:result()} | no_return().
handle_function('ProcessCall', [#'CallArgs'{arg = Payload, machine = Machine}], _Context, _Opts) ->
Call = decode_call(Payload),
{Result, Events} = handle_call(Call, read_history(Machine)),
Response = encode_call_result(Result),
{ok, construct_call_result(Response, [encode_event(E) || E <- Events])};
handle_function('ProcessSignal', [#'SignalArgs'{signal = {init, #'InitSignal'{}}}], _Context, _Opts) ->
{ok, construct_signal_result([])}.
construct_call_result(Response, Events) ->
#'CallResult'{
response = Response,
change = #'MachineStateChange'{aux_state = ?NIL, events = Events},
action = #'ComplexAction'{}
}.
construct_signal_result(Events) ->
#'SignalResult'{
change = #'MachineStateChange'{aux_state = ?NIL, events = Events},
action = #'ComplexAction'{}
}.
%%
handle_call({commit, Version, Commit}, History) ->
case dmt_api:apply_commit(Version, Commit, History) of
{ok, _} = Ok ->
{Ok, [{commit, Commit}]};
{error, version_not_found} ->
{{error, version_not_found}, []};
{error, Reason} ->
_ = lager:info("commit failed: ~p", [Reason]),
{{error, operation_conflict}, []}
end.
%%
-spec read_history(history() | machine()) ->
dmt:history().
read_history(#'Machine'{history = Events}) ->
read_history(Events);
read_history(Events) ->
read_history(Events, #{}).
-spec read_history(dmsl_state_processing_thrift:'History'(), dmt:history()) ->
dmt:history().
read_history([], History) ->
History;
read_history([#'Event'{id = Id, event_payload = EventData} | Rest], History) ->
{commit, Commit} = decode_event(EventData),
read_history(Rest, History#{Id => Commit}).
%%
encode_event({commit, Commit}) ->
{bin, term_to_binary(Commit)}.
decode_event({bin, CommitData}) ->
{commit, binary_to_term(CommitData)}.
%%
encode_call(Call) ->
{bin, term_to_binary(Call)}.
decode_call({bin, CallData}) ->
binary_to_term(CallData).
encode_call_result(Result) ->
{bin, term_to_binary(Result)}.
decode_call_result({bin, ResultData}) ->
binary_to_term(ResultData).

View File

@ -0,0 +1,180 @@
-module(dmt_api_repository_v2).
-behaviour(dmt_api_repository).
-include_lib("dmsl/include/dmsl_state_processing_thrift.hrl").
-define(NS , <<"domain-config">>).
-define(ID , <<"primary/v2">>).
%% API
-export([get_history/2]).
-export([get_history_since/2]).
-export([commit/3]).
%% State processor
-behaviour(woody_server_thrift_handler).
-export([handle_function/4]).
%%
-type context() :: woody_context:ctx().
-spec get_history(pos_integer() | undefined, context()) ->
dmt:history().
get_history(Limit, Context) ->
get_history_by_range(#'HistoryRange'{'after' = undefined, 'limit' = Limit}, Context).
-spec get_history_since(dmt:version(), context()) ->
{ok, dmt:history()} | {error, version_not_found}.
get_history_since(Version, Context) ->
case get_history_by_range(#'HistoryRange'{'after' = Version, 'limit' = undefined}, Context) of
History when is_map(History) ->
{ok, History};
Error ->
Error
end.
%%
-type history_range() :: dmsl_state_processing_thrift:'HistoryRange'().
-type machine() :: dmsl_state_processing_thrift:'Machine'().
-type history() :: dmsl_state_processing_thrift:'History'().
-spec get_history_by_range(history_range(), context()) ->
dmt:history() | {error, version_not_found}.
get_history_by_range(HistoryRange, Context) ->
case dmt_api_automaton_client:get_history(?NS, ?ID, HistoryRange, Context) of
{ok, History} ->
read_history(History);
{error, #'MachineNotFound'{}} ->
ok = dmt_api_automaton_client:start(?NS, ?ID, Context),
get_history_by_range(HistoryRange, Context);
{error, #'EventNotFound'{}} ->
{error, version_not_found}
end.
%%
-spec commit(dmt:version(), dmt:commit(), context()) ->
{ok, dmt:snapshot()} | {error, version_not_found | operation_conflict}.
commit(Version, Commit, Context) ->
call({commit, Version, Commit}, Context).
%%
-define(NIL, {nl, #msgpack_Nil{}}).
-type commit_call() :: {commit, dmt:version(), dmt:commit()}.
-type commit_result() :: {ok, dmt:snapshot()} | {error, version_not_found | operation_conflict}.
-spec call(commit_call(), context()) ->
commit_result() | no_return().
call(Call, Context) ->
decode_call_result(Call, dmt_api_automaton_client:call(?NS, ?ID, encode_call(Call), Context)).
%%
-spec handle_function(woody:func(), woody:args(), context(), woody:options()) ->
{ok, woody:result()} | no_return().
handle_function('ProcessCall', [#'CallArgs'{arg = Payload, machine = Machine}], _Context, _Opts) ->
Call = decode_call(Payload),
{Result, Events} = handle_call(Call, read_history(Machine)),
{ok, construct_call_result(Call, Result, Events)};
handle_function('ProcessSignal', [#'SignalArgs'{signal = {init, #'InitSignal'{}}}], Context, _Opts) ->
%%% TODO It's generally prettier to make up a _migrating_ repository which is the special repository
%%% module designed to facilitate migrations between some preconfigured 'old' repository backend
%%% and some 'new' one. The migration process could be triggered by the very first mutating
%%% operation (e.g. commit) going into this backend for example.
LegacyHistory = dmt_api_repository_v1:get_history(undefined, Context),
{ok, construct_signal_result(get_events_from_history(LegacyHistory))}.
get_events_from_history(History) ->
[{commit, Commit} || {_Version, Commit} <- lists:keysort(1, maps:to_list(History))].
construct_call_result(Call, Response, Events) ->
#'CallResult'{
response = encode_call_result(Call, Response),
change = #'MachineStateChange'{aux_state = ?NIL, events = encode_events(Events)},
action = #'ComplexAction'{}
}.
construct_signal_result(Events) ->
#'SignalResult'{
change = #'MachineStateChange'{aux_state = ?NIL, events = encode_events(Events)},
action = #'ComplexAction'{}
}.
encode_events(Events) ->
[encode_event(E) || E <- Events].
%%
handle_call({commit, Version, Commit}, History) ->
case dmt_api:apply_commit(Version, Commit, History) of
{ok, _} = Ok ->
{Ok, [{commit, Commit}]};
{error, version_not_found} ->
{{error, version_not_found}, []};
{error, Reason} ->
_ = lager:info("commit failed: ~p", [Reason]),
{{error, operation_conflict}, []}
end.
%%
-spec read_history(machine() | history()) ->
dmt:history().
read_history(#'Machine'{history = Events}) ->
read_history(Events);
read_history(Events) ->
read_history(Events, #{}).
-spec read_history([dmsl_state_processing_thrift:'Event'()], dmt:history()) ->
dmt:history().
read_history([], History) ->
History;
read_history([#'Event'{id = Id, event_payload = EventData} | Rest], History) ->
{commit, Commit} = decode_event(EventData),
read_history(Rest, History#{Id => Commit}).
%%
encode_event({commit, Commit}) ->
{arr, [{str, <<"commit">>}, encode(commit, Commit)]}.
decode_event({arr, [{str, <<"commit">>}, Commit]}) ->
{commit, decode(commit, Commit)}.
%%
encode_call({commit, Version, Commit}) ->
{arr, [{str, <<"commit">>}, {i, Version}, encode(commit, Commit)]}.
decode_call({arr, [{str, <<"commit">>}, {i, Version}, Commit]}) ->
{commit, Version, decode(commit, Commit)}.
encode_call_result({commit, _, _}, {ok, Snapshot}) ->
{arr, [{str, <<"ok">> }, encode(snapshot, Snapshot)]};
encode_call_result({commit, _, _}, {error, Reason}) ->
{arr, [{str, <<"err">>}, {str, atom_to_binary(Reason, utf8)}]}.
decode_call_result({commit, _, _}, {arr, [{str, <<"ok">> }, Snapshot]}) ->
{ok, decode(snapshot, Snapshot)};
decode_call_result({commit, _, _}, {arr, [{str, <<"err">>}, {str, Reason}]}) ->
{error, binary_to_existing_atom(Reason, utf8)}.
%%
decode(T, V) ->
dmt_api_thrift_utils:decode(msgpack, get_type_info(T), V).
encode(T, V) ->
dmt_api_thrift_utils:encode(msgpack, get_type_info(T), V).
get_type_info(commit) ->
{struct, struct, {dmsl_domain_config_thrift, 'Commit'}};
get_type_info(snapshot) ->
{struct, struct, {dmsl_domain_config_thrift, 'Snapshot'}}.

View File

@ -1,55 +0,0 @@
-module(dmt_api_state_processor).
-behaviour(woody_server_thrift_handler).
-export([handle_function/4]).
%%
-include_lib("dmsl/include/dmsl_state_processing_thrift.hrl").
-spec handle_function(
woody:func(),
woody_server_thrift_handler:args(),
woody_client:context(),
woody_server_thrift_handler:handler_opts()
) -> {ok, woody_server_thrift_handler:result()} | no_return().
handle_function('ProcessCall', [#'CallArgs'{arg = Payload, machine = Machine}], _Context, _Opts) ->
{Response, Events} = handle_call(binary_to_term(Payload), dmt_api_mg:read_history(Machine)),
{ok, #'CallResult'{
change = #'MachineStateChange'{
aux_state = <<>>,
events = lists:map(fun term_to_binary/1, Events)
},
action = #'ComplexAction'{},
response = term_to_binary(Response)
}};
handle_function('ProcessSignal', [#'SignalArgs'{signal = {init, #'InitSignal'{}}}], _Context, _Opts) ->
{ok, #'SignalResult'{
change = #'MachineStateChange'{
aux_state = <<>>,
events = []
},
action = #'ComplexAction'{}
}};
handle_function('ProcessSignal', [#'SignalArgs'{signal = {repair, #'RepairSignal'{}}}], _Context, _Opts) ->
{ok, #'SignalResult'{
change = #'MachineStateChange'{
aux_state = <<>>,
events = []
},
action = #'ComplexAction'{}
}}.
%%
handle_call({commit, Version, Commit}, History) ->
case dmt_api:apply_commit(Version, Commit, History) of
{ok, Snapshot} ->
{Snapshot, [Commit]};
{error, version_not_found} ->
{{error, version_not_found}, []};
{error, Reason} ->
_ = lager:info("commit failed: ~p", [Reason]),
{{error, operation_conflict}, []}
end.

View File

@ -0,0 +1,347 @@
-module(dmt_api_thrift_msgpack_protocol).
%%% Thrift library uses legacy behaviour definition that makes dialyzer angry.
%%% -behaviour(thrift_protocol).
-include_lib("thrift/include/thrift_constants.hrl").
-include_lib("thrift/include/thrift_protocol.hrl").
-export([
new/0,
new/1,
read/2,
write/2,
flush_transport/1,
close_transport/1
]).
-record(msgpack_protocol, {production}).
-type state() :: #msgpack_protocol{}.
-include_lib("thrift/include/thrift_protocol_behaviour.hrl").
new() ->
new([]).
new(Production) ->
State = #msgpack_protocol{production = Production},
thrift_protocol:new(?MODULE, State).
flush_transport(This) ->
{This, ok}.
close_transport(This = #msgpack_protocol{production = Production}) ->
{This, {ok, Production}}.
%%%
%%% instance methods
%%%
typeid_to_string(?tType_BOOL) -> <<"bool">>;
typeid_to_string(?tType_DOUBLE) -> <<"dbl">>;
typeid_to_string(?tType_I8) -> <<"i8">>;
typeid_to_string(?tType_I16) -> <<"i16">>;
typeid_to_string(?tType_I32) -> <<"i32">>;
typeid_to_string(?tType_I64) -> <<"i64">>;
typeid_to_string(?tType_STRING) -> <<"str">>;
typeid_to_string(?tType_STRUCT) -> <<"struct">>;
typeid_to_string(?tType_MAP) -> <<"map">>;
typeid_to_string(?tType_SET) -> <<"set">>;
typeid_to_string(?tType_LIST) -> <<"list">>.
-define(str(V) , {str , V}).
-define(int(V) , { i , V}).
-define(flt(V) , {flt , V}).
-define(bool(V) , { b , V}).
write(This, #protocol_message_begin{name = Name, type = Type, seqid = Seqid}) ->
do_write_many(This, [
{enter, array},
?str(Name),
?int(Type),
?int(Seqid)
]);
write(This, message_end) ->
do_write(This, {exit, array});
write(This, #protocol_struct_begin{}) ->
do_write(This, {enter, map});
write(This, struct_end) ->
{This, ok};
write(This, #protocol_field_begin{name = Name, type = Type, id = Id}) ->
do_write_many(This, [
?str(genlib:to_binary(Name)),
{enter, array},
?int(Id),
?str(typeid_to_string(Type))
]);
write(This, field_end) ->
do_write(This, {exit, array});
write(This, field_stop) ->
do_write(This, {exit, map});
write(This, #protocol_map_begin{ktype = Ktype, vtype = Vtype, size = Size}) ->
do_write_many(This, [
{enter, array},
{enter, map},
?str(typeid_to_string(Ktype)),
?str(typeid_to_string(Vtype)),
{exit, map},
?int(Size),
{enter, map}
]);
write(This, map_end) ->
do_write_many(This, [
{exit, map},
{exit, array}
]);
write(This, #protocol_list_begin{etype = Etype, size = Size}) ->
do_write_many(This, [
{enter, array},
?str(typeid_to_string(Etype)),
?int(Size)
]);
write(This, list_end) ->
do_write(This, {exit, array});
write(This, #protocol_set_begin{etype = Etype, size = Size}) ->
write(This, #protocol_list_begin{etype = Etype, size = Size});
write(This, set_end) ->
write(This, list_end);
write(This, {bool, V}) ->
do_write(This, ?bool(V));
write(This, {byte, Byte}) ->
do_write(This, ?int(Byte));
write(This, {i16, I16}) ->
do_write(This, ?int(I16));
write(This, {i32, I32}) ->
do_write(This, ?int(I32));
write(This, {i64, I64}) ->
do_write(This, ?int(I64));
write(This, {double, Double}) ->
do_write(This, ?flt(Double));
write(This, {string, Bin}) when is_binary(Bin) ->
% FIXME nonprintable?
do_write(This, ?str(Bin)).
%%
do_write_many(#msgpack_protocol{production = Production}, Vs) ->
{#msgpack_protocol{production = write_vals(Production, Vs)}, ok}.
do_write(#msgpack_protocol{production = Production}, V) ->
{#msgpack_protocol{production = write_val(Production, V)}, ok}.
write_vals(P0, [V | Vs]) ->
write_vals(write_val(P0, V), Vs);
write_vals(P0, []) ->
P0.
write_val(Production, {enter, array}) ->
{array, [], Production};
write_val({array, Vs, Production}, {exit, array}) ->
write_val(Production, {arr, lists:reverse(Vs)});
write_val(Production, {enter, map}) ->
{map, none, #{}, Production};
write_val({map, none, Vs, Production}, {exit, map}) ->
write_val(Production, {obj, Vs});
write_val({array, Vs, Production}, V) ->
{array, [V | Vs], Production};
write_val({map, none, Vs, Production}, K) ->
{map, K, Vs, Production};
write_val({map, K, Vs, Production}, V) ->
{map, none, Vs#{K => V}, Production};
write_val([], V) ->
V.
%%
string_to_typeid(<<"bool">>) -> ?tType_BOOL;
string_to_typeid(<<"dbl">>) -> ?tType_DOUBLE;
string_to_typeid(<<"i8">>) -> ?tType_I8;
string_to_typeid(<<"i16">>) -> ?tType_I16;
string_to_typeid(<<"i32">>) -> ?tType_I32;
string_to_typeid(<<"i64">>) -> ?tType_I64;
string_to_typeid(<<"str">>) -> ?tType_STRING;
string_to_typeid(<<"struct">>) -> ?tType_STRUCT;
string_to_typeid(<<"map">>) -> ?tType_MAP;
string_to_typeid(<<"set">>) -> ?tType_SET;
string_to_typeid(<<"list">>) -> ?tType_LIST.
read(This, message_begin) ->
{This1, {ok, [Name, Type, SeqId]}} = do_read_many(This, [
{enter, array},
str,
int,
int
]),
{This1, #protocol_message_begin{name = binary_to_list(Name), type = Type, seqid = SeqId}};
read(This, message_end) ->
do_read(This, {exit, array});
read(This, struct_begin) ->
do_read(This, {enter, map});
read(This, struct_end) ->
do_read(This, {exit, map});
read(This0, field_begin) ->
case do_read(This0, str) of
{This1, {ok, Name}} ->
{This2, {ok, [Id, Type]}} = do_read_many(This1, [{enter, array}, int, str]),
{This2, #protocol_field_begin{type = string_to_typeid(Type), id = Id, name = Name}};
{This1, {error, object_exhausted}} ->
{This1, #protocol_field_begin{type = ?tType_STOP}}
end;
read(This0, field_end) ->
do_read(This0, {exit, array});
read(This0, map_begin) ->
{This1, {ok, [Ktype, Vtype, Size]}} = do_read_many(This0, [
{enter, array},
{enter, map},
str,
str,
{exit, map},
int,
{enter, map}
]),
{This1, #protocol_map_begin{
ktype = string_to_typeid(Ktype),
vtype = string_to_typeid(Vtype),
size = Size
}};
read(This, map_end) ->
do_read_many(This, [{exit, map}, {exit, array}]);
read(This0, list_begin) ->
{This1, {ok, [Etype, Size]}} = do_read_many(This0, [
{enter, array},
str,
int
]),
{This1, #protocol_list_begin{
etype = string_to_typeid(Etype),
size = Size
}};
read(This0, list_end) ->
do_read(This0, {exit, array});
read(This0, set_begin) ->
{This1, #protocol_list_begin{etype = Etype, size = Size}} = read(This0, list_begin),
{This1, #protocol_set_begin{etype = Etype, size = Size}};
read(This0, set_end) ->
read(This0, list_end);
read(This0, field_stop) ->
{This0, ok};
read(This0, bool) ->
do_read(This0, bool);
read(This0, byte) ->
do_read(This0, int);
read(This0, i16) ->
do_read(This0, int);
read(This0, i32) ->
do_read(This0, int);
read(This0, i64) ->
do_read(This0, int);
read(This0, double) ->
do_read(This0, flt);
read(This0, string) ->
do_read(This0, str).
%%
do_read_many(This = #msgpack_protocol{production = P0}, Ts) ->
{P1, Result} = do_read_many(P0, Ts, []),
{This#msgpack_protocol{production = P1}, Result}.
do_read_many(P0, [T | Ts], Vs) ->
case read_val(P0, T) of
{P1, ok} ->
do_read_many(P1, Ts, Vs);
{P1, {ok, V}} ->
do_read_many(P1, Ts, [V | Vs]);
{_P, {error, _}} = Result ->
Result
end;
do_read_many(P0, [], []) ->
{P0, ok};
do_read_many(P0, [], Vs) ->
{P0, {ok, lists:reverse(Vs)}}.
do_read(#msgpack_protocol{production = P0}, T) ->
{P1, R} = read_val(P0, T),
{#msgpack_protocol{production = P1}, R}.
read_val(Production, {enter, array}) ->
case read_next(Production) of
{Rest, {arr, Vs}} ->
{{array, Vs, Rest}, ok};
{_Rest, {error, _} = Error} ->
{Production, Error};
{_Rest, {T, _V}} ->
{Production, {error, {array_expected, T}}}
end;
read_val({array, [], Rest}, {exit, array}) ->
{Rest, ok};
read_val(Production, {enter, map}) ->
case read_next(Production) of
{Rest, {obj, Vs}} ->
{{map, none, maps:to_list(Vs), Rest}, ok};
{_Rest, {error, _} = Error} ->
{Production, Error};
{_Rest, {T, _V}} ->
{Production, {error, {object_expected, T}}}
end;
read_val({map, none, [], Rest}, {exit, map}) ->
{Rest, ok};
read_val(P0, T) ->
case read_next(P0) of
{P1, {error, _} = Error} ->
{P1, Error};
{P1, V} ->
{P1, read_scalar(V, T)}
end.
read_next({array, [V | Vs], Rest}) ->
{{array, Vs, Rest}, V};
read_next({array, [], _} = Production) ->
{Production, {error, array_exhausted}};
read_next({map, none, [{K, V} | Vs], Rest}) ->
{{map, V, Vs, Rest}, K};
read_next({map, none, [], _} = Production) ->
{Production, {error, object_exhausted}};
read_next({map, V, Vs, Rest}) ->
{{map, none, Vs, Rest}, V};
read_next(V) ->
{[], V}.
read_scalar(?bool(V), bool) ->
{ok, V};
read_scalar(?str(V), str) ->
{ok, V};
read_scalar(?int(V), int) ->
{ok, V};
read_scalar(?flt(V), flt) ->
{ok, V};
read_scalar(V, T) ->
{error, {type_mismatch, V, T}}.

View File

@ -0,0 +1,44 @@
-module(dmt_api_thrift_utils).
-export([encode/3]).
-export([decode/3]).
-type thrift_type() :: term().
-type thrift_value() :: term().
-spec encode
(binary, thrift_type(), thrift_value()) ->
binary();
(msgpack, thrift_type(), thrift_value()) ->
dmsl_msgpack_thrift:'Value'().
encode(Proto, Type, Value) ->
{ok, Proto0} = new_protocol(Proto),
{Proto1, ok} = thrift_protocol:write(Proto0, {Type, Value}),
{_Proto, {ok, Data}} = thrift_protocol:close_transport(Proto1),
Data.
-spec decode
(binary, thrift_type(), binary()) ->
thrift_value();
(msgpack, thrift_type(), dmsl_msgpack_thrift:'Value'()) ->
thrift_value().
decode(Proto, Type, Data) ->
{ok, Proto0} = new_protocol(Proto, Data),
{_Proto, {ok, Value}} = thrift_protocol:read(Proto0, Type),
Value.
%%
new_protocol(binary) ->
{ok, Trans} = thrift_membuffer_transport:new(),
thrift_binary_protocol:new(Trans, [{strict_read, true}, {strict_write, true}]);
new_protocol(msgpack) ->
dmt_api_thrift_msgpack_protocol:new().
new_protocol(binary, Data) ->
{ok, Trans} = thrift_membuffer_transport:new(Data),
thrift_binary_protocol:new(Trans);
new_protocol(msgpack, Data) ->
dmt_api_thrift_msgpack_protocol:new(Data).

View File

@ -5,6 +5,8 @@
-export([groups/0]).
-export([init_per_suite/1]).
-export([end_per_suite/1]).
-export([init_per_group/2]).
-export([end_per_group/2]).
-export([insert/1]).
-export([update/1]).
@ -24,12 +26,15 @@
-spec all() -> [{group, group_name()}].
all() ->
[
{group, basic_lifecycle}
{group, basic_lifecycle_v1},
{group, basic_lifecycle_v2}
].
-spec groups() -> [{group_name(), list(), [test_case_name()]}].
groups() ->
[
{basic_lifecycle_v1, [sequence], [{group, basic_lifecycle}]},
{basic_lifecycle_v2, [sequence], [{group, basic_lifecycle}]},
{basic_lifecycle, [sequence, {repeat, 10}, shuffle], [
insert,
update,
@ -41,24 +46,42 @@ groups() ->
%% starting/stopping
-spec init_per_suite(config()) -> config().
init_per_suite(C) ->
Apps =
genlib_app:start_application_with(lager, [
{async_threshold, 1},
{async_threshold_window, 0},
{error_logger_hwm, 600},
{suppress_application_start_stop, true},
{handlers, [
{lager_common_test_backend, [debug, false]}
]}
]) ++
genlib_app:start_application_with(dmt_api, [
{automaton_service_url, "http://machinegun:8022/v1/automaton"}
]),
[{suite_apps, Apps}, {counter, 1} | C].
Apps = genlib_app:start_application_with(lager, [
{async_threshold, 1},
{async_threshold_window, 0},
{error_logger_hwm, 600},
{suppress_application_start_stop, true},
{handlers, [
{lager_common_test_backend, [info, false]}
]}
]),
[{suite_apps, Apps} | C].
-spec end_per_suite(config()) -> term().
end_per_suite(C) ->
[application:stop(App) || App <- lists:reverse(?config(suite_apps, C))].
genlib_app:stop_unload_applications(?config(suite_apps, C)).
-spec init_per_group(group_name(), config()) -> config().
init_per_group(basic_lifecycle_v1, C) ->
[{group_apps, start_with_repository(dmt_api_repository_v1)} | C];
init_per_group(basic_lifecycle_v2, C) ->
[{group_apps, start_with_repository(dmt_api_repository_v2)} | C];
init_per_group(_, C) ->
C.
start_with_repository(Repository) ->
genlib_app:start_application_with(dmt_api, [
{repository, Repository},
{automaton_service_url, "http://machinegun:8022/v1/automaton"}
]).
-spec end_per_group(group_name(), config()) -> term().
end_per_group(basic_lifecycle_v1, C) ->
genlib_app:stop_unload_applications(?config(group_apps, C));
end_per_group(basic_lifecycle_v2, C) ->
genlib_app:stop_unload_applications(?config(group_apps, C));
end_per_group(_, _C) ->
ok.
%%
%% tests