FF-51 Add repair callback (#7)

This commit is contained in:
Andrey Fadeev 2019-02-04 14:54:47 +03:00 committed by GitHub
parent ef24b4fff4
commit a8650ae0f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 58 additions and 11 deletions

View File

@ -28,7 +28,7 @@
-type limit() :: undefined | non_neg_integer().
-type direction() :: forward | backward.
-type range() :: {event_cursor(), limit(), direction()}.
-type signal(T) :: {init, args(T)} | timeout.
-type signal(T) :: {init, args(T)} | {repair, args(T)} | timeout.
-type machine(E, A) :: #{
namespace := namespace(),
id := id(),
@ -101,6 +101,9 @@
-callback init(args(_), machine(E, A), handler_args(_), handler_opts(_)) ->
result(E, A).
-callback process_repair(args(_), machine(E, A), handler_args(_), handler_opts(_)) ->
result(E, A).
-callback process_timeout(machine(E, A), handler_args(_), handler_opts(_)) ->
result(E, A).
@ -143,6 +146,8 @@ get(NS, ID, Range, Backend) ->
result(E, A).
dispatch_signal({init, Args}, Machine, {Handler, HandlerArgs}, Opts) ->
Handler:init(Args, Machine, HandlerArgs, Opts);
dispatch_signal({repair, Args}, Machine, {Handler, HandlerArgs}, Opts) ->
Handler:process_repair(Args, Machine, HandlerArgs, Opts);
dispatch_signal(timeout, Machine, {Handler, HandlerArgs}, Opts) ->
Handler:process_timeout(Machine, HandlerArgs, Opts).

View File

@ -6,6 +6,7 @@
%% API
-export([start/5]).
-export([call/6]).
-export([repair/6]).
-export([get/5]).
%% Behaviour definition
@ -22,6 +23,9 @@
-callback call(namespace(), id(), range(), args(), backend_opts()) ->
{ok, machinery:response(_)} | {error, notfound}.
-callback repair(namespace(), id(), range(), args(), backend_opts()) ->
ok | {error, notfound | working}.
-callback get(namespace(), id(), range(), backend_opts()) ->
{ok, machinery:machine(_, _)} | {error, notfound}.
@ -39,6 +43,11 @@ start(Backend, Namespace, Id, Args, Opts) ->
call(Backend, Namespace, Id, Range, Args, Opts) ->
Backend:call(Namespace, Id, Range, Args, Opts).
-spec repair(backend(), namespace(), id(), range(), args(), backend_opts()) ->
ok | {error, notfound | working}.
repair(Backend, Namespace, Id, Range, Args, Opts) ->
Backend:repair(Namespace, Id, Range, Args, Opts).
-spec get(backend(), namespace(), id(), range(), backend_opts()) ->
{ok, machinery:machine(_, _)} | {error, notfound}.
get(Backend, Namespace, Id, Range, Opts) ->

View File

@ -21,6 +21,7 @@
-export([init/4]).
-export([process_timeout/3]).
-export([process_repair/4]).
-export([process_call/4]).
%%
@ -105,6 +106,11 @@ process_call({untag, ID}, Machine, _, _Opts) ->
{{error, IDWas}, #{}}
end.
-spec process_repair({untag, id()}, machine(), undefined, handler_opts()) ->
no_return().
process_repair(_Args, _Machine, _, _Opts) ->
erlang:error({not_implemented, repair}).
%%
get_machine_st(#{history := History}) ->

View File

@ -73,6 +73,7 @@
-export([start/4]).
-export([call/5]).
-export([repair/5]).
-export([get/4]).
%% Woody handler
@ -137,6 +138,24 @@ call(NS, ID, Range, Args, Opts) ->
error({failed, NS, ID})
end.
-spec repair(namespace(), id(), range(), args(_), backend_opts()) ->
ok | {error, notfound | working}.
repair(NS, ID, Range, Args, Opts) ->
Client = get_client(Opts),
Schema = get_schema(Opts),
Descriptor = {NS, ID, Range},
CallArgs = marshal({schema, Schema, {args, repair}}, Args),
case machinery_mg_client:repair(marshal(descriptor, Descriptor), CallArgs, Client) of
{ok, ok} ->
ok;
{exception, #mg_stateproc_MachineNotFound{}} ->
{error, notfound};
{exception, #mg_stateproc_MachineAlreadyWorking{}} ->
{error, working};
{exception, #mg_stateproc_NamespaceNotFound{}} ->
error({namespace_not_found, NS})
end.
-spec get(namespace(), id(), range(), backend_opts()) ->
{ok, machine(_, _)} | {error, notfound}.
get(NS, ID, Range, Opts) ->

View File

@ -10,6 +10,7 @@
-export([start/4]).
-export([call/3]).
-export([repair/3]).
-export([get_machine/2]).
-type woody_client() :: #{
@ -32,16 +33,17 @@ new(WoodyClient = #{url := _, event_handler := _}, WoodyCtx) ->
%%
-type namespace() :: mg_proto_base_thrift:'Namespace'().
-type id() :: mg_proto_base_thrift:'ID'().
-type args() :: mg_proto_state_processing_thrift:'Args'().
-type descriptor() :: mg_proto_state_processing_thrift:'MachineDescriptor'().
-type call_response() :: mg_proto_state_processing_thrift:'CallResponse'().
-type machine() :: mg_proto_state_processing_thrift:'Machine'().
-type namespace_not_found() :: mg_proto_state_processing_thrift:'NamespaceNotFound'().
-type machine_not_found() :: mg_proto_state_processing_thrift:'MachineNotFound'().
-type machine_already_exists() :: mg_proto_state_processing_thrift:'MachineAlreadyExists'().
-type machine_failed() :: mg_proto_state_processing_thrift:'MachineFailed'().
-type namespace() :: mg_proto_base_thrift:'Namespace'().
-type id() :: mg_proto_base_thrift:'ID'().
-type args() :: mg_proto_state_processing_thrift:'Args'().
-type descriptor() :: mg_proto_state_processing_thrift:'MachineDescriptor'().
-type call_response() :: mg_proto_state_processing_thrift:'CallResponse'().
-type machine() :: mg_proto_state_processing_thrift:'Machine'().
-type namespace_not_found() :: mg_proto_state_processing_thrift:'NamespaceNotFound'().
-type machine_not_found() :: mg_proto_state_processing_thrift:'MachineNotFound'().
-type machine_already_exists() :: mg_proto_state_processing_thrift:'MachineAlreadyExists'().
-type machine_already_working() :: mg_proto_state_processing_thrift:'MachineAlreadyWorking'().
-type machine_failed() :: mg_proto_state_processing_thrift:'MachineFailed'().
-spec start(namespace(), id(), args(), client()) ->
{ok, ok} |
@ -55,6 +57,12 @@ start(NS, ID, Args, Client) ->
call(Descriptor, Args, Client) ->
issue_call('Call', [Descriptor, Args], Client).
-spec repair(descriptor(), args(), client()) ->
{ok, ok} |
{exception, namespace_not_found() | machine_not_found() | machine_already_working()}.
repair(Descriptor, Args, Client) ->
issue_call('Repair', [Descriptor, Args], Client).
-spec get_machine(descriptor(), client()) ->
{ok, machine()} |
{exception, namespace_not_found() | machine_not_found()}.