* Refactor complex case clauses into function clauses
* Use `dmt_api_repository_v5` as default
* Drop unnecessary config hack
* Update sys.config
* Fix outdated sys.config pieces
This commit is contained in:
Andrew Mayorov 2022-08-25 11:12:47 +03:00 committed by GitHub
parent 8f315bc55f
commit 4e01fe45ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 232 additions and 340 deletions

View File

@ -1,6 +1,5 @@
[
{kernel, [
{logger_sasl_compatible, false},
{logger_level, info},
{logger, [
{handler, default, logger_std_h, #{
@ -16,20 +15,13 @@
{dmt_api, [
{repository, dmt_api_repository_v5},
{migration, #{
timeout => 360,
limit => 20,
timeout => 360,
limit => 20,
read_only_gap => 1000
}},
{ip, "::"},
{port, 8022},
{default_woody_handling_timeout, 30000},
{scoper_event_handler_options, #{
event_handler_opts => #{
formatter_opts => #{
max_length => 1000
}
}
}},
{woody_event_handlers, [
{scoper_woody_event_handler, #{
event_handler_opts => #{
@ -49,12 +41,13 @@
% Should be greater than any other timeouts
idle_timeout => infinity
}},
{max_cache_size, 52428800}, % 50Mb
{health_checkers, [
{erl_health, disk , ["/", 99] },
{erl_health, cg_memory, [99] },
{erl_health, service , [<<"dominant">>]}
]},
% 50Mb
{max_cache_size, 52428800},
{health_check, #{
disk => {erl_health, disk, ["/", 99]},
memory => {erl_health, cg_memory, [99]},
service => {erl_health, service, [<<"dominant">>]}
}},
{services, #{
automaton => #{
url => "http://machinegun:8022/v1/automaton",

View File

@ -5,7 +5,6 @@
warnings_as_errors,
warn_export_all,
warn_missing_spec,
%% warn_untyped_record, FIXME ASAP!!!! Problem in thrift_protocol.hrl#L23
warn_export_vars,
% by default
@ -29,8 +28,8 @@
{genlib, {git, "https://github.com/valitydev/genlib.git", {branch, "master"}}},
{woody, {git, "https://github.com/valitydev/woody_erlang.git", {branch, "master"}}},
{damsel, {git, "https://github.com/valitydev/damsel.git", {branch, "master"}}},
{mg_proto, {git, "https://github.com/valitydev/machinegun-proto.git", {branch, "master"}}},
{dmt_core, {git, "https://github.com/valitydev/dmt-core.git", {branch, "master"}}},
{machinery, {git, "https://github.com/valitydev/machinery-erlang.git", {branch, "master"}}},
{scoper, {git, "https://github.com/valitydev/scoper.git", {branch, "master"}}},
{erl_health, {git, "https://github.com/valitydev/erlang-health.git", {branch, master}}}
]}.

View File

@ -36,11 +36,15 @@
0},
{<<"idna">>,{pkg,<<"idna">>,<<"6.1.1">>},2},
{<<"jsx">>,{pkg,<<"jsx">>,<<"3.1.0">>},1},
{<<"machinery">>,
{git,"https://github.com/valitydev/machinery-erlang.git",
{ref,"4c0382f6cb14ebea361909c2a0e06efe0fc0d505"}},
0},
{<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},2},
{<<"mg_proto">>,
{git,"https://github.com/valitydev/machinegun-proto.git",
{ref,"a411c7d5d779389c70d2594eb4a28a916dce1721"}},
0},
1},
{<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.2.0">>},2},
{<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.3.1">>},2},
{<<"ranch">>,{pkg,<<"ranch">>,<<"1.8.0">>},2},

View File

@ -9,7 +9,7 @@
woody,
damsel,
dmt_core,
mg_proto,
machinery,
erl_health
]},
{mod, {dmt_api, []}},

View File

