Split dominant (#3)

* split dominant to three repos

* fix submodule name

* new jenkins

* fix wc_release

* fix trailing space

* fix permissions

* fix path in dockerfile

* fix cmd

* fix release

* fix mg api

* fix the rest

* update dmt_core

* MG-16: Migrate to the new machinegun proto

* Fix some bugs involving cold cache access
* Harden the test suite
* Ensure proper woody context flow

* MG-16: Fix dialyzer warnings

* MG-16: Attempt to hack around docker-compose network connectivity quirks

* MG-16: Remove unused files

* MG-16: Mention potential race condition in the TODO list
This commit is contained in:
Igor Savchuk 2016-09-07 21:43:14 +02:00 committed by Andrew Mayorov
parent ba644bcce2
commit 2872e3a1f0
47 changed files with 711 additions and 1084 deletions

11
.gitignore vendored
View File

@ -6,10 +6,9 @@ erl_crash.dump
.tags*
*.sublime-workspace
.DS_Store
Dockerfile
docker-compose.yml
# wercker
/_builds/
/_cache/
/_projects/
/_steps/
/_temp/
#thrift
/include/dmt_*_thrift.hrl
/src/dmt_*_thrift.erl

8
.gitmodules vendored
View File

@ -1,3 +1,7 @@
[submodule "apps/dmt_proto/damsel"]
path = apps/dmt_proto/damsel
[submodule "damsel"]
path = damsel
url = git@github.com:rbkmoney/damsel.git
[submodule "build_utils"]
path = build_utils
url = git@github.com:rbkmoney/build_utils.git
branch = master

View File

@ -1,6 +0,0 @@
FROM rbkmoney/service_erlang:latest
MAINTAINER Igor Savchuk <i.savchuk@rbkmoney.com>
COPY _build/prod/rel/dmt /opt/dominant
CMD ["/opt/dominant/bin/dmt", "foreground"]
LABEL service_version="semver"
WORKDIR /opt/dominant

27
Dockerfile.sh Executable file
View File

@ -0,0 +1,27 @@
#!/bin/bash
cat <<EOF
FROM $BASE_IMAGE
MAINTAINER Igor Savchuk <i.savchuk@rbkmoney.com>
COPY _build/prod/rel/dominant /opt/dominant
CMD /opt/dominant/bin/dominant foreground
EXPOSE 8022
# A bit of magic below to get a proper branch name
# even when the HEAD is detached (Hey Jenkins!
# BRANCH_NAME is available in Jenkins env).
LABEL com.rbkmoney.$SERVICE_NAME.parent=$BASE_IMAGE_NAME \
com.rbkmoney.$SERVICE_NAME.parent_tag=$BASE_IMAGE_TAG \
com.rbkmoney.$SERVICE_NAME.build_img=build \
com.rbkmoney.$SERVICE_NAME.build_img_tag=$BUILD_IMAGE_TAG \
com.rbkmoney.$SERVICE_NAME.commit_id=$(git rev-parse HEAD) \
com.rbkmoney.$SERVICE_NAME.commit_num=$(git rev-list --count HEAD) \
com.rbkmoney.$SERVICE_NAME.branch=$( \
if [ "HEAD" != $(git rev-parse --abbrev-ref HEAD) ]; then \
echo $(git rev-parse --abbrev-ref HEAD); \
elif [ -n "$BRANCH_NAME" ]; then \
echo $BRANCH_NAME; \
else \
echo $(git name-rev --name-only HEAD); \
fi)
WORKDIR /opt/dominant
EOF

83
Jenkinsfile vendored
View File

@ -1,34 +1,57 @@
#!groovy
// Args:
// GitHub repo name
// Jenkins agent label
// Tracing artifacts to be stored alongside build logs
// Optional: artifacts to cache between the builds
pipeline("dmt", 'docker-host', "_build/") {
// ToDo: Uncomment the stage as soon as Elvis is in the build image!
// runStage('lint') {
// sh 'make w_container_lint'
// }
runStage('compile') {
sh 'make w_container_compile'
}
runStage('xref') {
sh 'make w_container_xref'
}
runStage('test_api') {
sh "make w_container_test_api"
}
runStage('test_client') {
sh "make w_container_test_client"
}
runStage('dialyze') {
sh 'make w_container_dialyze'
def finalHook = {
runStage('store CT logs') {
archive '_build/test/logs/'
}
}
build('dominant', 'docker-host', finalHook) {
checkoutRepo()
loadBuildUtils()
def pipeDefault
runStage('load pipeline') {
env.JENKINS_LIB = "build_utils/jenkins_lib"
pipeDefault = load("${env.JENKINS_LIB}/pipeDefault.groovy")
}
pipeDefault() {
runStage('submodules') {
withGithubPrivkey {
sh 'make submodules'
}
}
runStage('compile') {
withGithubPrivkey {
sh 'make wc_compile'
}
}
runStage('lint') {
sh 'make wc_lint'
}
runStage('xref') {
sh 'make wc_xref'
}
runStage('dialyze') {
sh 'make wc_dialyze'
}
runStage('test') {
sh "make wdeps_test"
}
runStage('make release') {
withGithubPrivkey {
sh "make wc_release"
}
}
runStage('build image') {
sh "make build_image"
}
if (env.BRANCH_NAME == 'master') {
runStage('push image') {
sh "make push_image"
}
}
}
}

View File

@ -1,8 +1,27 @@
REBAR := $(shell which rebar3 2>/dev/null || which ./rebar3)
SUBMODULES = apps/dmt_proto/damsel
SUBMODULES = damsel build_utils
SUBTARGETS = $(patsubst %,%/.git,$(SUBMODULES))
ORG_NAME := rbkmoney
UTILS_PATH := build_utils
TEMPLATES_PATH := .
SERVICE_NAME := dominant
# Service image default tag
SERVICE_IMAGE_TAG ?= $(shell git rev-parse HEAD)
# The tag for service image to be pushed with
SERVICE_IMAGE_PUSH_TAG ?= $(SERVICE_IMAGE_TAG)
# Base image for the service
BASE_IMAGE_NAME := service_erlang
BASE_IMAGE_TAG := 2202a02cbcb71982fea2e901ffb2b1ca5da610ae
## Variables required for utils_container.mk
# Build image tag to be used
BUILD_IMAGE_TAG := 753126790c9ecd763840d9fe58507335af02b875
BASE_IMAGE := "$(ORG_NAME)/build:latest"
RELNAME := dominant
@ -11,12 +30,15 @@ IMAGE_NAME = "$(ORG_NAME)/$(RELNAME):$(TAG)"
CALL_ANYWHERE := submodules rebar-update compile xref lint dialyze start devrel release clean distclean
CALL_W_CONTAINER := $(CALL_ANYWHERE) test_api
include utils.mk
CALL_W_CONTAINER := $(CALL_ANYWHERE) test
.PHONY: $(CALL_W_CONTAINER) all containerize push $(UTIL_TARGETS)
all: compile
-include $(UTILS_PATH)/make_lib/utils_container.mk
-include $(UTILS_PATH)/make_lib/utils_image.mk
# CALL_ANYWHERE
$(SUBTARGETS): %/.git: %
git submodule update --init $<
@ -56,28 +78,5 @@ distclean:
rm -rfv _build _builds _cache _steps _temp
# CALL_W_CONTAINER
test_api: submodules
$(REBAR) ct --suite apps/dmt_api/test/dmt_api_tests_SUITE.erl
test_client: submodules
$(REBAR) ct --suite apps/dmt_api/test/dmt_client_tests_SUITE.erl
w_container_test_client: submodules
{ \
$(DOCKER_COMPOSE) up -d ; \
$(DOCKER_COMPOSE) exec -T -d dominant make start ; \
$(DOCKER_COMPOSE) exec -T dmt_client make test_client ; \
res=$$? ; \
$(DOCKER_COMPOSE) down ; \
exit $$res ; \
}
# OTHER
all: compile
containerize: w_container_release
$(DOCKER) build --force-rm --tag $(IMAGE_NAME) .
push: containerize
$(DOCKER) push "$(IMAGE_NAME)"
test: submodules
$(REBAR) ct

16
TODO.md Normal file
View File

@ -0,0 +1,16 @@
# Alpha
* Evict cache entries in order to provide bounded memory usage
* LRU cache?
* Hard limits
* Bound computational complexity
* Context-aware machinegun?
* Periodical history snapshotting?
* Reuse cache entries where possible
# Release
* Simplify cache interfaces
* Encode machine events and responses with a schema-aware protocol
* Thrift / compact protocol?
* Fix potential race when a `Repository` request getting processed earlier than the start machine request being issued.

View File

@ -1 +0,0 @@
-define(MG_TAG, <<"dmt_storage">>).

View File

@ -1,23 +0,0 @@
{application, dmt, [
{description, "Domain config service"},
{vsn, "0"},
{registered, []},
{mod, {dmt, []}},
{applications, [
kernel,
stdlib,
lager,
genlib,
woody,
dmt_proto
]},
{env, [
{mgun_automaton_url, "http://machinegun:8022/v1/automaton_service"}
]},
{modules, []},
{maintainers, [
"Andrey Mayorov <a.mayorov@rbkmoney.com>"
]},
{licenses, []},
{links, ["https://github.com/rbkmoney/dominant"]}
]}.

View File

@ -1,110 +0,0 @@
%%% @doc Public API, supervisor and application startup.
%%% @end
-module(dmt).
-behaviour(supervisor).
-behaviour(application).
%% API
-export([checkout/1]).
-export([checkout_object/2]).
-export([pull/1]).
-export([commit/2]).
-export([validate_commit/3]).
%% Supervisor callbacks
-export([init/1]).
%% Application callbacks
-export([start/2]).
-export([stop/1]).
%% Type shortcuts
-export_type([version/0]).
-export_type([head/0]).
-export_type([ref/0]).
-export_type([snapshot/0]).
-export_type([commit/0]).
-export_type([operation/0]).
-export_type([history/0]).
-export_type([object_ref/0]).
-export_type([domain/0]).
-export_type([domain_object/0]).
-type version() :: dmt_domain_config_thrift:'Version'().
-type head() :: dmt_domain_config_thrift:'Head'().
-type ref() :: dmt_domain_config_thrift:'Reference'().
-type snapshot() :: dmt_domain_config_thrift:'Snapshot'().
-type commit() :: dmt_domain_config_thrift:'Commit'().
-type operation() :: dmt_domain_config_thrift:'Operation'().
-type history() :: dmt_domain_config_thrift:'History'().
-type object_ref() :: dmt_domain_thrift:'Reference'().
-type domain() :: dmt_domain_thrift:'Domain'().
-type domain_object() :: dmt_domain_thrift:'DomainObject'().
-include_lib("dmt_proto/include/dmt_domain_config_thrift.hrl").
%% API
-spec checkout(ref()) -> snapshot().
checkout(Reference) ->
dmt_cache:checkout(Reference).
-spec checkout_object(ref(), object_ref()) ->
dmt_domain_config_thrift:'VersionedObject'().
checkout_object(Reference, ObjectReference) ->
#'Snapshot'{version = Version, domain = Domain} = checkout(Reference),
Object = dmt_domain:get_object(ObjectReference, Domain),
#'VersionedObject'{version = Version, object = Object}.
-spec pull(version()) -> history().
pull(Version) ->
dmt_mg:get_history(Version).
-spec commit(version(), commit()) -> version().
commit(Version, Commit) ->
ok = dmt_mg:commit(Version, Commit),
Version + 1.
-spec validate_commit(version(), commit(), history()) -> ok.
validate_commit(Version, _Commit, History) ->
%%TODO: actually validate commit
LastVersion = case map_size(History) of
0 ->
0;
_Size ->
lists:max(maps:keys(History))
end,
case Version =:= LastVersion of
true ->
ok;
false ->
throw(bad_version)
end.
%% Supervisor callbacks
-spec init([]) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
init([]) ->
Cache = #{id => dmt_cache, start => {dmt_cache, start_link, []}, restart => permanent},
Poller = #{id => dmt_poller, start => {dmt_poller, start_link, []}, restart => permanent},
Children = case application:get_env(dmt, slave_mode) of
{ok, true} ->
[Cache, Poller];
_ ->
[Cache]
end,
{ok, {#{strategy => rest_for_one, intensity => 10, period => 60}, Children}}.
%% Application callbacks
-spec start(normal, any()) -> {ok, pid()} | {error, any()}.
start(_StartType, _StartArgs) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-spec stop(any()) -> ok.
stop(_State) ->
ok.

View File

@ -1,148 +0,0 @@
-module(dmt_cache).
-behaviour(gen_server).
%%
-export([start_link/0]).
-export([checkout_head/0]).
-export([checkout/1]).
-export([cache/1]).
-export([cache_snapshot/1]).
-export([commit/1]).
%%
-export([init/1]).
-export([handle_call/3]).
-export([handle_cast/2]).
-export([handle_info/2]).
-export([terminate/2]).
-export([code_change/3]).
-define(TABLE, ?MODULE).
-define(SERVER, ?MODULE).
-include_lib("dmt_proto/include/dmt_domain_config_thrift.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
%%
-spec start_link() -> {ok, pid()} | {error, term()}. % FIXME
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-spec checkout_head() -> dmt:snapshot().
checkout_head() ->
checkout({head, #'Head'{}}).
-spec checkout(dmt:ref()) -> dmt:snapshot().
checkout({head, #'Head'{}}) ->
checkout({version, head()});
checkout({version, Version}) ->
case ets:lookup(?TABLE, Version) of
[Snapshot] ->
Snapshot;
[] ->
gen_server:call(?SERVER, {cache, Version})
end.
-spec cache(dmt:version()) -> dmt:snapshot().
cache(Version) ->
gen_server:call(?SERVER, {cache, Version}).
-spec cache_snapshot(dmt:snapshot()) -> ok.
cache_snapshot(Snapshot) ->
gen_server:call(?SERVER, {cache_snapshot, Snapshot}).
-spec commit(dmt:commit()) -> ok.
commit(Commit) ->
gen_server:call(?SERVER, {commit, Commit}).
%%
-record(state, {
}).
-type state() :: #state{}.
-spec init(_) -> {ok, state()}.
init(_) ->
EtsOpts = [
named_table,
ordered_set,
protected,
{read_concurrency, true},
{keypos, #'Snapshot'.version}
],
?TABLE = ets:new(?TABLE, EtsOpts),
{ok, #state{}}.
-spec handle_call(term(), {pid(), term()}, state()) -> {reply, term(), state()}.
handle_call({cache, Version}, _From, State) ->
Closest = closest_snapshot(Version),
Snapshot = dmt_history:travel(Version, dmt_mg:get_history(), Closest),
true = ets:insert(?TABLE, Snapshot),
{reply, Snapshot, State};
handle_call({cache_snapshot, Snapshot}, _From, State) ->
true = ets:insert(?TABLE, Snapshot),
{reply, ok, State};
handle_call({commit, #'Commit'{ops = Ops}}, _From, State) ->
#'Snapshot'{version = Version, domain = Domain} = checkout({head, #'Head'{}}),
NewSnapshot = #'Snapshot'{
version = Version + 1,
domain = dmt_domain:apply_operations(Ops, Domain)
},
true = ets:insert(?TABLE, NewSnapshot),
{reply, NewSnapshot, State};
handle_call(_Msg, _From, State) ->
{noreply, State}.
-spec handle_cast(term(), state()) -> {noreply, state()}.
handle_cast(_Msg, State) ->
{noreply, State}.
-spec handle_info(term(), state()) -> {noreply, state()}.
handle_info(_Msg, State) ->
{noreply, State}.
-spec terminate(term(), state()) -> ok.
terminate(_Reason, _State) ->
ok.
-spec code_change(term(), state(), term()) -> {error, noimpl}.
code_change(_OldVsn, _State, _Extra) ->
{error, noimpl}.
%% internal
-spec head() -> dmt:version().
head() ->
case ets:last(?TABLE) of
'$end_of_table' ->
% #'Snapshot'{version = Version} = dmt_history:head(dmt_mg:get_history()),
% Version;
0;
Version ->
Version
end.
-spec closest_snapshot(dmt:version()) -> dmt:snapshot().
closest_snapshot(Version) ->
CachedVersions = ets:select(?TABLE, ets:fun2ms(fun (#'Snapshot'{version = V}) -> V end)),
Closest = lists:foldl(fun (V, Acc) ->
case abs(V - Version) =< abs(Acc - Version) of
true ->
V;
false ->
Acc
end
end, 0, CachedVersions),
case Closest of
0 ->
#'Snapshot'{version = 0, domain = dmt_domain:new()};
Closest ->
ets:lookup(?TABLE, Closest)
end.

View File

@ -1,85 +0,0 @@
-module(dmt_domain).
-include_lib("dmt_proto/include/dmt_domain_config_thrift.hrl").
%%
-export([new/0]).
-export([get_object/2]).
-export([apply_operations/2]).
-export([revert_operations/2]).
%%
-spec new() ->
dmt:domain().
new() ->
#{}.
-spec get_object(dmt:object_ref(), dmt:domain()) ->
dmt:domain_object().
get_object(ObjectReference, Domain) ->
case maps:find(ObjectReference, Domain) of
{ok, Object} ->
Object;
error ->
throw(object_not_found)
end.
-spec apply_operations([dmt:operation()], dmt:domain()) -> dmt:domain().
apply_operations([], Domain) ->
Domain;
apply_operations([{insert, #'InsertOp'{object = Object}} | Rest], Domain) ->
apply_operations(Rest, insert(Object, Domain));
apply_operations([{update, #'UpdateOp'{old_object = OldObject, new_object = NewObject}} | Rest], Domain) ->
apply_operations(Rest, update(OldObject, NewObject, Domain));
apply_operations([{remove, #'RemoveOp'{object = Object}} | Rest], Domain) ->
apply_operations(Rest, delete(Object, Domain)).
-spec revert_operations([dmt:operation()], dmt:domain()) -> dmt:domain().
revert_operations([], Domain) ->
Domain;
revert_operations([{insert, #'InsertOp'{object = Object}} | Rest], Domain) ->
revert_operations(Rest, delete(Object, Domain));
revert_operations([{update, #'UpdateOp'{old_object = OldObject, new_object = NewObject}} | Rest], Domain) ->
revert_operations(Rest, update(NewObject, OldObject, Domain));
revert_operations([{remove, #'RemoveOp'{object = Object}} | Rest], Domain) ->
revert_operations(Rest, insert(Object, Domain)).
-spec insert(dmt:domain_object(), dmt:domain()) -> dmt:domain().
insert(Object, Domain) ->
ObjectReference = get_ref(Object),
case maps:is_key(ObjectReference, Domain) of
false ->
maps:put(ObjectReference, Object, Domain);
true ->
throw(object_already_exists)
end.
-spec update(dmt:domain_object(), dmt:domain_object(), dmt:domain()) -> dmt:domain().
update(OldObject, NewObject, Domain) ->
ObjectReference = get_ref(OldObject),
ObjectReference = get_ref(NewObject),
case maps:find(ObjectReference, Domain) of
{ok, OldObject} ->
maps:put(ObjectReference, NewObject, Domain);
error ->
throw(object_not_found)
end.
-spec delete(dmt:domain_object(), dmt:domain()) -> dmt:domain().
delete(Object, Domain) ->
ObjectReference = get_ref(Object),
case maps:find(ObjectReference, Domain) of
{ok, Object} ->
maps:remove(ObjectReference, Domain);
error ->
throw(object_not_found)
end.
%%TODO:elaborate
-spec get_ref(dmt:domain_object()) -> dmt:object_ref().
get_ref({Tag, {_Type, Ref, _Data}}) ->
{Tag, Ref}.

View File

@ -1,39 +0,0 @@
-module(dmt_history).
-export([head/1]).
-export([head/2]).
-export([travel/3]).
-include_lib("dmt_proto/include/dmt_domain_config_thrift.hrl").
-spec head(dmt:history()) -> dmt:snapshot().
head(History) when map_size(History) =:= 0 ->
#'Snapshot'{version = 0, domain = dmt_domain:new()};
head(History) ->
head(History, #'Snapshot'{version = 0, domain = dmt_domain:new()}).
-spec head(dmt:history(), dmt:snapshot()) -> dmt:snapshot().
head(History, Snapshot) ->
Head = lists:max(maps:keys(History)),
travel(Head, History, Snapshot).
-spec travel(dmt:version(), dmt:history(), dmt:snapshot()) -> dmt:snapshot().
travel(To, _History, #'Snapshot'{version = From} = Snapshot)
when To =:= From ->
Snapshot;
travel(To, History, #'Snapshot'{version = From, domain = Domain})
when To > From ->
#'Commit'{ops = Ops} = maps:get(From + 1, History),
NextSnapshot = #'Snapshot'{
version = From + 1,
domain = dmt_domain:apply_operations(Ops, Domain)
},
travel(To, History, NextSnapshot);
travel(To, History, #'Snapshot'{version = From, domain = Domain})
when To < From ->
#'Commit'{ops = Ops} = maps:get(From, History),
PreviousSnapshot = #'Snapshot'{
version = From - 1,
domain = dmt_domain:revert_operations(Ops, Domain)
},
travel(To, History, PreviousSnapshot).

View File

@ -1,62 +0,0 @@
-module(dmt_mg).
-export([call/2]).
-export([start/0]).
-export([get_commit/1]).
-export([get_history/0]).
-export([get_history/1]).
-export([commit/2]).
-export([read_history/1]).
-include_lib("dmt_proto/include/dmt_state_processing_thrift.hrl").
-include_lib("dmt/include/dmt_mg.hrl").
-spec call(atom(), list(term())) ->
{ok, term()} | ok | no_return().
call(Method, Args) ->
Request = {{dmt_state_processing_thrift, 'Automaton'}, Method, Args},
Context = woody_client:new_context(
woody_client:make_id(<<"dmt">>),
dmt_api_woody_event_handler
),
{ok, MgunAutomatonUrl} = application:get_env(dmt, mgun_automaton_url),
woody_client:call(Context, Request, #{url => MgunAutomatonUrl}).
-spec start() -> ok.
start() ->
{{ok, {_, _}}, _} = call(start, {'Args', <<>>}),
ok.
-spec get_commit(dmt:version()) -> dmt:commit().
get_commit(Id) ->
#{Id := Commit} = get_history(),
Commit.
%% TODO: add range requests after they are fixed in mg
-spec get_history() -> dmt:history().
get_history() ->
get_history(undefined).
-spec get_history(dmt:version() | undefined) -> dmt:history().
get_history(After) ->
{{ok, History}, _Context} = call(getHistory, [{tag, ?MG_TAG}, #'HistoryRange'{'after' = After}]),
read_history(History).
-spec commit(dmt:version(), dmt:commit()) -> ok.
commit(Version, Commit) ->
Call = <<"commit", (term_to_binary({Version, Commit}))/binary>>,
{{ok, <<"ok">>}, _Context} = call(call, [{tag, ?MG_TAG}, Call]),
ok.
%% utils
-spec read_history([dmt_state_processing_thrift:'Event'()]) -> dmt:history().
read_history(Events) ->
read_history(Events, #{}).
-spec read_history([dmt_state_processing_thrift:'Event'()], dmt:history()) ->
dmt:history().
read_history([], History) ->
History;
read_history([#'Event'{id = Id, event_payload = BinaryPayload} | Rest], History) ->
read_history(Rest, History#{Id => binary_to_term(BinaryPayload)}).

View File

@ -1,74 +0,0 @@
-module(dmt_poller).
-behaviour(gen_server).
-export([start_link/0]).
-export([poll/0]).
-export([init/1]).
-export([handle_call/3]).
-export([handle_cast/2]).
-export([handle_info/2]).
-export([terminate/2]).
-export([code_change/3]).
-define(SERVER, ?MODULE).
-define(INTERVAL, 5000).
-include_lib("dmt_proto/include/dmt_domain_config_thrift.hrl").
-spec start_link() -> {ok, pid()} | {error, term()}.
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-spec poll() -> ok.
poll() ->
gen_server:call(?SERVER, poll).
-record(state, {
timer :: reference(),
last_version = 0 :: dmt:version()
}).
-type state() :: #state{}.
-spec init(_) -> {ok, state()}.
init(_) ->
Timer = erlang:send_after(?INTERVAL, self(), poll),
{ok, #state{timer = Timer}}.
-spec handle_call(term(), {pid(), term()}, state()) -> {reply, term(), state()}.
handle_call(poll, _From, #state{last_version = LastVersion, timer = Timer} = State) ->
_ = erlang:cancel_timer(Timer),
NewLastVersion = pull(LastVersion),
NewTimer = erlang:send_after(?INTERVAL, self(), poll),
{reply, ok, State#state{timer = NewTimer, last_version = NewLastVersion}}.
-spec handle_cast(term(), state()) -> {noreply, state()}.
handle_cast(_Msg, State) ->
{noreply, State}.
-spec handle_info(term(), state()) -> {noreply, state()}.
handle_info(timer, #state{last_version = LastVersion} = State) ->
NewLastVersion = pull(LastVersion),
Timer = erlang:send_after(?INTERVAL, self(), poll),
{noreply, State#state{last_version = NewLastVersion, timer = Timer}};
handle_info(_Msg, State) ->
{noreply, State}.
-spec terminate(term(), state()) -> ok.
terminate(_Reason, _State) ->
ok.
-spec code_change(term(), state(), term()) -> {error, noimpl}.
code_change(_OldVsn, _State, _Extra) ->
{error, noimpl}.
%% Internal
-spec pull(dmt:version()) -> dmt:version().
pull(LastVersion) ->
FreshHistory = dmt_api_client:pull(LastVersion),
OldHead = dmt_cache:checkout_head(),
#'Snapshot'{version = NewLastVersion} = NewHead = dmt_history:head(FreshHistory, OldHead),
ok = dmt_cache:cache_snapshot(NewHead),
NewLastVersion.

View File

@ -1,3 +0,0 @@
{erl_opts, [
{parse_transform, lager_transform}
]}.

View File

@ -1,65 +0,0 @@
-module(dmt_api).
-behaviour(application).
-behaviour(supervisor).
-export([get_handler_spec/1]).
-export([start/2]).
-export([stop/1]).
-export([init/1]).
%%
-spec start(application:start_type(), term()) -> {ok, pid()} | {error, term()}.
start(_StartType, _Args) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-spec stop(term()) -> ok.
stop(_State) ->
ok.
%%
-spec init([]) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec(), ...]}}.
init([]) ->
WoodyChildSpec = woody_server:child_spec(
?MODULE,
#{
ip => dmt_api_utils:get_hostname_ip(genlib_app:env(?MODULE, host, "dominant")),
port => genlib_app:env(?MODULE, port, 8800),
net_opts => [],
event_handler => dmt_api_woody_event_handler,
handlers => [
get_handler_spec(repository),
get_handler_spec(repository_client),
get_handler_spec(mg_processor)
]
}
),
{ok, {#{strategy => one_for_one, intensity => 0, period => 1}, [WoodyChildSpec]}}.
-spec get_handler_spec(Which) -> {Path, {woody_t:service(), module(), term()}} when
Which :: repository | repository_client | mg_processor,
Path :: iodata().
get_handler_spec(repository) ->
{"/v1/domain/repository", {
{dmt_domain_config_thrift, 'Repository'},
dmt_api_repository_handler,
[]
}};
get_handler_spec(repository_client) ->
{"/v1/domain/repository_client", {
{dmt_domain_config_thrift, 'RepositoryClient'},
dmt_api_repository_client_handler,
[]
}};
get_handler_spec(mg_processor) ->
{"/v1/domain/mgun_processor", {
{dmt_state_processing_thrift, 'Processor'},
dmt_api_mgun_handler,
[]
}}.

View File

@ -1,42 +0,0 @@
-module(dmt_api_client).
-export([commit/2]).
-export([checkout/1]).
-export([pull/1]).
-export([checkout_object/2]).
-spec commit(dmt:version(), dmt:commit()) -> dmt:version().
commit(Version, Commit) ->
call(repository, 'Commit', [Version, Commit]).
-spec checkout(dmt:ref()) -> dmt:snapshot().
checkout(Reference) ->
call(repository, 'Checkout', [Reference]).
-spec pull(dmt:version()) -> dmt:history().
pull(Version) ->
call(repository, 'Pull', [Version]).
-spec checkout_object(dmt:ref(), dmt:object_ref()) -> dmt:domain_object().
checkout_object(Reference, ObjectReference) ->
call(repository_client, 'checkoutObject', [Reference, ObjectReference]).
call(ServiceName, Function, Args) ->
Host = application:get_env(dmt, client_host, "dominant"),
Port = integer_to_list(application:get_env(dmt, client_port, 8800)),
{Path, {Service, _Handler, _Opts}} = dmt_api:get_handler_spec(ServiceName),
Call = {Service, Function, Args},
Server = #{url => Host ++ ":" ++ Port ++ Path},
Context = woody_client:new_context(woody_client:make_id(<<"dmt_client">>), dmt_api_woody_event_handler),
case woody_client:call_safe(Context, Call, Server) of
{ok, _Context} ->
ok;
{{ok, Response}, _Context} ->
Response;
{{exception, Exception}, _Context} ->
throw(Exception);
{{error, Error}, _Context} ->
error(Error)
end.

View File

@ -1,32 +0,0 @@
-module(dmt_api_mgun_handler).
-behaviour(woody_server_thrift_handler).
-export([handle_function/4]).
%%
-include_lib("dmt_proto/include/dmt_state_processing_thrift.hrl").
-include_lib("dmt/include/dmt_mg.hrl").
-spec handle_function(
woody_t:func(),
woody_server_thrift_handler:args(),
woody_client:context(),
woody_server_thrift_handler:handler_opts()
) -> {ok | {ok, woody_server_thrift_handler:result()}, woody_client:context()} | no_return().
handle_function(processCall, {#'CallArgs'{call = <<"commit", Data/binary>>, history = History}}, Context, _Opts) ->
{Version, Commit} = binary_to_term(Data),
ok = dmt:validate_commit(Version, Commit, dmt_mg:read_history(History)),
_Snapshot = dmt_cache:commit(Commit),
{
{ok, #'CallResult'{
events = [term_to_binary(Commit)],
action = #'ComplexAction'{},
response = <<"ok">>
}},
Context
};
handle_function(processSignal, {#'SignalArgs'{signal = {init, #'InitSignal'{}}}}, Context, _Opts) ->
CA = #'ComplexAction'{tag = #'TagAction'{tag = ?MG_TAG}},
{{ok, #'SignalResult'{events = [], action = CA}}, Context}.

View File

@ -1,25 +0,0 @@
-module(dmt_api_repository_client_handler).
-behaviour(woody_server_thrift_handler).
-export([handle_function/4]).
%%
-include_lib("dmt_proto/include/dmt_domain_config_thrift.hrl").
-spec handle_function(
woody_t:func(),
woody_server_thrift_handler:args(),
woody_client:context(),
woody_server_thrift_handler:handler_opts()
) -> {ok | {ok, woody_server_thrift_handler:result()}, woody_client:context()} | no_return().
handle_function('checkoutObject', {Reference, ObjectReference}, Context, _Opts) ->
try
Object = dmt:checkout_object(Reference, ObjectReference),
{{ok, Object}, Context}
catch
object_not_found ->
throw({#'ObjectNotFound'{}, Context});
version_not_found ->
throw({#'VersionNotFound'{}, Context})
end.

View File

@ -1,41 +0,0 @@
-module(dmt_api_repository_handler).
-behaviour(woody_server_thrift_handler).
-export([handle_function/4]).
%%
-include_lib("dmt_proto/include/dmt_domain_config_thrift.hrl").
-spec handle_function(
woody_t:func(),
woody_server_thrift_handler:args(),
woody_client:context(),
woody_server_thrift_handler:handler_opts()
) -> {ok | {ok, woody_server_thrift_handler:result()}, woody_client:context()} | no_return().
handle_function('Commit', {Version, Commit}, Context, _Opts) ->
try
NewVersion = dmt:commit(Version, Commit),
{{ok, NewVersion}, Context}
catch
operation_conflict ->
throw({#'OperationConflict'{}, Context});
version_not_found ->
throw({#'VersionNotFound'{}, Context})
end;
handle_function('Checkout', {Reference}, Context, _Opts) ->
try
Snapshot = dmt:checkout(Reference),
{{ok, Snapshot}, Context}
catch
version_not_found ->
throw({#'VersionNotFound'{}, Context})
end;
handle_function('Pull', {Version}, Context, _Opts) ->
try
History = dmt:pull(Version),
{{ok, History}, Context}
catch
version_not_found ->
throw({#'VersionNotFound'{}, Context})
end.

View File

@ -1,94 +0,0 @@
-module(dmt_api_tests_SUITE).
-include_lib("common_test/include/ct.hrl").
-export([all/0]).
-export([groups/0]).
-export([init_per_suite/1]).
-export([end_per_suite/1]).
-export([application_stop/1]).
-export([insert/1]).
-export([update/1]).
-export([delete/1]).
-include_lib("dmt_proto/include/dmt_domain_config_thrift.hrl").
%%
%% tests descriptions
%%
-spec all() -> [term()].
all() ->
[
{group, basic_lifecycle}
].
-spec groups() -> [term()].
groups() ->
[
{basic_lifecycle, [sequence], [
insert,
update,
delete
]}
].
%%
%% starting/stopping
-spec init_per_suite(term()) -> term().
init_per_suite(C) ->
{ok, Apps} = application:ensure_all_started(dmt_api),
ok = dmt_mg:start(),
[{apps, Apps}|C].
-spec end_per_suite(term()) -> term().
end_per_suite(C) ->
[application_stop(App) || App <- proplists:get_value(apps, C)].
-spec application_stop(term()) -> term().
application_stop(App) ->
application:stop(App).
%%
%% tests
-spec insert(term()) -> term().
insert(_C) ->
Object = fixture_domain_object(1, <<"InsertFixture">>),
Ref = fixture_object_ref(1),
#'ObjectNotFound'{} = (catch dmt_api_client:checkout_object({head, #'Head'{}}, Ref)),
#'Snapshot'{version = Version1} = dmt_api_client:checkout({head, #'Head'{}}),
Version2 = dmt_api_client:commit(Version1, #'Commit'{ops = [{insert, #'InsertOp'{object = Object}}]}),
#'VersionedObject'{object = Object} = dmt_api_client:checkout_object({head, #'Head'{}}, Ref),
#'ObjectNotFound'{} = (catch dmt_api_client:checkout_object({version, Version1}, Ref)),
#'VersionedObject'{object = Object} = dmt_api_client:checkout_object({version, Version2}, Ref).
-spec update(term()) -> term().
update(_C) ->
Object1 = fixture_domain_object(2, <<"UpdateFixture1">>),
Object2 = fixture_domain_object(2, <<"UpdateFixture2">>),
Ref = fixture_object_ref(2),
#'Snapshot'{version = Version0} = dmt_api_client:checkout({head, #'Head'{}}),
Version1 = dmt_api_client:commit(Version0, #'Commit'{ops = [{insert, #'InsertOp'{object = Object1}}]}),
Version2 = dmt_api_client:commit(
Version1,
#'Commit'{ops = [{update, #'UpdateOp'{old_object = Object1, new_object = Object2}}]}
),
#'VersionedObject'{object = Object1} = dmt_api_client:checkout_object({version, Version1}, Ref),
#'VersionedObject'{object = Object2} = dmt_api_client:checkout_object({version, Version2}, Ref).
-spec delete(term()) -> term().
delete(_C) ->
Object = fixture_domain_object(3, <<"DeleteFixture">>),
Ref = fixture_object_ref(3),
#'Snapshot'{version = Version0} = dmt_api_client:checkout({head, #'Head'{}}),
Version1 = dmt_api_client:commit(Version0, #'Commit'{ops = [{insert, #'InsertOp'{object = Object}}]}),
Version2 = dmt_api_client:commit(Version1, #'Commit'{ops = [{remove, #'RemoveOp'{object = Object}}]}),
#'VersionedObject'{object = Object} = dmt_api_client:checkout_object({version, Version1}, Ref),
#'ObjectNotFound'{} = (catch dmt_api_client:checkout_object({version, Version2}, Ref)).
fixture_domain_object(Ref, Data) ->
{category, #'CategoryObject'{
ref = #'CategoryRef'{id = Ref},
data = #'Category'{name = Data, description = Data}
}}.
fixture_object_ref(Ref) ->
{category, #'CategoryRef'{id = Ref}}.

View File

@ -1,71 +0,0 @@
-module(dmt_client_tests_SUITE).
-include_lib("common_test/include/ct.hrl").
-export([all/0]).
-export([groups/0]).
-export([init_per_suite/1]).
-export([end_per_suite/1]).
-export([application_stop/1]).
-export([poll/1]).
-include_lib("dmt_proto/include/dmt_domain_config_thrift.hrl").
%%
%% tests descriptions
%%
-spec all() -> [term()].
all() ->
[
{group, basic_lifecycle}
].
-spec groups() -> [term()].
groups() ->
[
{basic_lifecycle, [sequence], [
poll
]}
].
%%
%% starting/stopping
-spec init_per_suite(term()) -> term().
init_per_suite(C) ->
{ok, Apps} = application:ensure_all_started(dmt),
{ok, _PollerPid} = supervisor:start_child(dmt, #
{id => dmt_poller, start => {dmt_poller, start_link, []}, restart => permanent}
),
ok = dmt_mg:start(),
[{apps, Apps}|C].
-spec end_per_suite(term()) -> term().
end_per_suite(C) ->
[application_stop(App) || App <- proplists:get_value(apps, C)].
-spec application_stop(term()) -> term().
application_stop(App) ->
application:stop(App).
%%
%% tests
-spec poll(term()) -> term().
poll(_C) ->
Object = fixture_domain_object(1, <<"InsertFixture">>),
Ref = fixture_object_ref(1),
{'ObjectNotFound'} = (catch dmt_api_client:checkout_object({head, #'Head'{}}, Ref)),
#'Snapshot'{version = Version1} = dmt_api_client:checkout({head, #'Head'{}}),
Version2 = dmt_api_client:commit(Version1, #'Commit'{ops = [{insert, #'InsertOp'{object = Object}}]}),
#'Snapshot'{version = Version1} = dmt:checkout({head, #'Head'{}}),
object_not_found = (catch dmt:checkout_object({head, #'Head'{}}, Ref)),
ok = dmt_poller:poll(),
#'Snapshot'{version = Version2} = dmt:checkout({head, #'Head'{}}),
#'VersionedObject'{object = Object} = dmt:checkout_object({head, #'Head'{}}, Ref).
fixture_domain_object(Ref, Data) ->
{category, #'CategoryObject'{
ref = #'CategoryRef'{id = Ref},
data = #'Category'{name = Data, description = Data}
}}.
fixture_object_ref(Ref) ->
{category, #'CategoryRef'{id = Ref}}.

View File

@ -1,2 +0,0 @@
/include/dmt_*_thrift.hrl
/src/dmt_*_thrift.erl

@ -1 +0,0 @@
Subproject commit 7eda08c2a54c617d7fad2b14e6c185ba9f3fdc27

View File

@ -1,17 +0,0 @@
{plugins, [
{rebar3_thrift_compiler,
{git, "https://github.com/rbkmoney/rebar3_thrift_compiler.git", {tag, "0.2"}}}
]}.
{provider_hooks, [
{pre, [
{compile, {thrift, compile}},
{clean, {thrift, clean}}
]}
]}.
{thrift_compiler_opts, [
{in_dir, "damsel/proto"},
{in_files, ["domain_config.thrift", "state_processing.thrift"]},
{gen, "erlang:app_prefix=dmt"}
]}.

View File

@ -1,9 +0,0 @@
{application, dmt_proto, [
{description, "Domain config protocol definitions"},
{vsn, "0"},
{registered, []},
{applications, [
kernel,
stdlib
]}
]}.

1
build_utils Submodule

@ -0,0 +1 @@
Subproject commit cb3389abfe0c0a969fa0415b15bf32b0a9005b4e

View File

@ -1,4 +1,5 @@
[
{dmt, [
{dmt_api, [
{automaton_service_url, "http://machinegun:8022/v1/automaton"}
]}
].

1
damsel Submodule

@ -0,0 +1 @@
Subproject commit 78b7bac323b2a9e1ea0ff5faa38e617e9ea499a2

27
docker-compose.sh Executable file
View File

@ -0,0 +1,27 @@
#!/bin/bash
cat <<EOF
version: '2'
services:
${SERVICE_NAME}:
image: ${BUILD_IMAGE}
volumes:
- .:$PWD
- $HOME/.cache:/home/$UNAME/.cache
working_dir: $PWD
command: /sbin/init
depends_on:
- machinegun
machinegun:
image: dr.rbkmoney.com/rbkmoney/machinegun:6a63173edec1aae1d4ee23f441307c598aea91f0
volumes:
- ./test/machinegun/sys.config:/opt/machinegun/releases/0.1.0/sys.config
networks:
default:
driver: bridge
driver_opts:
com.docker.network.enable_ipv6: "true"
com.docker.network.bridge.enable_ip_masquerade: "false"
EOF

View File

@ -1,21 +0,0 @@
version: '2'
services:
dominant:
image: rbkmoney/build:latest
volumes:
- .:/code
working_dir: /code
command: /sbin/init
links:
- machinegun
dmt_client:
image: rbkmoney/build:latest
volumes:
- .:/code
working_dir: /code
command: /sbin/init
links:
- dominant
machinegun:
image: rbkmoney/mg:dmt
command: /opt/mgun/bin/mgun foreground

View File

@ -2,7 +2,7 @@
{elvis, [
{config, [
#{
dirs => ["apps/*/src"],
dirs => ["src"],
filter => "*.erl",
ignore => ["_thrift.erl$"],
rules => [
@ -37,7 +37,7 @@
ruleset => elvis_config
},
#{
dirs => ["apps", "apps/*"],
dirs => ["."],
filter => "rebar.config",
rules => [
{elvis_style, line_length, #{limit => 120, skip_comments => false}},
@ -55,7 +55,7 @@
]
},
#{
dirs => ["apps/*/src"],
dirs => ["src"],
filter => "*.app.src",
rules => [
{elvis_style, line_length, #{limit => 120, skip_comments => false}},

View File

@ -27,9 +27,15 @@
%% Common project dependencies.
{deps, [
{genlib , {git, "https://github.com/rbkmoney/genlib.git", {branch, "master"}}},
{woody , {git, "git@github.com:rbkmoney/woody_erlang.git", {branch, "master"}}},
{lager , "3.0.2"}
{dmt , {git, "git@github.com:rbkmoney/dmt_core.git", {branch, "master"}}},
{lager , "3.0.2"},
% TODO move to the test profile as soon as compose quirks get fixed
{dmt_client, {git, "git@github.com:rbkmoney/dmt_client.git", {branch, "initial"}}}
]}.
%% XRef checks
@ -65,21 +71,47 @@
% mandatory
unmatched_returns,
error_handling,
race_conditions,
unknown
race_conditions
% unknown %% need fix
]},
{plt_apps, all_deps}
]}.
{profiles, [
{test, [
{deps, [
]}
]},
{prod, [
{relx, [
{dev_mode, false},
{include_erts, true}
]}
]}
]}.
{plugins, [
rebar3_run
rebar3_run,
{rebar3_thrift_compiler,
{git, "https://github.com/rbkmoney/rebar3_thrift_compiler.git", {tag, "0.2.1"}}}
]}.
{provider_hooks, [
{pre, [
{compile, {thrift, compile}},
{clean, {thrift, clean}}
]}
]}.
{thrift_compiler_opts, [
{in_dir, "damsel/proto"},
{in_files, ["state_processing.thrift"]},
{gen, "erlang:app_prefix=dmt_api"}
]}.
{erl_opts, [
{parse_transform, lager_transform}
]}.

View File

@ -1,6 +1,10 @@
[{<<"certifi">>,{pkg,<<"certifi">>,<<"0.4.0">>},2},
{<<"cowboy">>,{pkg,<<"cowboy">>,<<"1.0.4">>},1},
{<<"cowlib">>,{pkg,<<"cowlib">>,<<"1.0.2">>},2},
{<<"dmt">>,
{git,"git@github.com:rbkmoney/dmt_core.git",
{ref,"dbe59c4ae4ea2fec5f70f0853764ac01b8d4a49a"}},
0},
{<<"genlib">>,
{git,"https://github.com/rbkmoney/genlib.git",
{ref,"66db7fe296465a875b6894eb5ac944c90f82f913"}},
@ -13,15 +17,15 @@
{<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.0.2">>},2},
{<<"ranch">>,{pkg,<<"ranch">>,<<"1.2.1">>},2},
{<<"snowflake">>,
{git,"https://github.com/tel/snowflake.git",
{ref,"7a8eab0f12757133623b2151a7913b6d2707b629"}},
{git,"https://github.com/rbkmoney/snowflake.git",
{ref,"36b978a3ad711c9d9349b799a24c5499a95ae29a"}},
1},
{<<"ssl_verify_fun">>,{pkg,<<"ssl_verify_fun">>,<<"1.1.0">>},2},
{<<"thrift">>,
{git,"https://github.com/rbkmoney/thrift_erlang.git",
{ref,"1978d5a1e350694bfa3d2dc3a762176bc60c6e64"}},
{ref,"f132805904307376831fc2dd3780148b4b91aae2"}},
1},
{<<"woody">>,
{git,"git@github.com:rbkmoney/woody_erlang.git",
{ref,"dbe03ca171087f2649a7057a379cdb10c5dce90c"}},
{ref,"49f47a8b3d0429269d389e2207b94879b9088380"}},
0}].

View File

@ -6,8 +6,8 @@
kernel,
stdlib,
lager,
dmt,
woody
woody,
dmt
]},
{mod, {dmt_api, []}},
{env, []}

169
src/dmt_api.erl Normal file
View File

@ -0,0 +1,169 @@
-module(dmt_api).
-behaviour(application).
-behaviour(supervisor).
%% API
-export([checkout/2]).
-export([checkout_object/3]).
-export([pull/2]).
-export([commit/3]).
-export([apply_commit/3]).
%% behaviours
-export([start/2]).
-export([stop/1]).
-export([init/1]).
-include_lib("dmt/include/dmt_domain_config_thrift.hrl").
%% API
-type context() :: woody_client:context().
-spec checkout(dmt:ref(), context()) ->
{dmt:snapshot() | {error, version_not_found}, context()}.
checkout(Reference, Context) ->
try
{dmt_cache:checkout(Reference), Context}
catch
version_not_found ->
dmt_api_context:map(
try_get_snapshot(Reference, Context),
fun
(Snapshot = #'Snapshot'{}) -> dmt_cache:cache_snapshot(Snapshot);
(Error = {error, _}) -> Error
end
)
end.
-spec try_get_snapshot(dmt:ref(), context()) ->
{dmt:snapshot() | {error, version_not_found}, context()}.
try_get_snapshot(Reference, Context) ->
{History, Context1} = dmt_api_mg:get_history(undefined, reference_to_limit(Reference), Context),
{case {Reference, dmt_history:head(History)} of
{{head, #'Head'{}}, Snapshot} ->
Snapshot;
{{version, V}, Snapshot = #'Snapshot'{version = V}} ->
Snapshot;
{{version, V1}, #'Snapshot'{version = V2}} when V1 > V2 ->
{error, version_not_found}
end, Context1}.
-spec reference_to_limit(dmt:ref()) -> pos_integer() | undefined.
reference_to_limit({head, #'Head'{}}) ->
undefined;
reference_to_limit({version, Version}) ->
Version.
-spec checkout_object(dmt:ref(), dmt:object_ref(), context()) ->
{dmt_domain_config_thrift:'VersionedObject'() | {error, version_not_found | object_not_found}, context()}.
checkout_object(Reference, ObjectReference, Context) ->
dmt_api_context:map(
checkout(Reference, Context),
fun
(Snapshot = #'Snapshot'{}) -> try_get_object(ObjectReference, Snapshot);
(Error = {error, _}) -> Error
end
).
try_get_object(ObjectReference, #'Snapshot'{version = Version, domain = Domain}) ->
case dmt_domain:get_object(ObjectReference, Domain) of
{ok, Object} ->
#'VersionedObject'{version = Version, object = Object};
error ->
{error, object_not_found}
end.
-spec pull(dmt:version(), context()) ->
{dmt:history() | {error, version_not_found}, context()}.
pull(Version, Context) ->
dmt_api_mg:get_history(Version, undefined, Context).
-spec commit(dmt:version(), dmt:commit(), context()) ->
{dmt:version() | {error, version_not_found | operation_conflict}, context()}.
commit(Version, Commit, Context) ->
dmt_api_context:map(
dmt_api_mg:commit(Version, Commit, Context),
fun
(Snapshot = #'Snapshot'{version = VersionNext}) ->
_ = dmt_cache:cache_snapshot(Snapshot),
VersionNext;
(Error = {error, _}) ->
Error
end
).
-spec apply_commit(dmt:version(), dmt:commit(), dmt:history()) ->
{ok, dmt:snapshot()} | {error, term()}.
apply_commit(VersionWas, #'Commit'{ops = Ops}, History) ->
SnapshotWas = dmt_history:head(History),
case SnapshotWas of
#'Snapshot'{version = VersionWas, domain = DomainWas} ->
try
Domain = dmt_domain:apply_operations(Ops, DomainWas),
{ok, #'Snapshot'{version = VersionWas + 1, domain = Domain}}
catch
Reason ->
{error, Reason}
end;
#'Snapshot'{version = Version} when Version > VersionWas ->
{error, {head_mismatch, Version}};
#'Snapshot'{} ->
{error, version_not_found}
end.
%% behaviours
-spec start(application:start_type(), term()) -> {ok, pid()} | {error, term()}.
start(_StartType, _Args) ->
{ok, Pid} = supervisor:start_link({local, ?MODULE}, ?MODULE, []),
{ok, _Context} = dmt_api_mg:start(dmt_api_context:new()),
{ok, Pid}.
-spec stop(term()) -> ok.
stop(_State) ->
ok.
%%
init([]) ->
API = woody_server:child_spec(
?MODULE,
#{
ip => dmt_api_utils:get_hostname_ip(genlib_app:env(?MODULE, host, "dominant")),
port => genlib_app:env(?MODULE, port, 8022),
net_opts => [],
event_handler => dmt_api_woody_event_handler,
handlers => [
get_handler_spec(repository),
get_handler_spec(repository_client),
get_handler_spec(state_processor)
]
}
),
Children = [API],
{ok, {#{strategy => one_for_one, intensity => 10, period => 60}, Children}}.
-spec get_handler_spec(Which) -> {Path, {woody_t:service(), module(), term()}} when
Which :: repository | repository_client | state_processor,
Path :: iodata().
get_handler_spec(repository) ->
{"/v1/domain/repository", {
{dmt_domain_config_thrift, 'Repository'},
dmt_api_repository_handler,
[]
}};
get_handler_spec(repository_client) ->
{"/v1/domain/repository_client", {
{dmt_domain_config_thrift, 'RepositoryClient'},
dmt_api_repository_client_handler,
[]
}};
get_handler_spec(state_processor) ->
{"/v1/stateproc", {
{dmt_api_state_processing_thrift, 'Processor'},
dmt_api_state_processor,
[]
}}.

20
src/dmt_api_context.erl Normal file
View File

@ -0,0 +1,20 @@
-module(dmt_api_context).
-type context() :: woody_client:context().
-export([new/0]).
-export([map/2]).
-spec new() -> woody_client:context().
new() ->
ReqID = woody_client:make_unique_int(),
woody_client:new_context(genlib:to_binary(ReqID), dmt_api_woody_event_handler).
-spec map({T1, context()}, fun((T1) -> T2)) ->
{T2, woody_client:context()} when
T1 :: term(),
T2 :: term().
map({T, Context}, Fun) ->
{Fun(T), Context}.

88
src/dmt_api_mg.erl Normal file
View File

@ -0,0 +1,88 @@
-module(dmt_api_mg).
-export([start/1]).
-export([get_commit/2]).
-export([get_history/1]).
-export([get_history/3]).
-export([commit/3]).
-export([read_history/1]).
-export([call/3]).
-include("dmt_api_state_processing_thrift.hrl").
-define(NS , <<"domain-config">>).
-define(ID , <<"primary">>).
-define(REF , {id, ?ID}).
-type context() :: woody_client:context().
%%
-spec start(context()) ->
{ok, context()} | no_return().
start(Context) ->
try call('Start', [?ID, <<>>], Context) catch
{{exception, #'MachineAlreadyExists'{}}, Context1} ->
{ok, Context1}
end.
-spec get_commit(dmt:version(), context()) ->
{dmt:commit() | {error, version_not_found}, context()} | no_return().
get_commit(ID, Context) ->
dmt_api_context:map(
get_history(get_prev_commit(ID), 1, Context),
fun
(#{ID := Commit}) -> Commit;
(#{}) -> {error, version_not_found};
(Error) -> Error
end
).
get_prev_commit(1) ->
undefined;
get_prev_commit(N) ->
N - 1.
%% TODO: add range requests after they are fixed in mg
-spec get_history(context()) ->
{dmt:history(), context()}.
get_history(Context) ->
get_history(undefined, undefined, Context).
-spec get_history(dmt:version() | undefined, pos_integer() | undefined, context()) ->
{dmt:history() | {error, version_not_found}, context()}.
get_history(After, Limit, Context) ->
Range = #'HistoryRange'{'after' = After, 'limit' = Limit},
try dmt_api_context:map(call('GetHistory', [?REF, Range], Context), fun read_history/1) catch
{{exception, #'EventNotFound'{}}, Context1} ->
{{error, version_not_found}, Context1}
end.
-spec commit(dmt:version(), dmt:commit(), context()) ->
{dmt:version() | {error, version_not_found | operation_conflict}, context()}.
commit(Version, Commit, Context) ->
Call = term_to_binary({commit, Version, Commit}),
dmt_api_context:map(call('Call', [?REF, Call], Context), fun binary_to_term/1).
%%
-spec call(atom(), list(term()), context()) ->
{ok, context()} | {{ok, term()}, context()} | no_return().
call(Method, Args, Context) ->
Request = {{dmt_api_state_processing_thrift, 'Automaton'}, Method, [?NS | Args]},
{ok, URL} = application:get_env(dmt_api, automaton_service_url),
woody_client:call(Context, Request, #{url => URL}).
%% utils
-spec read_history([dmt_api_state_processing_thrift:'Event'()]) -> dmt:history().
read_history(Events) ->
read_history(Events, #{}).
-spec read_history([dmt_api_state_processing_thrift:'Event'()], dmt:history()) ->
dmt:history().
read_history([], History) ->
History;
read_history([#'Event'{id = Id, event_payload = BinaryPayload} | Rest], History) ->
read_history(Rest, History#{Id => binary_to_term(BinaryPayload)}).

View File

@ -0,0 +1,24 @@
-module(dmt_api_repository_client_handler).
-behaviour(woody_server_thrift_handler).
-export([handle_function/4]).
%%
-include_lib("dmt/include/dmt_domain_config_thrift.hrl").
-spec handle_function(
woody_t:func(),
woody_server_thrift_handler:args(),
woody_client:context(),
woody_server_thrift_handler:handler_opts()
) -> {woody_server_thrift_handler:result(), woody_client:context()} | no_return().
handle_function('checkoutObject', {Reference, ObjectReference}, Context, _Opts) ->
case dmt_api:checkout_object(Reference, ObjectReference, Context) of
{Object = #'VersionedObject'{}, Context1} ->
{Object, Context1};
{{error, object_not_found}, Context1} ->
throw({#'ObjectNotFound'{}, Context1});
{{error, version_not_found}, Context1} ->
throw({#'VersionNotFound'{}, Context1})
end.

View File

@ -0,0 +1,38 @@
-module(dmt_api_repository_handler).
-behaviour(woody_server_thrift_handler).
-export([handle_function/4]).
%%
-include_lib("dmt/include/dmt_domain_config_thrift.hrl").
-spec handle_function(
woody_t:func(),
woody_server_thrift_handler:args(),
woody_client:context(),
woody_server_thrift_handler:handler_opts()
) -> {woody_server_thrift_handler:result(), woody_client:context()} | no_return().
handle_function('Commit', {Version, Commit}, Context, _Opts) ->
case dmt_api:commit(Version, Commit, Context) of
{VersionNext, Context1} when is_integer(VersionNext) ->
{VersionNext, Context1};
{{error, operation_conflict}, Context1} ->
throw({#'OperationConflict'{}, Context1});
{{error, version_not_found}, Context1} ->
throw({#'VersionNotFound'{}, Context1})
end;
handle_function('Checkout', {Reference}, Context, _Opts) ->
case dmt_api:checkout(Reference, Context) of
{Snapshot = #'Snapshot'{}, Context1} ->
{Snapshot, Context1};
{{error, version_not_found}, Context1} ->
throw({#'VersionNotFound'{}, Context1})
end;
handle_function('Pull', {Version}, Context, _Opts) ->
case dmt_api:pull(Version, Context) of
{History = #{}, Context1} ->
{History, Context1};
{{error, version_not_found}, Context1} ->
throw({#'VersionNotFound'{}, Context1})
end.

View File

@ -0,0 +1,41 @@
-module(dmt_api_state_processor).
-behaviour(woody_server_thrift_handler).
-export([handle_function/4]).
%%
-include("dmt_api_state_processing_thrift.hrl").
-spec handle_function(
woody_t:func(),
woody_server_thrift_handler:args(),
woody_client:context(),
woody_server_thrift_handler:handler_opts()
) -> {woody_server_thrift_handler:result(), woody_client:context()} | no_return().
handle_function('ProcessCall', {#'CallArgs'{arg = Payload, history = History}}, Context, _Opts) ->
{Response, Events} = handle_call(binary_to_term(Payload), dmt_api_mg:read_history(History)),
{
#'CallResult'{
events = lists:map(fun term_to_binary/1, Events),
action = #'ComplexAction'{},
response = term_to_binary(Response)
},
Context
};
handle_function('ProcessSignal', {#'SignalArgs'{signal = {init, #'InitSignal'{}}}}, Context, _Opts) ->
{#'SignalResult'{events = [], action = #'ComplexAction'{}}, Context}.
%%
handle_call({commit, Version, Commit}, History) ->
case dmt_api:apply_commit(Version, Commit, History) of
{ok, Snapshot} ->
{Snapshot, [Commit]};
{error, version_not_found} ->
{{error, version_not_found}, []};
{error, Reason} ->
_ = lager:info("commit failed: ~p", [Reason]),
{{error, operation_conflict}, []}
end.

View File

@ -0,0 +1,114 @@
-module(dmt_api_tests_SUITE).
-include_lib("common_test/include/ct.hrl").
-export([all/0]).
-export([groups/0]).
-export([init_per_suite/1]).
-export([end_per_suite/1]).
-export([insert/1]).
-export([update/1]).
-export([delete/1]).
-include_lib("dmt/include/dmt_domain_config_thrift.hrl").
%% tests descriptions
-type config() :: [{atom(), term()}].
-define(config(Key, C), (element(2, lists:keyfind(Key, 1, C)))).
-type test_case_name() :: atom().
-type group_name() :: atom().
-spec all() -> [{group, group_name()}].
all() ->
[
{group, basic_lifecycle}
].
-spec groups() -> [{group_name(), list(), [test_case_name()]}].
groups() ->
[
{basic_lifecycle, [sequence, {repeat, 10}, shuffle], [
insert,
update,
delete
]}
].
%%
%% starting/stopping
-spec init_per_suite(config()) -> config().
init_per_suite(C) ->
Apps =
genlib_app:start_application_with(lager, [
{async_threshold, 1},
{async_threshold_window, 0},
{error_logger_hwm, 600},
{suppress_application_start_stop, true},
{handlers, [
{lager_common_test_backend, [debug, false]}
]}
]) ++
genlib_app:start_application_with(dmt_api, [
{automaton_service_url, "http://machinegun:8022/v1/automaton"}
]),
[{suite_apps, Apps}, {counter, 1} | C].
-spec end_per_suite(config()) -> term().
end_per_suite(C) ->
[application:stop(App) || App <- lists:reverse(?config(suite_apps, C))].
%%
%% tests
-spec insert(term()) -> term().
insert(_C) ->
ID = next_id(),
Object = fixture_domain_object(ID, <<"InsertFixture">>),
Ref = fixture_object_ref(ID),
#'ObjectNotFound'{} = (catch dmt_client_api:checkout_object({head, #'Head'{}}, Ref)),
#'Snapshot'{version = Version1} = dmt_client_api:checkout({head, #'Head'{}}),
Version2 = dmt_client_api:commit(Version1, #'Commit'{ops = [{insert, #'InsertOp'{object = Object}}]}),
#'VersionedObject'{object = Object} = dmt_client_api:checkout_object({head, #'Head'{}}, Ref),
#'ObjectNotFound'{} = (catch dmt_client_api:checkout_object({version, Version1}, Ref)),
#'VersionedObject'{object = Object} = dmt_client_api:checkout_object({version, Version2}, Ref).
-spec update(term()) -> term().
update(_C) ->
ID = next_id(),
Object1 = fixture_domain_object(ID, <<"UpdateFixture1">>),
Object2 = fixture_domain_object(ID, <<"UpdateFixture2">>),
Ref = fixture_object_ref(ID),
#'Snapshot'{version = Version0} = dmt_client_api:checkout({head, #'Head'{}}),
Version1 = dmt_client_api:commit(Version0, #'Commit'{ops = [{insert, #'InsertOp'{object = Object1}}]}),
Version2 = dmt_client_api:commit(
Version1,
#'Commit'{ops = [{update, #'UpdateOp'{old_object = Object1, new_object = Object2}}]}
),
#'VersionedObject'{object = Object1} = dmt_client_api:checkout_object({version, Version1}, Ref),
#'VersionedObject'{object = Object2} = dmt_client_api:checkout_object({version, Version2}, Ref).
-spec delete(term()) -> term().
delete(_C) ->
ID = next_id(),
Object = fixture_domain_object(ID, <<"DeleteFixture">>),
Ref = fixture_object_ref(ID),
#'Snapshot'{version = Version0} = dmt_client_api:checkout({head, #'Head'{}}),
Version1 = dmt_client_api:commit(Version0, #'Commit'{ops = [{insert, #'InsertOp'{object = Object}}]}),
Version2 = dmt_client_api:commit(Version1, #'Commit'{ops = [{remove, #'RemoveOp'{object = Object}}]}),
#'VersionedObject'{object = Object} = dmt_client_api:checkout_object({version, Version1}, Ref),
#'ObjectNotFound'{} = (catch dmt_client_api:checkout_object({version, Version2}, Ref)).
next_id() ->
erlang:system_time(micro_seconds) band 16#7FFFFFFF.
fixture_domain_object(Ref, Data) ->
{category, #'CategoryObject'{
ref = #'CategoryRef'{id = Ref},
data = #'Category'{name = Data, description = Data}
}}.
fixture_object_ref(Ref) ->
{category, #'CategoryRef'{id = Ref}}.

View File

@ -0,0 +1,7 @@
[
{mg_woody_api, [
{nss, [
{<<"domain-config">>, <<"http://dominant:8022/v1/stateproc">>}
]}
]}
].

View File

@ -1,37 +0,0 @@
SHELL := /bin/bash
which = $(if $(shell which $(1) 2>/dev/null),\
$(shell which $(1) 2>/dev/null),\
$(error "Error: could not locate $(1)!"))
DOCKER = $(call which,docker)
DOCKER_COMPOSE = $(call which,docker-compose)
UTIL_TARGETS := to_dev_container w_container_% run_w_container_% check_w_container_%
ifndef RELNAME
$(error RELNAME is not set)
endif
ifndef CALL_W_CONTAINER
$(error CALL_W_CONTAINER is not set)
endif
to_dev_container:
$(DOCKER) run -it --rm -v $$PWD:$$PWD --workdir $$PWD $(BASE_IMAGE) /bin/bash
w_container_%:
$(MAKE) -s run_w_container_$*
run_w_container_%: check_w_container_%
{ \
$(DOCKER_COMPOSE) up -d ; \
$(DOCKER_COMPOSE) exec -T $(RELNAME) make $* ; \
res=$$? ; \
$(DOCKER_COMPOSE) down ; \
exit $$res ; \
}
check_w_container_%:
$(if $(filter $*,$(CALL_W_CONTAINER)),,\
$(error "Error: target '$*' cannot be called w_container_"))