diff --git a/config/sys.config b/config/sys.config index a72b6ed..5ca3794 100644 --- a/config/sys.config +++ b/config/sys.config @@ -1,6 +1,5 @@ [ {kernel, [ - {logger_sasl_compatible, false}, {logger_level, info}, {logger, [ {handler, default, logger_std_h, #{ @@ -16,20 +15,13 @@ {dmt_api, [ {repository, dmt_api_repository_v5}, {migration, #{ - timeout => 360, - limit => 20, + timeout => 360, + limit => 20, read_only_gap => 1000 }}, {ip, "::"}, {port, 8022}, {default_woody_handling_timeout, 30000}, - {scoper_event_handler_options, #{ - event_handler_opts => #{ - formatter_opts => #{ - max_length => 1000 - } - } - }}, {woody_event_handlers, [ {scoper_woody_event_handler, #{ event_handler_opts => #{ @@ -49,12 +41,13 @@ % Should be greater than any other timeouts idle_timeout => infinity }}, - {max_cache_size, 52428800}, % 50Mb - {health_checkers, [ - {erl_health, disk , ["/", 99] }, - {erl_health, cg_memory, [99] }, - {erl_health, service , [<<"dominant">>]} - ]}, + % 50Mb + {max_cache_size, 52428800}, + {health_check, #{ + disk => {erl_health, disk, ["/", 99]}, + memory => {erl_health, cg_memory, [99]}, + service => {erl_health, service, [<<"dominant">>]} + }}, {services, #{ automaton => #{ url => "http://machinegun:8022/v1/automaton", diff --git a/rebar.config b/rebar.config index 915a4db..7c540fc 100644 --- a/rebar.config +++ b/rebar.config @@ -5,7 +5,6 @@ warnings_as_errors, warn_export_all, warn_missing_spec, - %% warn_untyped_record, FIXME ASAP!!!! Problem in thrift_protocol.hrl#L23 warn_export_vars, % by default @@ -29,8 +28,8 @@ {genlib, {git, "https://github.com/valitydev/genlib.git", {branch, "master"}}}, {woody, {git, "https://github.com/valitydev/woody_erlang.git", {branch, "master"}}}, {damsel, {git, "https://github.com/valitydev/damsel.git", {branch, "master"}}}, - {mg_proto, {git, "https://github.com/valitydev/machinegun-proto.git", {branch, "master"}}}, {dmt_core, {git, "https://github.com/valitydev/dmt-core.git", {branch, "master"}}}, + {machinery, {git, "https://github.com/valitydev/machinery-erlang.git", {branch, "master"}}}, {scoper, {git, "https://github.com/valitydev/scoper.git", {branch, "master"}}}, {erl_health, {git, "https://github.com/valitydev/erlang-health.git", {branch, master}}} ]}. diff --git a/rebar.lock b/rebar.lock index 08f7b04..3085860 100644 --- a/rebar.lock +++ b/rebar.lock @@ -36,11 +36,15 @@ 0}, {<<"idna">>,{pkg,<<"idna">>,<<"6.1.1">>},2}, {<<"jsx">>,{pkg,<<"jsx">>,<<"3.1.0">>},1}, + {<<"machinery">>, + {git,"https://github.com/valitydev/machinery-erlang.git", + {ref,"4c0382f6cb14ebea361909c2a0e06efe0fc0d505"}}, + 0}, {<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},2}, {<<"mg_proto">>, {git,"https://github.com/valitydev/machinegun-proto.git", {ref,"a411c7d5d779389c70d2594eb4a28a916dce1721"}}, - 0}, + 1}, {<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.2.0">>},2}, {<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.3.1">>},2}, {<<"ranch">>,{pkg,<<"ranch">>,<<"1.8.0">>},2}, diff --git a/src/dmt_api.app.src b/src/dmt_api.app.src index 9aee458..1221952 100644 --- a/src/dmt_api.app.src +++ b/src/dmt_api.app.src @@ -9,7 +9,7 @@ woody, damsel, dmt_core, - mg_proto, + machinery, erl_health ]}, {mod, {dmt_api, []}}, diff --git a/src/dmt_api.erl b/src/dmt_api.erl index 840f5df..739c398 100644 --- a/src/dmt_api.erl +++ b/src/dmt_api.erl @@ -44,38 +44,43 @@ init(_) -> {ok, {#{strategy => one_for_one, intensity => 10, period => 60}, Children}}. get_repository_handlers() -> - Repository = genlib_app:env(?MODULE, repository, dmt_api_repository_v4), + Repository = genlib_app:env(?MODULE, repository, dmt_api_repository_v5), DefaultTimeout = genlib_app:env(?MODULE, default_woody_handling_timeout, timer:seconds(30)), [ - get_handler_spec(repository, #{ + get_handler(repository, #{ repository => Repository, default_handling_timeout => DefaultTimeout }), - get_handler_spec(repository_client, #{ + get_handler(repository_client, #{ repository => Repository, default_handling_timeout => DefaultTimeout }), - get_handler_spec(state_processor, Repository) + get_machinery_handler(Repository) ]. --spec get_handler_spec(repository | repository_client | state_processor, woody:options()) -> - {Path :: iodata(), {woody:service(), woody:handler(woody:options())}}. -get_handler_spec(repository, Options) -> +-spec get_handler(repository | repository_client | state_processor, woody:options()) -> + woody:http_handler(woody:th_handler()). +get_handler(repository, Options) -> {"/v1/domain/repository", { {dmsl_domain_conf_thrift, 'Repository'}, {dmt_api_repository_handler, Options} }}; -get_handler_spec(repository_client, Options) -> +get_handler(repository_client, Options) -> {"/v1/domain/repository_client", { {dmsl_domain_conf_thrift, 'RepositoryClient'}, {dmt_api_repository_client_handler, Options} - }}; -get_handler_spec(state_processor, Options) -> - {"/v1/stateproc", { - {mg_proto_state_processing_thrift, 'Processor'}, - {dmt_api_automaton_handler, Options} }}. +-spec get_machinery_handler(module()) -> + woody:http_handler(woody:th_handler()). +get_machinery_handler(Repository) -> + machinery_mg_backend:get_handler( + {Repository, #{ + path => "/v1/stateproc", + backend_config => #{schema => Repository} + }} + ). + -spec enable_health_logging(erl_health:check()) -> erl_health:check(). enable_health_logging(Check) -> EvHandler = {erl_health_event_handler, []}, diff --git a/src/dmt_api_automaton_client.erl b/src/dmt_api_automaton_client.erl deleted file mode 100644 index 7a51c87..0000000 --- a/src/dmt_api_automaton_client.erl +++ /dev/null @@ -1,87 +0,0 @@ --module(dmt_api_automaton_client). - --include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl"). - --export([call/5]). --export([get_history/4]). --export([start/3]). - -%% - --type ns() :: mg_proto_base_thrift:'Namespace'(). --type id() :: mg_proto_base_thrift:'ID'(). --type args() :: mg_proto_state_processing_thrift:'Args'(). --type response() :: mg_proto_state_processing_thrift:'CallResponse'(). --type descriptor() :: mg_proto_state_processing_thrift:'MachineDescriptor'(). --type history_range() :: mg_proto_state_processing_thrift:'HistoryRange'(). --type history() :: mg_proto_state_processing_thrift:'History'(). --type context() :: woody_context:ctx(). - -%% - --spec call(ns(), id(), history_range(), args(), context()) -> - response() - | no_return(). -call(NS, ID, HistoryRange, Args, Context) -> - Descriptor = construct_descriptor(NS, ID, HistoryRange), - case issue_rpc('Call', {Descriptor, Args}, Context) of - {ok, Result} -> - Result; - {error, #'mg_stateproc_MachineNotFound'{}} -> - ok = start(NS, ID, Context), - call(NS, ID, HistoryRange, Args, Context) - end. - --spec get_history(ns(), id(), history_range(), context()) -> - {ok, history()} - | {error, - mg_proto_state_processing_thrift:'EventNotFound'() - | mg_proto_state_processing_thrift:'MachineNotFound'()} - | no_return(). -get_history(NS, ID, HistoryRange, Context) -> - Descriptor = construct_descriptor(NS, ID, HistoryRange), - case issue_rpc('GetMachine', {Descriptor}, Context) of - {ok, #'mg_stateproc_Machine'{history = History}} -> - {ok, History}; - {error, _} = Error -> - Error - end. - --spec start(ns(), id(), context()) -> ok | no_return(). -start(NS, ID, Context) -> - case issue_rpc('Start', {NS, ID, {nl, #mg_msgpack_Nil{}}}, Context) of - {ok, _} -> - ok; - {error, #'mg_stateproc_MachineAlreadyExists'{}} -> - ok - end. - --spec construct_descriptor(ns(), id(), history_range()) -> descriptor(). -construct_descriptor(NS, ID, HistoryRange) -> - #'mg_stateproc_MachineDescriptor'{ - ns = NS, - ref = {id, ID}, - range = HistoryRange - }. - --spec issue_rpc(woody:func(), woody:args(), context()) -> term() | no_return(). -issue_rpc(Method, Args, Context) -> - Request = {{mg_proto_state_processing_thrift, 'Automaton'}, Method, Args}, - Opts = make_woody_options(automaton), - case woody_client:call(Request, Opts, Context) of - {ok, _} = Ok -> - Ok; - {exception, #'mg_stateproc_NamespaceNotFound'{}} -> - error(namespace_not_found); - {exception, #'mg_stateproc_MachineFailed'{}} -> - error(machine_failed); - {exception, Exception} -> - {error, Exception} - end. - --spec make_woody_options(atom()) -> woody_client:options(). -make_woody_options(Service) -> - Services = application:get_env(dmt_api, services, #{}), - #{Service := ServiceOptions} = Services, - EventHandlerOpts = genlib_app:env(dmt_api, scoper_event_handler_options, #{}), - ServiceOptions#{event_handler => {scoper_woody_event_handler, EventHandlerOpts}}. diff --git a/src/dmt_api_automaton_handler.erl b/src/dmt_api_automaton_handler.erl deleted file mode 100644 index be0d5e8..0000000 --- a/src/dmt_api_automaton_handler.erl +++ /dev/null @@ -1,67 +0,0 @@ --module(dmt_api_automaton_handler). - --include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl"). - --callback process_call(call(), machine(), context()) -> - {response(), events()} | {response(), aux_state(), events()} | no_return(). - --callback process_signal(signal(), machine(), context()) -> - {action(), events()} | {action(), aux_state(), events()} | no_return(). - --export_type([call/0]). --export_type([signal/0]). --export_type([machine/0]). --export_type([response/0]). --export_type([action/0]). --export_type([aux_state/0]). --export_type([events/0]). - --type call() :: mg_proto_state_processing_thrift:'Args'(). --type signal() :: mg_proto_state_processing_thrift:'Signal'(). --type machine() :: mg_proto_state_processing_thrift:'Machine'(). --type response() :: mg_proto_state_processing_thrift:'CallResponse'(). --type action() :: mg_proto_state_processing_thrift:'ComplexAction'(). --type aux_state() :: mg_proto_state_processing_thrift:'AuxState'(). --type events() :: mg_proto_state_processing_thrift:'EventBodies'(). --type context() :: woody_context:ctx(). - -%% State processor - --behaviour(woody_server_thrift_handler). - --export([handle_function/4]). - --define(NIL, #mg_stateproc_Content{data = {nl, #mg_msgpack_Nil{}}}). - --spec handle_function(woody:func(), woody:args(), context(), woody:options()) -> {ok, woody:result()} | no_return(). -handle_function('ProcessCall', {#mg_stateproc_CallArgs{arg = Payload, machine = Machine}}, Context, Handler) -> - Result = Handler:process_call(Payload, Machine, Context), - {ok, construct_call_result(Result)}; -handle_function('ProcessSignal', {#mg_stateproc_SignalArgs{signal = Signal, machine = Machine}}, Context, Handler) -> - Result = Handler:process_signal(Signal, Machine, Context), - {ok, construct_signal_result(Result)}. - -%% Internals - -construct_call_result({Response, Events}) -> - construct_call_result(Response, ?NIL, Events); -construct_call_result({Response, AuxState, Events}) -> - construct_call_result(Response, AuxState, Events). - -construct_call_result(Response, AuxState, Events) -> - #mg_stateproc_CallResult{ - response = Response, - change = #mg_stateproc_MachineStateChange{aux_state = AuxState, events = Events}, - action = #mg_stateproc_ComplexAction{} - }. - -construct_signal_result({Action, Events}) -> - construct_signal_result(Action, ?NIL, Events); -construct_signal_result({Action, AuxState, Events}) -> - construct_signal_result(Action, AuxState, Events). - -construct_signal_result(Action, AuxState, Events) -> - #mg_stateproc_SignalResult{ - change = #mg_stateproc_MachineStateChange{aux_state = AuxState, events = Events}, - action = Action - }. diff --git a/src/dmt_api_repository_handler.erl b/src/dmt_api_repository_handler.erl index 4800481..28d461a 100644 --- a/src/dmt_api_repository_handler.erl +++ b/src/dmt_api_repository_handler.erl @@ -69,6 +69,7 @@ do_handle_function('Pull', {Version}, Context, Options) -> end. %% + handle_operation_error({conflict, Conflict}) -> #domain_conf_OperationConflict{ conflict = handle_operation_conflict(Conflict) @@ -78,32 +79,26 @@ handle_operation_error({invalid, Invalid}) -> errors = handle_operation_invalid(Invalid) }. -handle_operation_conflict(Conflict) -> - case Conflict of - {object_already_exists, Ref} -> - {object_already_exists, #domain_conf_ObjectAlreadyExistsConflict{object_ref = Ref}}; - {object_not_found, Ref} -> - {object_not_found, #domain_conf_ObjectNotFoundConflict{object_ref = Ref}}; - {object_reference_mismatch, Ref} -> - {object_reference_mismatch, #domain_conf_ObjectReferenceMismatchConflict{object_ref = Ref}} - end. +handle_operation_conflict({object_already_exists, Ref}) -> + {object_already_exists, #domain_conf_ObjectAlreadyExistsConflict{object_ref = Ref}}; +handle_operation_conflict({object_not_found, Ref}) -> + {object_not_found, #domain_conf_ObjectNotFoundConflict{object_ref = Ref}}; +handle_operation_conflict({object_reference_mismatch, Ref}) -> + {object_reference_mismatch, #domain_conf_ObjectReferenceMismatchConflict{object_ref = Ref}}. -handle_operation_invalid(Invalid) -> - case Invalid of - {objects_not_exist, Refs} -> - [ - {object_not_exists, #domain_conf_NonexistantObject{ - object_ref = Ref, - referenced_by = ReferencedBy - }} - || {Ref, ReferencedBy} <- Refs - ]; - {object_reference_cycles, Cycles} -> - [ - {object_reference_cycle, #domain_conf_ObjectReferenceCycle{cycle = Cycle}} - || Cycle <- Cycles - ] - end. +handle_operation_invalid({objects_not_exist, Refs}) -> + [ + {object_not_exists, #domain_conf_NonexistantObject{ + object_ref = Ref, + referenced_by = ReferencedBy + }} + || {Ref, ReferencedBy} <- Refs + ]; +handle_operation_invalid({object_reference_cycles, Cycles}) -> + [ + {object_reference_cycle, #domain_conf_ObjectReferenceCycle{cycle = Cycle}} + || Cycle <- Cycles + ]. -spec repository(options()) -> module(). repository(#{repository := Repository}) -> diff --git a/src/dmt_api_repository_v5.erl b/src/dmt_api_repository_v5.erl index f721dd0..a72d2f0 100644 --- a/src/dmt_api_repository_v5.erl +++ b/src/dmt_api_repository_v5.erl @@ -3,9 +3,8 @@ -behaviour(dmt_api_repository). -include_lib("damsel/include/dmsl_domain_conf_thrift.hrl"). --include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl"). --define(NS, <<"domain-config">>). +-define(NS, 'domain-config'). -define(ID, <<"primary/v5">>). -define(BASE, 10). @@ -18,10 +17,29 @@ %% State processor --behaviour(dmt_api_automaton_handler). +-type args(T) :: machinery:args(T). +-type machine() :: machinery:machine(event(), _). +-type handler_args() :: machinery:handler_args(_). +-type handler_opts() :: machinery:handler_opts(_). +-type result() :: machinery:result(event(), none()). +-type response(T) :: machinery:response(T). --export([process_call/3]). --export([process_signal/3]). +% TODO +% Otherwise dialyzer spews pretty unreasonable complains. Lost any hope telling +% him that callback typespecs are fine. +% -behaviour(machinery). + +-export([init/4]). +-export([process_call/4]). +-export([process_timeout/3]). +-export([process_repair/4]). + +%% Storage schema + +-behaviour(machinery_mg_schema). +-export([marshal/3]). +-export([unmarshal/3]). +-export([get_version/1]). %% -record(st, { @@ -30,123 +48,121 @@ }). -type st() :: #st{}. --type context() :: woody_context:ctx(). --type history_range() :: mg_proto_state_processing_thrift:'HistoryRange'(). --type machine() :: mg_proto_state_processing_thrift:'Machine'(). --type history() :: mg_proto_state_processing_thrift:'History'(). +-type event() :: {commit, commit(), #{snapshot => snapshot()}}. -type ref() :: dmsl_domain_conf_thrift:'Reference'(). -type snapshot() :: dmt_api_repository:snapshot(). -type commit() :: dmt_api_repository:commit(). +-type version() :: dmt_api_repository:version(). --spec checkout(ref(), context()) -> +-spec checkout(ref(), woody_context:ctx()) -> {ok, snapshot()} | {error, version_not_found}. -checkout({head, #domain_conf_Head{}}, Context) -> - HistoryRange = #mg_stateproc_HistoryRange{ - 'after' = undefined, - 'limit' = ?BASE, - 'direction' = backward - }, - case get_history_by_range(HistoryRange, Context) of - #st{} = St -> - squash_state(St); - {error, version_not_found} -> - {error, version_not_found} - end; -checkout({version, V}, Context) -> +checkout({head, #domain_conf_Head{}}, WoodyCtx) -> + St = get_history_by_range({undefined, ?BASE, backward}, WoodyCtx), + squash_state(St); +checkout({version, V}, WoodyCtx) -> BaseV = get_base_version(V), - HistoryRange = #mg_stateproc_HistoryRange{ - 'after' = get_event_id(BaseV), - 'limit' = V - BaseV, - 'direction' = forward - }, - case get_history_by_range(HistoryRange, Context) of - #st{} = St -> - case squash_state(St) of - {ok, #domain_conf_Snapshot{version = V}} = Result -> - Result; - {ok, _} -> - {error, version_not_found} - end; - {error, version_not_found} -> + St = get_history_by_range({get_event_id(BaseV), V - BaseV, forward}, WoodyCtx), + case squash_state(St) of + {ok, #domain_conf_Snapshot{version = V}} = Result -> + Result; + {ok, _} -> {error, version_not_found} end. --spec pull(dmt_api_repository:version(), context()) -> +-spec pull(version(), woody_context:ctx()) -> {ok, dmt_api_repository:history()} | {error, version_not_found}. -pull(Version, Context) -> - pull(Version, undefined, Context). +pull(Version, WoodyCtx) -> + pull(Version, undefined, WoodyCtx). --spec pull(dmt_api_repository:version(), dmt_api_repository:limit(), context()) -> +-spec pull(version(), dmt_api_repository:limit(), woody_context:ctx()) -> {ok, dmt_api_repository:history()} | {error, version_not_found}. -pull(Version, Limit, Context) -> +pull(Version, Limit, WoodyCtx) -> After = get_event_id(Version), - case get_history_by_range(#mg_stateproc_HistoryRange{'after' = After, 'limit' = Limit}, Context) of - #st{history = History} -> - {ok, History}; - {error, version_not_found} -> - {error, version_not_found} - end. + St = get_history_by_range({After, Limit, forward}, WoodyCtx), + {ok, St#st.history}. --spec commit(dmt_api_repository:version(), commit(), context()) -> +-spec commit(version(), commit(), woody_context:ctx()) -> {ok, snapshot()} | {error, version_not_found | {operation_error, dmt_domain:operation_error()}}. -commit(Version, Commit, Context) -> +commit(Version, Commit, WoodyCtx) -> BaseID = get_event_id(get_base_version(Version)), - decode_call_result( - dmt_api_automaton_client:call( - ?NS, - ?ID, - %% TODO in theory, it's enought ?BASE + 1 events here, - %% but it's complicated and needs to be covered by tests - #mg_stateproc_HistoryRange{'after' = BaseID}, - encode_call({commit, Version, Commit}), - Context - ) - ). - -%% - --spec get_history_by_range(history_range(), context()) -> st() | {error, version_not_found}. -get_history_by_range(HistoryRange, Context) -> - case dmt_api_automaton_client:get_history(?NS, ?ID, HistoryRange, Context) of - {ok, History} -> - read_history(History); - {error, #mg_stateproc_MachineNotFound{}} -> - ok = dmt_api_automaton_client:start(?NS, ?ID, Context), - get_history_by_range(HistoryRange, Context); - {error, #mg_stateproc_EventNotFound{}} -> - {error, version_not_found} + %% TODO in theory, it's enought ?BASE + 1 events here, + %% but it's complicated and needs to be covered by tests + HistoryRange = {BaseID, undefined, forward}, + Backend = get_backend(WoodyCtx), + case machinery:call(?NS, ?ID, HistoryRange, {commit, Version, Commit}, Backend) of + {ok, Response} -> + Response; + {error, notfound} -> + ok = machinery:start(?NS, ?ID, undefined, Backend), + commit(Version, Commit, WoodyCtx) end. %% --spec process_call(dmt_api_automaton_handler:call(), machine(), context()) -> - {dmt_api_automaton_handler:response(), dmt_api_automaton_handler:events()} | no_return(). -process_call(Call, #mg_stateproc_Machine{ns = ?NS, id = ?ID} = Machine, Context) -> - Args = decode_call(Call), - {Result, Events} = handle_call(Args, read_history(Machine), Context), - {encode_call_result(Result), encode_events(Events)}; -process_call(_Call, #mg_stateproc_Machine{ns = NS, id = ID}, _Context) -> - Message = <<"Unknown machine '", NS/binary, "' '", ID/binary, "'">>, - woody_error:raise(system, {internal, resource_unavailable, Message}). +-spec get_history_by_range(machinery:range(), woody_context:ctx()) -> st(). +get_history_by_range(HistoryRange, WoodyCtx) -> + Backend = get_backend(WoodyCtx), + case machinery:get(?NS, ?ID, HistoryRange, Backend) of + {ok, Machine} -> + read_history(Machine); + {error, notfound} -> + ok = machinery:start(?NS, ?ID, undefined, Backend), + get_history_by_range(HistoryRange, WoodyCtx) + end. --spec process_signal(dmt_api_automaton_handler:signal(), machine(), context()) -> - {dmt_api_automaton_handler:action(), dmt_api_automaton_handler:events()} | no_return(). -process_signal({init, #mg_stateproc_InitSignal{}}, #mg_stateproc_Machine{ns = ?NS, id = ?ID}, _Context) -> - {#mg_stateproc_ComplexAction{}, []}; -process_signal({timeout, #mg_stateproc_TimeoutSignal{}}, #mg_stateproc_Machine{ns = ?NS, id = ?ID}, _Context) -> - {#mg_stateproc_ComplexAction{}, []}; -process_signal(_Signal, #mg_stateproc_Machine{ns = NS, id = ID}, _Context) -> - Message = <<"Unknown machine '", NS/binary, "' '", ID/binary, "'">>, +-spec get_backend(woody_context:ctx()) -> machinery_mg_backend:backend(). +get_backend(WoodyCtx) -> + machinery_mg_backend:new(WoodyCtx, #{ + client => dmt_api_woody_utils:get_woody_client(automaton), + schema => ?MODULE + }). + +%% + +-spec init(args(_), machine(), handler_args(), handler_opts()) -> result(). +init(_Args, _Machine, _HandlerArgs, _HandlerOpts) -> + #{}. + +-spec process_call(args(Call), machine(), handler_args(), handler_opts()) -> + {response(Response), result()} +when + Call :: {commit, version(), commit()}, + Response :: + {ok, snapshot()} + | {error, version_not_found | head_mismatch | {operation_error, _Reason}}. +process_call(Args, #{namespace := ?NS, id := ?ID} = Machine, _HandlerArgs, _HandlerOpts) -> + handle_call(Args, read_history(Machine)); +process_call(_Call, Machine, _HandlerArgs, _HandlerOpts) -> + process_unexpected_machine(Machine). + +-spec process_timeout(machine(), handler_args(), handler_opts()) -> + result(). +process_timeout(#{namespace := ?NS, id := ?ID}, _HandlerArgs, _HandlerOpts) -> + #{}; +process_timeout(Machine, _HandlerArgs, _HandlerOpts) -> + process_unexpected_machine(Machine). + +-spec process_repair(args(_), machine(), handler_args(), handler_opts()) -> + {error, noimpl}. +process_repair(_Args, #{namespace := ?NS, id := ?ID}, _HandlerArgs, _HandlerOpts) -> + {error, noimpl}; +process_repair(_Args, Machine, _HandlerArgs, _HandlerOpts) -> + process_unexpected_machine(Machine). + +-spec process_unexpected_machine(machine()) -> + no_return(). +process_unexpected_machine(#{namespace := NS, id := ID}) -> + Message = genlib:format("Unexpected machine: ns = ~s, id = ~s", [NS, ID]), woody_error:raise(system, {internal, resource_unavailable, Message}). %% -handle_call({commit, Version, Commit}, St, _Context) -> +handle_call({commit, Version, Commit}, St) -> case squash_state(St) of {ok, #domain_conf_Snapshot{version = Version} = Snapshot} -> apply_commit(Snapshot, Commit); @@ -154,7 +170,7 @@ handle_call({commit, Version, Commit}, St, _Context) -> % Is this retry? Maybe we already applied this commit. check_commit(Version, Commit, St); {ok, _} -> - {{error, version_not_found}, []} + {{error, version_not_found}, #{}} end. apply_commit( @@ -164,45 +180,33 @@ apply_commit( case dmt_domain:apply_operations(Ops, DomainWas) of {ok, Domain} -> Snapshot = #domain_conf_Snapshot{version = VersionWas + 1, domain = Domain}, - {{ok, Snapshot}, [make_event(Snapshot, Commit)]}; + Event = make_event(Snapshot, Commit), + {{ok, Snapshot}, #{events => [Event]}}; {error, Reason} -> - {{error, {operation_error, Reason}}, []} + {{error, {operation_error, Reason}}, #{}} end. check_commit(Version, Commit, #st{snapshot = BaseSnapshot, history = History}) -> case maps:get(Version + 1, History) of Commit -> % it's ok, commit alredy applied, lets return this snapshot - {dmt_history:travel(Version + 1, History, BaseSnapshot), []}; + {dmt_history:travel(Version + 1, History, BaseSnapshot), #{}}; _ -> - {{error, head_mismatch}, []} + {{error, head_mismatch}, #{}} end. --spec read_history(machine() | history()) -> st(). -read_history(#mg_stateproc_Machine{history = Events}) -> - read_history(Events); -read_history(Events) -> - read_history(Events, #st{}). +-spec read_history(machine()) -> st(). +read_history(#{history := Events}) -> + lists:foldl(fun apply_event/2, #st{}, Events). --spec read_history([mg_proto_state_processing_thrift:'Event'()], st()) -> st(). -read_history([], St) -> - St; -read_history( - [#mg_stateproc_Event{id = Id, data = EventData, format_version = FmtVsn} | Rest], - #st{history = History} = St -) -> - {commit, Commit, Meta} = decode_event(FmtVsn, EventData), +-spec apply_event(machinery:event(event()), st()) -> st(). +apply_event({ID, _CreatedAt, {commit, Commit, Meta}}, #st{history = History} = St) -> + StNext = St#st{history = History#{ID => Commit}}, case Meta of #{snapshot := Snapshot} -> - read_history( - Rest, - St#st{snapshot = Snapshot, history = History#{Id => Commit}} - ); + StNext#st{snapshot = Snapshot}; #{} -> - read_history( - Rest, - St#st{history = History#{Id => Commit}} - ) + StNext end. squash_state(#st{snapshot = BaseSnapshot, history = History}) -> @@ -213,8 +217,7 @@ squash_state(#st{snapshot = BaseSnapshot, history = History}) -> error(Error) end. -%% - +-spec make_event(snapshot(), commit()) -> event(). make_event(Snapshot, Commit) -> Meta = case (Snapshot#domain_conf_Snapshot.version) rem ?BASE of @@ -225,15 +228,33 @@ make_event(Snapshot, Commit) -> end, {commit, Commit, Meta}. -encode_events(Events) -> - FmtVsn = 1, - encode_events(FmtVsn, Events). +%% -encode_events(FmtVsn, Events) -> - [encode_event(FmtVsn, E) || E <- Events]. +-spec marshal(machinery_mg_schema:t(), machinery_mg_schema:v(_), machinery_mg_schema:context()) -> + {machinery_msgpack:t(), machinery_mg_schema:context()}. +marshal({event, FmtVsn}, V, C) -> + {encode_event_data(FmtVsn, V), C}; +marshal({args, call}, V, C) -> + {encode_call(V), C}; +marshal({response, call}, V, C) -> + {encode_call_result(V), C}; +marshal(T, V, C) -> + machinery_mg_schema_generic:marshal(T, V, C). -encode_event(FmtVsn, Data) -> - #mg_stateproc_Content{format_version = FmtVsn, data = encode_event_data(FmtVsn, Data)}. +-spec unmarshal(machinery_mg_schema:t(), machinery_msgpack:t(), machinery_mg_schema:context()) -> + {machinery_mg_schema:v(_), machinery_mg_schema:context()}. +unmarshal({event, FmtVsn}, V, C) -> + {decode_event_data(FmtVsn, V), C}; +unmarshal({args, call}, V, C) -> + {decode_call(V), C}; +unmarshal({response, call}, V, C) -> + {decode_call_result(V), C}; +unmarshal(T, V, C) -> + machinery_mg_schema_generic:unmarshal(T, V, C). + +-spec get_version(machinery_mg_schema:vt()) -> machinery_mg_schema:version(). +get_version(_) -> + 1. encode_event_data(1 = FmtVsn, {commit, Commit, Meta}) -> {arr, [{str, <<"commit">>}, encode(commit, Commit), encode_commit_meta(FmtVsn, Meta)]}. @@ -243,7 +264,7 @@ encode_commit_meta(1, #{snapshot := Snapshot}) -> encode_commit_meta(1, #{}) -> {obj, #{}}. -decode_event(1 = FmtVsn, {arr, [{str, <<"commit">>}, Commit, Meta]}) -> +decode_event_data(1 = FmtVsn, {arr, [{str, <<"commit">>}, Commit, Meta]}) -> {commit, decode(commit, Commit), decode_commit_meta(FmtVsn, Meta)}. decode_commit_meta(1, {obj, #{{str, <<"snapshot">>} := Snapshot}}) -> diff --git a/src/dmt_api_woody_utils.erl b/src/dmt_api_woody_utils.erl index bcb9c41..ce3fbd2 100644 --- a/src/dmt_api_woody_utils.erl +++ b/src/dmt_api_woody_utils.erl @@ -1,9 +1,38 @@ -module(dmt_api_woody_utils). +-export([get_woody_client/1]). -export([ensure_woody_deadline_set/2]). %% API +-spec get_woody_client(atom()) -> woody_client:options(). +get_woody_client(Service) -> + Services = genlib_app:env(dmt_api, services, #{}), + make_woody_client(maps:get(Service, Services)). + +-spec make_woody_client(#{atom() => _}) -> woody_client:options(). +make_woody_client(#{url := Url} = Service) -> + lists:foldl( + fun(Opt, Acc) -> + case maps:get(Opt, Service, undefined) of + undefined -> Acc; + Value -> Acc#{Opt => Value} + end + end, + #{ + url => Url, + event_handler => get_woody_event_handlers() + }, + [ + transport_opts, + resolver_opts + ] + ). + +-spec get_woody_event_handlers() -> woody:ev_handlers(). +get_woody_event_handlers() -> + genlib_app:env(dmt_api, woody_event_handlers, [scoper_woody_event_handler]). + -spec ensure_woody_deadline_set(woody_context:ctx(), woody_deadline:deadline()) -> woody_context:ctx(). ensure_woody_deadline_set(WoodyContext, Default) -> case woody_context:get_deadline(WoodyContext) of