@ -44,38 +44,43 @@ init(_) ->
{ok, {#{strategy => one_for_one, intensity => 10, period => 60}, Children}}.
get_repository_handlers() ->
Repository = genlib_app:env(?MODULE, repository, dmt_api_repository_v4),
Repository = genlib_app:env(?MODULE, repository, dmt_api_repository_v5),
DefaultTimeout = genlib_app:env(?MODULE, default_woody_handling_timeout, timer:seconds(30)),
[
get_handler_spec(repository, #{
get_handler(repository, #{
repository => Repository,
default_handling_timeout => DefaultTimeout
}),
get_handler_spec(repository_client, #{
get_handler(repository_client, #{
repository => Repository,
default_handling_timeout => DefaultTimeout
}),
get_handler_spec(state_processor, Repository)
get_machinery_handler(Repository)
].
-spec get_handler_spec(repository | repository_client | state_processor, woody:options()) ->
{Path :: iodata(), {woody:service(), woody:handler(woody:options())}}.
get_handler_spec(repository, Options) ->
-spec get_handler(repository | repository_client | state_processor, woody:options()) ->
woody:http_handler(woody:th_handler()).
get_handler(repository, Options) ->
{"/v1/domain/repository", {
{dmsl_domain_conf_thrift, 'Repository'},
{dmt_api_repository_handler, Options}
}};
get_handler_spec(repository_client, Options) ->
get_handler(repository_client, Options) ->
{"/v1/domain/repository_client", {
{dmsl_domain_conf_thrift, 'RepositoryClient'},
{dmt_api_repository_client_handler, Options}
}};
get_handler_spec(state_processor, Options) ->
{"/v1/stateproc", {
{mg_proto_state_processing_thrift, 'Processor'},
{dmt_api_automaton_handler, Options}
}}.
-spec get_machinery_handler(module()) ->
woody:http_handler(woody:th_handler()).
get_machinery_handler(Repository) ->
machinery_mg_backend:get_handler(
{Repository, #{
path => "/v1/stateproc",
backend_config => #{schema => Repository}
}}
).
-spec enable_health_logging(erl_health:check()) -> erl_health:check().
enable_health_logging(Check) ->
EvHandler = {erl_health_event_handler, []},

View File

@ -1,87 +0,0 @@
-module(dmt_api_automaton_client).
-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl").
-export([call/5]).
-export([get_history/4]).
-export([start/3]).
%%
-type ns() :: mg_proto_base_thrift:'Namespace'().
-type id() :: mg_proto_base_thrift:'ID'().
-type args() :: mg_proto_state_processing_thrift:'Args'().
-type response() :: mg_proto_state_processing_thrift:'CallResponse'().
-type descriptor() :: mg_proto_state_processing_thrift:'MachineDescriptor'().
-type history_range() :: mg_proto_state_processing_thrift:'HistoryRange'().
-type history() :: mg_proto_state_processing_thrift:'History'().
-type context() :: woody_context:ctx().
%%
-spec call(ns(), id(), history_range(), args(), context()) ->
response()
| no_return().
call(NS, ID, HistoryRange, Args, Context) ->
Descriptor = construct_descriptor(NS, ID, HistoryRange),
case issue_rpc('Call', {Descriptor, Args}, Context) of
{ok, Result} ->
Result;
{error, #'mg_stateproc_MachineNotFound'{}} ->
ok = start(NS, ID, Context),
call(NS, ID, HistoryRange, Args, Context)
end.
-spec get_history(ns(), id(), history_range(), context()) ->
{ok, history()}
| {error,
mg_proto_state_processing_thrift:'EventNotFound'()
| mg_proto_state_processing_thrift:'MachineNotFound'()}
| no_return().
get_history(NS, ID, HistoryRange, Context) ->
Descriptor = construct_descriptor(NS, ID, HistoryRange),
case issue_rpc('GetMachine', {Descriptor}, Context) of
{ok, #'mg_stateproc_Machine'{history = History}} ->
{ok, History};
{error, _} = Error ->
Error
end.
-spec start(ns(), id(), context()) -> ok | no_return().
start(NS, ID, Context) ->
case issue_rpc('Start', {NS, ID, {nl, #mg_msgpack_Nil{}}}, Context) of
{ok, _} ->
ok;
{error, #'mg_stateproc_MachineAlreadyExists'{}} ->
ok
end.
-spec construct_descriptor(ns(), id(), history_range()) -> descriptor().
construct_descriptor(NS, ID, HistoryRange) ->
#'mg_stateproc_MachineDescriptor'{
ns = NS,
ref = {id, ID},
range = HistoryRange
}.
-spec issue_rpc(woody:func(), woody:args(), context()) -> term() | no_return().
issue_rpc(Method, Args, Context) ->
Request = {{mg_proto_state_processing_thrift, 'Automaton'}, Method, Args},
Opts = make_woody_options(automaton),
case woody_client:call(Request, Opts, Context) of
{ok, _} = Ok ->
Ok;
{exception, #'mg_stateproc_NamespaceNotFound'{}} ->
error(namespace_not_found);
{exception, #'mg_stateproc_MachineFailed'{}} ->
error(machine_failed);
{exception, Exception} ->
{error, Exception}
end.
-spec make_woody_options(atom()) -> woody_client:options().
make_woody_options(Service) ->
Services = application:get_env(dmt_api, services, #{}),
#{Service := ServiceOptions} = Services,
EventHandlerOpts = genlib_app:env(dmt_api, scoper_event_handler_options, #{}),
ServiceOptions#{event_handler => {scoper_woody_event_handler, EventHandlerOpts}}.

View File

@ -1,67 +0,0 @@
-module(dmt_api_automaton_handler).
-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl").
-callback process_call(call(), machine(), context()) ->
{response(), events()} | {response(), aux_state(), events()} | no_return().
-callback process_signal(signal(), machine(), context()) ->
{action(), events()} | {action(), aux_state(), events()} | no_return().
-export_type([call/0]).
-export_type([signal/0]).
-export_type([machine/0]).
-export_type([response/0]).
-export_type([action/0]).
-export_type([aux_state/0]).
-export_type([events/0]).
-type call() :: mg_proto_state_processing_thrift:'Args'().
-type signal() :: mg_proto_state_processing_thrift:'Signal'().
-type machine() :: mg_proto_state_processing_thrift:'Machine'().
-type response() :: mg_proto_state_processing_thrift:'CallResponse'().
-type action() :: mg_proto_state_processing_thrift:'ComplexAction'().
-type aux_state() :: mg_proto_state_processing_thrift:'AuxState'().
-type events() :: mg_proto_state_processing_thrift:'EventBodies'().
-type context() :: woody_context:ctx().
%% State processor
-behaviour(woody_server_thrift_handler).
-export([handle_function/4]).
-define(NIL, #mg_stateproc_Content{data = {nl, #mg_msgpack_Nil{}}}).
-spec handle_function(woody:func(), woody:args(), context(), woody:options()) -> {ok, woody:result()} | no_return().
handle_function('ProcessCall', {#mg_stateproc_CallArgs{arg = Payload, machine = Machine}}, Context, Handler) ->
Result = Handler:process_call(Payload, Machine, Context),
{ok, construct_call_result(Result)};
handle_function('ProcessSignal', {#mg_stateproc_SignalArgs{signal = Signal, machine = Machine}}, Context, Handler) ->
Result = Handler:process_signal(Signal, Machine, Context),
{ok, construct_signal_result(Result)}.
%% Internals
construct_call_result({Response, Events}) ->
construct_call_result(Response, ?NIL, Events);
construct_call_result({Response, AuxState, Events}) ->
construct_call_result(Response, AuxState, Events).
construct_call_result(Response, AuxState, Events) ->
#mg_stateproc_CallResult{
response = Response,
change = #mg_stateproc_MachineStateChange{aux_state = AuxState, events = Events},
action = #mg_stateproc_ComplexAction{}
}.
construct_signal_result({Action, Events}) ->
construct_signal_result(Action, ?NIL, Events);
construct_signal_result({Action, AuxState, Events}) ->
construct_signal_result(Action, AuxState, Events).
construct_signal_result(Action, AuxState, Events) ->
#mg_stateproc_SignalResult{
change = #mg_stateproc_MachineStateChange{aux_state = AuxState, events = Events},
action = Action
}.

View File

@ -69,6 +69,7 @@ do_handle_function('Pull', {Version}, Context, Options) ->
end.
%%
handle_operation_error({conflict, Conflict}) ->
#domain_conf_OperationConflict{
conflict = handle_operation_conflict(Conflict)
@ -78,32 +79,26 @@ handle_operation_error({invalid, Invalid}) ->
errors = handle_operation_invalid(Invalid)
}.
handle_operation_conflict(Conflict) ->
case Conflict of
{object_already_exists, Ref} ->
{object_already_exists, #domain_conf_ObjectAlreadyExistsConflict{object_ref = Ref}};
{object_not_found, Ref} ->
{object_not_found, #domain_conf_ObjectNotFoundConflict{object_ref = Ref}};
{object_reference_mismatch, Ref} ->
{object_reference_mismatch, #domain_conf_ObjectReferenceMismatchConflict{object_ref = Ref}}
end.
handle_operation_conflict({object_already_exists, Ref}) ->
{object_already_exists, #domain_conf_ObjectAlreadyExistsConflict{object_ref = Ref}};
handle_operation_conflict({object_not_found, Ref}) ->
{object_not_found, #domain_conf_ObjectNotFoundConflict{object_ref = Ref}};
handle_operation_conflict({object_reference_mismatch, Ref}) ->
{object_reference_mismatch, #domain_conf_ObjectReferenceMismatchConflict{object_ref = Ref}}.
handle_operation_invalid(Invalid) ->
case Invalid of
{objects_not_exist, Refs} ->
[
{object_not_exists, #domain_conf_NonexistantObject{
object_ref = Ref,
referenced_by = ReferencedBy
}}
|| {Ref, ReferencedBy} <- Refs
];
{object_reference_cycles, Cycles} ->
[
{object_reference_cycle, #domain_conf_ObjectReferenceCycle{cycle = Cycle}}
|| Cycle <- Cycles
]
end.
handle_operation_invalid({objects_not_exist, Refs}) ->
[
{object_not_exists, #domain_conf_NonexistantObject{
object_ref = Ref,
referenced_by = ReferencedBy
}}
|| {Ref, ReferencedBy} <- Refs
];
handle_operation_invalid({object_reference_cycles, Cycles}) ->
[
{object_reference_cycle, #domain_conf_ObjectReferenceCycle{cycle = Cycle}}
|| Cycle <- Cycles
].
-spec repository(options()) -> module().
repository(#{repository := Repository}) ->

View File

@ -3,9 +3,8 @@
-behaviour(dmt_api_repository).
-include_lib("damsel/include/dmsl_domain_conf_thrift.hrl").
-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl").
-define(NS, <<"domain-config">>).
-define(NS, 'domain-config').
-define(ID, <<"primary/v5">>).
-define(BASE, 10).
@ -18,10 +17,29 @@
%% State processor
-behaviour(dmt_api_automaton_handler).
-type args(T) :: machinery:args(T).
-type machine() :: machinery:machine(event(), _).
-type handler_args() :: machinery:handler_args(_).
-type handler_opts() :: machinery:handler_opts(_).
-type result() :: machinery:result(event(), none()).
-type response(T) :: machinery:response(T).
-export([process_call/3]).
-export([process_signal/3]).
% TODO
% Otherwise dialyzer spews pretty unreasonable complains. Lost any hope telling
% him that callback typespecs are fine.
% -behaviour(machinery).
-export([init/4]).
-export([process_call/4]).
-export([process_timeout/3]).
-export([process_repair/4]).
%% Storage schema
-behaviour(machinery_mg_schema).
-export([marshal/3]).
-export([unmarshal/3]).
-export([get_version/1]).
%%
-record(st, {
@ -30,123 +48,121 @@
}).
-type st() :: #st{}.
-type context() :: woody_context:ctx().
-type history_range() :: mg_proto_state_processing_thrift:'HistoryRange'().
-type machine() :: mg_proto_state_processing_thrift:'Machine'().
-type history() :: mg_proto_state_processing_thrift:'History'().
-type event() :: {commit, commit(), #{snapshot => snapshot()}}.
-type ref() :: dmsl_domain_conf_thrift:'Reference'().
-type snapshot() :: dmt_api_repository:snapshot().
-type commit() :: dmt_api_repository:commit().
-type version() :: dmt_api_repository:version().
-spec checkout(ref(), context()) ->
-spec checkout(ref(), woody_context:ctx()) ->
{ok, snapshot()}
| {error, version_not_found}.
checkout({head, #domain_conf_Head{}}, Context) ->
HistoryRange = #mg_stateproc_HistoryRange{
'after' = undefined,
'limit' = ?BASE,
'direction' = backward
},
case get_history_by_range(HistoryRange, Context) of
#st{} = St ->
squash_state(St);
{error, version_not_found} ->
{error, version_not_found}
end;
checkout({version, V}, Context) ->
checkout({head, #domain_conf_Head{}}, WoodyCtx) ->
St = get_history_by_range({undefined, ?BASE, backward}, WoodyCtx),
squash_state(St);
checkout({version, V}, WoodyCtx) ->
BaseV = get_base_version(V),
HistoryRange = #mg_stateproc_HistoryRange{
'after' = get_event_id(BaseV),
'limit' = V - BaseV,
'direction' = forward
},
case get_history_by_range(HistoryRange, Context) of
#st{} = St ->
case squash_state(St) of
{ok, #domain_conf_Snapshot{version = V}} = Result ->
Result;
{ok, _} ->
{error, version_not_found}
end;
{error, version_not_found} ->
St = get_history_by_range({get_event_id(BaseV), V - BaseV, forward}, WoodyCtx),
case squash_state(St) of
{ok, #domain_conf_Snapshot{version = V}} = Result ->
Result;
{ok, _} ->
{error, version_not_found}
end.
-spec pull(dmt_api_repository:version(), context()) ->
-spec pull(version(), woody_context:ctx()) ->
{ok, dmt_api_repository:history()}
| {error, version_not_found}.
pull(Version, Context) ->
pull(Version, undefined, Context).
pull(Version, WoodyCtx) ->
pull(Version, undefined, WoodyCtx).
-spec pull(dmt_api_repository:version(), dmt_api_repository:limit(), context()) ->
-spec pull(version(), dmt_api_repository:limit(), woody_context:ctx()) ->
{ok, dmt_api_repository:history()}
| {error, version_not_found}.
pull(Version, Limit, Context) ->
pull(Version, Limit, WoodyCtx) ->
After = get_event_id(Version),
case get_history_by_range(#mg_stateproc_HistoryRange{'after' = After, 'limit' = Limit}, Context) of
#st{history = History} ->
{ok, History};
{error, version_not_found} ->
{error, version_not_found}
end.
St = get_history_by_range({After, Limit, forward}, WoodyCtx),
{ok, St#st.history}.
-spec commit(dmt_api_repository:version(), commit(), context()) ->
-spec commit(version(), commit(), woody_context:ctx()) ->
{ok, snapshot()}
| {error, version_not_found | {operation_error, dmt_domain:operation_error()}}.
commit(Version, Commit, Context) ->
commit(Version, Commit, WoodyCtx) ->
BaseID = get_event_id(get_base_version(Version)),
decode_call_result(
dmt_api_automaton_client:call(
?NS,
?ID,
%% TODO in theory, it's enought ?BASE + 1 events here,
%% but it's complicated and needs to be covered by tests
#mg_stateproc_HistoryRange{'after' = BaseID},
encode_call({commit, Version, Commit}),
Context
)
).
%%
-spec get_history_by_range(history_range(), context()) -> st() | {error, version_not_found}.
get_history_by_range(HistoryRange, Context) ->
case dmt_api_automaton_client:get_history(?NS, ?ID, HistoryRange, Context) of
{ok, History} ->
read_history(History);
{error, #mg_stateproc_MachineNotFound{}} ->
ok = dmt_api_automaton_client:start(?NS, ?ID, Context),
get_history_by_range(HistoryRange, Context);
{error, #mg_stateproc_EventNotFound{}} ->
{error, version_not_found}
%% TODO in theory, it's enought ?BASE + 1 events here,
%% but it's complicated and needs to be covered by tests
HistoryRange = {BaseID, undefined, forward},
Backend = get_backend(WoodyCtx),
case machinery:call(?NS, ?ID, HistoryRange, {commit, Version, Commit}, Backend) of
{ok, Response} ->
Response;
{error, notfound} ->
ok = machinery:start(?NS, ?ID, undefined, Backend),
commit(Version, Commit, WoodyCtx)
end.
%%
-spec process_call(dmt_api_automaton_handler:call(), machine(), context()) ->
{dmt_api_automaton_handler:response(), dmt_api_automaton_handler:events()} | no_return().
process_call(Call, #mg_stateproc_Machine{ns = ?NS, id = ?ID} = Machine, Context) ->
Args = decode_call(Call),
{Result, Events} = handle_call(Args, read_history(Machine), Context),
{encode_call_result(Result), encode_events(Events)};
process_call(_Call, #mg_stateproc_Machine{ns = NS, id = ID}, _Context) ->
Message = <<"Unknown machine '", NS/binary, "' '", ID/binary, "'">>,
woody_error:raise(system, {internal, resource_unavailable, Message}).
-spec get_history_by_range(machinery:range(), woody_context:ctx()) -> st().
get_history_by_range(HistoryRange, WoodyCtx) ->
Backend = get_backend(WoodyCtx),
case machinery:get(?NS, ?ID, HistoryRange, Backend) of
{ok, Machine} ->
read_history(Machine);
{error, notfound} ->
ok = machinery:start(?NS, ?ID, undefined, Backend),
get_history_by_range(HistoryRange, WoodyCtx)
end.
-spec process_signal(dmt_api_automaton_handler:signal(), machine(), context()) ->
{dmt_api_automaton_handler:action(), dmt_api_automaton_handler:events()} | no_return().
process_signal({init, #mg_stateproc_InitSignal{}}, #mg_stateproc_Machine{ns = ?NS, id = ?ID}, _Context) ->
{#mg_stateproc_ComplexAction{}, []};
process_signal({timeout, #mg_stateproc_TimeoutSignal{}}, #mg_stateproc_Machine{ns = ?NS, id = ?ID}, _Context) ->
{#mg_stateproc_ComplexAction{}, []};
process_signal(_Signal, #mg_stateproc_Machine{ns = NS, id = ID}, _Context) ->
Message = <<"Unknown machine '", NS/binary, "' '", ID/binary, "'">>,
-spec get_backend(woody_context:ctx()) -> machinery_mg_backend:backend().
get_backend(WoodyCtx) ->
machinery_mg_backend:new(WoodyCtx, #{
client => dmt_api_woody_utils:get_woody_client(automaton),
schema => ?MODULE
}).
%%
-spec init(args(_), machine(), handler_args(), handler_opts()) -> result().
init(_Args, _Machine, _HandlerArgs, _HandlerOpts) ->
#{}.
-spec process_call(args(Call), machine(), handler_args(), handler_opts()) ->
{response(Response), result()}
when
Call :: {commit, version(), commit()},
Response ::
{ok, snapshot()}
| {error, version_not_found | head_mismatch | {operation_error, _Reason}}.
process_call(Args, #{namespace := ?NS, id := ?ID} = Machine, _HandlerArgs, _HandlerOpts) ->
handle_call(Args, read_history(Machine));
process_call(_Call, Machine, _HandlerArgs, _HandlerOpts) ->
process_unexpected_machine(Machine).
-spec process_timeout(machine(), handler_args(), handler_opts()) ->
result().
process_timeout(#{namespace := ?NS, id := ?ID}, _HandlerArgs, _HandlerOpts) ->
#{};
process_timeout(Machine, _HandlerArgs, _HandlerOpts) ->
process_unexpected_machine(Machine).
-spec process_repair(args(_), machine(), handler_args(), handler_opts()) ->
{error, noimpl}.
process_repair(_Args, #{namespace := ?NS, id := ?ID}, _HandlerArgs, _HandlerOpts) ->
{error, noimpl};
process_repair(_Args, Machine, _HandlerArgs, _HandlerOpts) ->
process_unexpected_machine(Machine).
-spec process_unexpected_machine(machine()) ->
no_return().
process_unexpected_machine(#{namespace := NS, id := ID}) ->
Message = genlib:format("Unexpected machine: ns = ~s, id = ~s", [NS, ID]),
woody_error:raise(system, {internal, resource_unavailable, Message}).
%%
handle_call({commit, Version, Commit}, St, _Context) ->
handle_call({commit, Version, Commit}, St) ->
case squash_state(St) of
{ok, #domain_conf_Snapshot{version = Version} = Snapshot} ->
apply_commit(Snapshot, Commit);
@ -154,7 +170,7 @@ handle_call({commit, Version, Commit}, St, _Context) ->
% Is this retry? Maybe we already applied this commit.
check_commit(Version, Commit, St);
{ok, _} ->
{{error, version_not_found}, []}
{{error, version_not_found}, #{}}
end.
apply_commit(
@ -164,45 +180,33 @@ apply_commit(
case dmt_domain:apply_operations(Ops, DomainWas) of
{ok, Domain} ->
Snapshot = #domain_conf_Snapshot{version = VersionWas + 1, domain = Domain},
{{ok, Snapshot}, [make_event(Snapshot, Commit)]};
Event = make_event(Snapshot, Commit),
{{ok, Snapshot}, #{events => [Event]}};
{error, Reason} ->
{{error, {operation_error, Reason}}, []}
{{error, {operation_error, Reason}}, #{}}
end.
check_commit(Version, Commit, #st{snapshot = BaseSnapshot, history = History}) ->
case maps:get(Version + 1, History) of
Commit ->
% it's ok, commit alredy applied, lets return this snapshot
{dmt_history:travel(Version + 1, History, BaseSnapshot), []};
{dmt_history:travel(Version + 1, History, BaseSnapshot), #{}};
_ ->
{{error, head_mismatch}, []}
{{error, head_mismatch}, #{}}
end.
-spec read_history(machine() | history()) -> st().
read_history(#mg_stateproc_Machine{history = Events}) ->
read_history(Events);
read_history(Events) ->
read_history(Events, #st{}).
-spec read_history(machine()) -> st().
read_history(#{history := Events}) ->
lists:foldl(fun apply_event/2, #st{}, Events).
-spec read_history([mg_proto_state_processing_thrift:'Event'()], st()) -> st().
read_history([], St) ->
St;
read_history(
[#mg_stateproc_Event{id = Id, data = EventData, format_version = FmtVsn} | Rest],
#st{history = History} = St
) ->
{commit, Commit, Meta} = decode_event(FmtVsn, EventData),
-spec apply_event(machinery:event(event()), st()) -> st().
apply_event({ID, _CreatedAt, {commit, Commit, Meta}}, #st{history = History} = St) ->
StNext = St#st{history = History#{ID => Commit}},
case Meta of
#{snapshot := Snapshot} ->
read_history(
Rest,
St#st{snapshot = Snapshot, history = History#{Id => Commit}}
);
StNext#st{snapshot = Snapshot};
#{} ->
read_history(
Rest,
St#st{history = History#{Id => Commit}}
)
StNext
end.
squash_state(#st{snapshot = BaseSnapshot, history = History}) ->
@ -213,8 +217,7 @@ squash_state(#st{snapshot = BaseSnapshot, history = History}) ->
error(Error)
end.
%%
-spec make_event(snapshot(), commit()) -> event().
make_event(Snapshot, Commit) ->
Meta =
case (Snapshot#domain_conf_Snapshot.version) rem ?BASE of
@ -225,15 +228,33 @@ make_event(Snapshot, Commit) ->
end,
{commit, Commit, Meta}.
encode_events(Events) ->
FmtVsn = 1,
encode_events(FmtVsn, Events).
%%
encode_events(FmtVsn, Events) ->
[encode_event(FmtVsn, E) || E <- Events].
-spec marshal(machinery_mg_schema:t(), machinery_mg_schema:v(_), machinery_mg_schema:context()) ->
{machinery_msgpack:t(), machinery_mg_schema:context()}.
marshal({event, FmtVsn}, V, C) ->
{encode_event_data(FmtVsn, V), C};
marshal({args, call}, V, C) ->
{encode_call(V), C};
marshal({response, call}, V, C) ->
{encode_call_result(V), C};
marshal(T, V, C) ->
machinery_mg_schema_generic:marshal(T, V, C).
encode_event(FmtVsn, Data) ->
#mg_stateproc_Content{format_version = FmtVsn, data = encode_event_data(FmtVsn, Data)}.
-spec unmarshal(machinery_mg_schema:t(), machinery_msgpack:t(), machinery_mg_schema:context()) ->
{machinery_mg_schema:v(_), machinery_mg_schema:context()}.
unmarshal({event, FmtVsn}, V, C) ->
{decode_event_data(FmtVsn, V), C};
unmarshal({args, call}, V, C) ->
{decode_call(V), C};
unmarshal({response, call}, V, C) ->
{decode_call_result(V), C};
unmarshal(T, V, C) ->
machinery_mg_schema_generic:unmarshal(T, V, C).
-spec get_version(machinery_mg_schema:vt()) -> machinery_mg_schema:version().
get_version(_) ->
1.
encode_event_data(1 = FmtVsn, {commit, Commit, Meta}) ->
{arr, [{str, <<"commit">>}, encode(commit, Commit), encode_commit_meta(FmtVsn, Meta)]}.
@ -243,7 +264,7 @@ encode_commit_meta(1, #{snapshot := Snapshot}) ->
encode_commit_meta(1, #{}) ->
{obj, #{}}.
decode_event(1 = FmtVsn, {arr, [{str, <<"commit">>}, Commit, Meta]}) ->
decode_event_data(1 = FmtVsn, {arr, [{str, <<"commit">>}, Commit, Meta]}) ->
{commit, decode(commit, Commit), decode_commit_meta(FmtVsn, Meta)}.
decode_commit_meta(1, {obj, #{{str, <<"snapshot">>} := Snapshot}}) ->

View File

@ -1,9 +1,38 @@
-module(dmt_api_woody_utils).
-export([get_woody_client/1]).
-export([ensure_woody_deadline_set/2]).
%% API
-spec get_woody_client(atom()) -> woody_client:options().
get_woody_client(Service) ->
Services = genlib_app:env(dmt_api, services, #{}),
make_woody_client(maps:get(Service, Services)).
-spec make_woody_client(#{atom() => _}) -> woody_client:options().
make_woody_client(#{url := Url} = Service) ->
lists:foldl(
fun(Opt, Acc) ->
case maps:get(Opt, Service, undefined) of
undefined -> Acc;
Value -> Acc#{Opt => Value}
end
end,
#{
url => Url,
event_handler => get_woody_event_handlers()
},
[
transport_opts,
resolver_opts
]
).
-spec get_woody_event_handlers() -> woody:ev_handlers().
get_woody_event_handlers() ->
genlib_app:env(dmt_api, woody_event_handlers, [scoper_woody_event_handler]).
-spec ensure_woody_deadline_set(woody_context:ctx(), woody_deadline:deadline()) -> woody_context:ctx().
ensure_woody_deadline_set(WoodyContext, Default) ->
case woody_context:get_deadline(WoodyContext) of