Add repair response processing (#22)

This commit is contained in:
Andrey Fadeev 2020-05-29 14:00:09 +03:00 committed by GitHub
parent 75cffa6659
commit 0a467c8b12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1029 additions and 59 deletions

2
Jenkinsfile vendored
View File

@ -38,7 +38,7 @@ build('machinery', 'docker-host', finalHook) {
}
runStage('dialyze') {
withWsCache("_build/default/rebar3_21.1.1_plt") {
withWsCache("_build/default/rebar3_22.3.1_plt") {
sh 'make wc_dialyze'
}
}

View File

@ -7,7 +7,7 @@ SUBMODULES = $(UTILS_PATH)
SUBTARGETS = $(patsubst %,%/.git,$(SUBMODULES))
SERVICE_NAME := machinery
BUILD_IMAGE_TAG := f3732d29a5e622aabf80542b5138b3631a726adb
BUILD_IMAGE_TAG := 0c638a682f4735a65ef232b81ed872ba494574c3
CALL_ANYWHERE := all submodules compile xref lint dialyze clean distclean

View File

@ -15,7 +15,7 @@ services:
condition: service_healthy
machinegun:
image: dr2.rbkmoney.com/rbkmoney/machinegun:3d89232e20e4f52129d7355b209a9a5a54ad5dbf
image: dr2.rbkmoney.com/rbkmoney/machinegun:9b160a5f39fa54b1a20ca9cc8a9a881cbcc9ed4f
command: /opt/machinegun/bin/machinegun foreground
volumes:
- ./test/machinegun/config.yaml:/opt/machinegun/etc/config.yaml

View File

@ -56,13 +56,3 @@
]},
{plt_apps, all_deps}
]}.
{profiles, [
{test, [
{deps, [
{lager,
"3.6.1"
}
]}
]}
]}.

View File

@ -33,7 +33,7 @@
{<<"ranch">>,{pkg,<<"ranch">>,<<"1.7.1">>},2},
{<<"snowflake">>,
{git,"https://github.com/rbkmoney/snowflake.git",
{ref,"0a598108f6582affe3b4ae550fc5b9f2062e318a"}},
{ref,"c34a962e17539e63a53f721cbf4ddcffeb0032a4"}},
1},
{<<"ssl_verify_fun">>,{pkg,<<"ssl_verify_fun">>,<<"1.1.5">>},2},
{<<"thrift">>,
@ -43,7 +43,7 @@
{<<"unicode_util_compat">>,{pkg,<<"unicode_util_compat">>,<<"0.4.1">>},3},
{<<"woody">>,
{git,"https://github.com/rbkmoney/woody_erlang.git",
{ref,"8b8c0e27796a6fc8bed4f474313e4c3487e10c82"}},
{ref,"ae8e7a9f6fa8a331c522e1e2e32271ef6ee0a98e"}},
0}]}.
[
{pkg_hash,[

View File

@ -81,6 +81,7 @@
%% Internal API
-export([dispatch_signal/4]).
-export([dispatch_call/4]).
-export([dispatch_repair/4]).
%% Behaviour definition
-type seconds() :: non_neg_integer().
@ -141,12 +142,12 @@ call(NS, Ref, Range, Args, Backend) ->
machinery_backend:call(Module, NS, Ref, Range, Args, Opts).
-spec repair(namespace(), ref(), args(_), backend(_)) ->
ok | {error, notfound | working}.
{ok, response(_)} | {error, notfound | working}.
repair(NS, Ref, Args, Backend) ->
repair(NS, Ref, {undefined, undefined, forward}, Args, Backend).
-spec repair(namespace(), ref(), range(), args(_), backend(_)) ->
ok | {error, notfound | working}.
{ok, response(_)} | {error, notfound | working}.
repair(NS, Ref, Range, Args, Backend) ->
{Module, Opts} = machinery_utils:get_backend(Backend),
machinery_backend:repair(Module, NS, Ref, Range, Args, Opts).
@ -169,7 +170,8 @@ get(NS, Ref, Range, Backend) ->
dispatch_signal({init, Args}, Machine, {Handler, HandlerArgs}, Opts) ->
Handler:init(Args, Machine, HandlerArgs, Opts);
dispatch_signal({repair, Args}, Machine, {Handler, HandlerArgs}, Opts) ->
Handler:process_repair(Args, Machine, HandlerArgs, Opts);
{_Response, Result} = Handler:process_repair(Args, Machine, HandlerArgs, Opts),
Result;
dispatch_signal(timeout, Machine, {Handler, HandlerArgs}, Opts) ->
Handler:process_timeout(Machine, HandlerArgs, Opts).
@ -177,3 +179,8 @@ dispatch_signal(timeout, Machine, {Handler, HandlerArgs}, Opts) ->
{response(_), result(E, A)}.
dispatch_call(Args, Machine, {Handler, HandlerArgs}, Opts) ->
Handler:process_call(Args, Machine, HandlerArgs, Opts).
-spec dispatch_repair(args(_), machine(E, A), logic_handler(_), handler_opts(_)) ->
{response(_), result(E, A)}.
dispatch_repair(Args, Machine, {Handler, HandlerArgs}, Opts) ->
Handler:process_repair(Args, Machine, HandlerArgs, Opts).

View File

@ -25,7 +25,7 @@
{ok, machinery:response(_)} | {error, notfound}.
-callback repair(namespace(), id(), range(), args(), backend_opts()) ->
ok | {error, notfound | working}.
{ok, machinery:response(_)} | {error, notfound | working}.
-callback get(namespace(), id(), range(), backend_opts()) ->
{ok, machinery:machine(_, _)} | {error, notfound}.
@ -45,7 +45,7 @@ call(Backend, Namespace, Ref, Range, Args, Opts) ->
Backend:call(Namespace, Ref, Range, Args, Opts).
-spec repair(backend(), namespace(), ref(), range(), args(), backend_opts()) ->
ok | {error, notfound | working}.
{ok, machinery:response(_)} | {error, notfound | working}.
repair(Backend, Namespace, Ref, Range, Args, Opts) ->
Backend:repair(Namespace, Ref, Range, Args, Opts).

View File

@ -132,7 +132,7 @@ call(NS, Ref, Range, Args, Opts) ->
CallArgs = marshal({schema, Schema, {args, call}}, Args),
case machinery_mg_client:call(marshal(descriptor, Descriptor), CallArgs, Client) of
{ok, Response} ->
{ok, unmarshal({schema, Schema, response}, Response)};
{ok, unmarshal({schema, Schema, {response, call}}, Response)};
{exception, #mg_stateproc_MachineNotFound{}} ->
{error, notfound};
{exception, #mg_stateproc_NamespaceNotFound{}} ->
@ -142,21 +142,23 @@ call(NS, Ref, Range, Args, Opts) ->
end.
-spec repair(namespace(), ref(), range(), args(_), backend_opts()) ->
ok | {error, notfound | working}.
{ok, response(_)} | {error, notfound | working}.
repair(NS, Ref, Range, Args, Opts) ->
Client = get_client(Opts),
Schema = get_schema(Opts),
Descriptor = {NS, Ref, Range},
CallArgs = marshal({schema, Schema, {args, repair}}, Args),
case machinery_mg_client:repair(marshal(descriptor, Descriptor), CallArgs, Client) of
{ok, ok} ->
ok;
{ok, Response} ->
{ok, unmarshal({schema, Schema, {response, repair}}, Response)};
{exception, #mg_stateproc_MachineNotFound{}} ->
{error, notfound};
{exception, #mg_stateproc_MachineAlreadyWorking{}} ->
{error, working};
{exception, #mg_stateproc_NamespaceNotFound{}} ->
error({namespace_not_found, NS})
error({namespace_not_found, NS});
{exception, #mg_stateproc_MachineFailed{}} ->
error({failed, NS, Ref})
end.
-spec get(namespace(), ref(), range(), backend_opts()) ->
@ -180,13 +182,12 @@ get(NS, Ref, Range, Opts) ->
('ProcessSignal', woody:args(), woody_context:ctx(), backend_handler_opts()) ->
{ok, mg_proto_state_processing_thrift:'SignalResult'()};
('ProcessCall', woody:args(), woody_context:ctx(), backend_handler_opts()) ->
{ok, mg_proto_state_processing_thrift:'CallResult'()}.
handle_function(
'ProcessSignal',
[#mg_stateproc_SignalArgs{signal = Signal, machine = Machine}],
WoodyCtx,
#{handler := Handler, schema := Schema}
) ->
{ok, mg_proto_state_processing_thrift:'CallResult'()};
('ProcessRepair', woody:args(), woody_context:ctx(), backend_handler_opts()) ->
{ok, mg_proto_state_processing_thrift:'RepairResult'()}.
handle_function('ProcessSignal', FunctionArgs, WoodyCtx, Opts) ->
[#mg_stateproc_SignalArgs{signal = Signal, machine = Machine}] = FunctionArgs,
#{handler := Handler, schema := Schema} = Opts,
Machine1 = unmarshal({machine, Schema}, Machine),
Result = dispatch_signal(
unmarshal({signal, Schema}, Signal),
@ -195,12 +196,9 @@ handle_function(
get_handler_opts(WoodyCtx)
),
{ok, marshal({signal_result, Schema}, handle_result(Result, Machine1))};
handle_function(
'ProcessCall',
[#mg_stateproc_CallArgs{arg = Args, machine = Machine}],
WoodyCtx,
#{handler := Handler, schema := Schema}
) ->
handle_function('ProcessCall', FunctionArgs, WoodyCtx, Opts) ->
[#mg_stateproc_CallArgs{arg = Args, machine = Machine}] = FunctionArgs,
#{handler := Handler, schema := Schema} = Opts,
Machine1 = unmarshal({machine, Schema}, Machine),
{Response, Result} = dispatch_call(
unmarshal({schema, Schema, {args, call}}, Args),
@ -208,7 +206,18 @@ handle_function(
machinery_utils:get_handler(Handler),
get_handler_opts(WoodyCtx)
),
{ok, marshal({call_result, Schema}, {Response, handle_result(Result, Machine1)})}.
{ok, marshal({call_result, Schema}, {Response, handle_result(Result, Machine1)})};
handle_function('ProcessRepair', FunctionArgs, WoodyCtx, Opts) ->
[#mg_stateproc_RepairArgs{arg = Args, machine = Machine}] = FunctionArgs,
#{handler := Handler, schema := Schema} = Opts,
Machine1 = unmarshal({machine, Schema}, Machine),
{Response, Result} = dispatch_repair(
unmarshal({schema, Schema, {args, repair}}, Args),
Machine1,
machinery_utils:get_handler(Handler),
get_handler_opts(WoodyCtx)
),
{ok, marshal({repair_result, Schema}, {Response, handle_result(Result, Machine1)})}.
%% Utils
@ -232,6 +241,9 @@ dispatch_signal(Signal, Machine, Handler, Opts) ->
dispatch_call(Args, Machine, Handler, Opts) ->
machinery:dispatch_call(Args, Machine, Handler, Opts).
dispatch_repair(Args, Machine, Handler, Opts) ->
machinery:dispatch_repair(Args, Machine, Handler, Opts).
handle_result(Result, OrigMachine) ->
Result#{aux_state => set_aux_state(
maps:get(aux_state, Result, undefined),
@ -305,7 +317,14 @@ marshal({signal_result, Schema}, #{} = V) ->
marshal({call_result, Schema}, {Response, #{} = V}) ->
#mg_stateproc_CallResult{
response = marshal({schema, Schema, response}, Response),
response = marshal({schema, Schema, {response, call}}, Response),
change = marshal({state_change, Schema}, V),
action = marshal(action, maps:get(action, V, []))
};
marshal({repair_result, Schema}, {Response, #{} = V}) ->
#mg_stateproc_RepairResult{
response = marshal({schema, Schema, {response, repair}}, Response),
change = marshal({state_change, Schema}, V),
action = marshal(action, maps:get(action, V, []))
};
@ -315,7 +334,7 @@ marshal({state_change, Schema}, #{} = V) ->
#mg_stateproc_MachineStateChange{
events = [
#mg_stateproc_Content{data = Event}
|| Event <- marshal({list, {schema, Schema, event}}, maps:get(events, V, []))
|| Event <- marshal({list, {schema, Schema, {event, Version}}}, maps:get(events, V, []))
],
% TODO
% Provide this to logic handlers as well

View File

@ -38,6 +38,7 @@ new(WoodyClient = #{url := _, event_handler := _}, WoodyCtx) ->
-type args() :: mg_proto_state_processing_thrift:'Args'().
-type descriptor() :: mg_proto_state_processing_thrift:'MachineDescriptor'().
-type call_response() :: mg_proto_state_processing_thrift:'CallResponse'().
-type repair_response() :: mg_proto_state_processing_thrift:'RepairResponse'().
-type machine() :: mg_proto_state_processing_thrift:'Machine'().
-type namespace_not_found() :: mg_proto_state_processing_thrift:'NamespaceNotFound'().
-type machine_not_found() :: mg_proto_state_processing_thrift:'MachineNotFound'().
@ -58,8 +59,8 @@ call(Descriptor, Args, Client) ->
issue_call('Call', [Descriptor, Args], Client).
-spec repair(descriptor(), args(), client()) ->
{ok, ok} |
{exception, namespace_not_found() | machine_not_found() | machine_already_working()}.
{ok, repair_response()} |
{exception, namespace_not_found() | machine_not_found() | machine_already_working() | machine_failed()}.
repair(Descriptor, Args, Client) ->
issue_call('Repair', [Descriptor, Args], Client).

View File

@ -14,7 +14,10 @@
repair |
call
} |
response |
{response,
call |
repair
} |
{event, Version} |
{aux_state, Version}.
-type v(T) ::

View File

@ -53,21 +53,9 @@ start_apps(AppNames) ->
-spec start_app(app_name()) -> {[Started :: app_name()], startup_ctx()}.
start_app(lager = AppName) ->
{start_app_with(AppName, [
{async_threshold, 1},
{async_threshold_window, 0},
{error_logger_hwm, 600},
{suppress_application_start_stop, true},
{suppress_supervisor_start_stop, true},
{handlers, [
{lager_common_test_backend, debug}
]}
]), #{}};
start_app(scoper = AppName) ->
{start_app_with(AppName, [
{storage, scoper_storage_lager}
{storage, scoper_storage_logger}
]), #{}};
start_app(woody = AppName) ->

View File

@ -20,7 +20,7 @@ start() ->
-spec stop(pid()) -> ok.
stop(PID) ->
true = exit(PID, shutdown),
true = exit(PID, kill),
ok.
%%

View File

@ -7,6 +7,9 @@ namespaces:
payproc/tags:
processor:
url: http://machinery:8022/v1/stateproc
general:
processor:
url: http://machinery:8022/v1/stateproc
storage:
type: memory

View File

@ -0,0 +1,234 @@
-module(machinery_call_SUITE).
-behaviour(machinery).
-include_lib("stdlib/include/assert.hrl").
-include_lib("common_test/include/ct.hrl").
%% Common Tests callbacks
-export([all/0]).
-export([groups/0]).
-export([init_per_suite/1]).
-export([end_per_suite/1]).
-export([init_per_group/2]).
-export([end_per_group/2]).
-export([init_per_testcase/2]).
%% Tests
-export([ordinary_call_test/1]).
-export([notfound_call_test/1]).
-export([unknown_namespace_call_test/1]).
-export([ranged_call_test/1]).
-export([failed_call_test/1]).
-export([remove_call_test/1]).
%% Machinery callbacks
-export([init/4]).
-export([process_timeout/3]).
-export([process_repair/4]).
-export([process_call/4]).
%% Internal types
-type config() :: ct_helper:config().
-type test_case_name() :: ct_helper:test_case_name().
-type group_name() :: ct_helper:group_name().
-type test_return() :: _ | no_return().
-spec all() -> [test_case_name() | {group, group_name()}].
all() ->
[
{group, machinery_mg_backend}
].
-spec groups() ->
[{group_name(), list(), test_case_name()}].
groups() ->
[
{machinery_mg_backend, [], [{group, all}]},
{all, [parallel], [
ordinary_call_test,
notfound_call_test,
unknown_namespace_call_test,
ranged_call_test,
failed_call_test,
remove_call_test
]}
].
-spec init_per_suite(config()) -> config().
init_per_suite(C) ->
{StartedApps, _StartupCtx} = ct_helper:start_apps([machinery]),
[{started_apps, StartedApps}| C].
-spec end_per_suite(config()) -> _.
end_per_suite(C) ->
ok = ct_helper:stop_apps(?config(started_apps, C)),
ok.
-spec init_per_group(group_name(), config()) -> config().
init_per_group(machinery_mg_backend = Name, C0) ->
C1 = [{backend, Name}, {group_sup, ct_sup:start()} | C0],
{ok, _Pid} = start_backend(C1),
C1;
init_per_group(_Name, C) ->
C.
-spec end_per_group(group_name(), config()) -> config().
end_per_group(machinery_mg_backend, C) ->
ok = ct_sup:stop(?config(group_sup, C)),
C;
end_per_group(_Name, C) ->
C.
-spec init_per_testcase(test_case_name(), config()) -> config().
init_per_testcase(TestCaseName, C) ->
ct_helper:makeup_cfg([ct_helper:test_case_name(TestCaseName), ct_helper:woody_ctx()], C).
%% Tests
-spec ordinary_call_test(config()) -> test_return().
ordinary_call_test(C) ->
ID = unique(),
?assertEqual(ok, start(ID, init_numbers, C)),
?assertEqual({ok, done}, call(ID, do_something, C)).
-spec notfound_call_test(config()) -> test_return().
notfound_call_test(C) ->
ID = unique(),
?assertEqual({error, notfound}, call(ID, do_something, C)).
-spec unknown_namespace_call_test(config()) -> test_return().
unknown_namespace_call_test(C) ->
ID = unique(),
?assertError({namespace_not_found, mmm}, machinery:call(mmm, ID, do_something, get_backend(C))).
-spec ranged_call_test(config()) -> test_return().
ranged_call_test(C) ->
ID = unique(),
?assertEqual(ok, start(ID, init_numbers, C)),
?assertEqual({ok, lists:seq(9, 1, -1)}, call(ID, get_events, {10, 9, backward}, C)),
?assertEqual({ok, lists:seq(3, 11)}, call(ID, get_events, {2, 9, forward}, C)).
-spec failed_call_test(config()) -> test_return().
failed_call_test(C) ->
ID = unique(),
?assertEqual(ok, start(ID, init_numbers, C)),
?assertError({failed, general, ID}, call(ID, fail, C)).
-spec remove_call_test(config()) -> test_return().
remove_call_test(C) ->
ID = unique(),
?assertEqual(ok, start(ID, init_numbers, C)),
?assertEqual({ok, removed}, call(ID, remove, C)),
?assertEqual({error, notfound}, call(ID, do_something, C)).
%% Machinery handler
-type event() :: any().
-type aux_st() :: any().
-type machine() :: machinery:machine(event(), aux_st()).
-type handler_opts() :: machinery:handler_opts(_).
-type result() :: machinery:result(event(), aux_st()).
-type response() :: machinery:response(_).
-spec init(_Args, machine(), undefined, handler_opts()) ->
result().
init(init_numbers, _Machine, _, _Opts) ->
#{
events => lists:seq(1, 100)
}.
-spec process_timeout(machine(), undefined, handler_opts()) ->
result().
process_timeout(#{}, _, _Opts) ->
erlang:error({not_implemented, process_timeout}).
-spec process_call(_Args, machine(), undefined, handler_opts()) ->
{response(), result()}.
process_call(do_something, _Machine, _, _Opts) ->
{done, #{
events => [1, yet_another_event],
aux_state => <<>>
}};
process_call(get_events, #{history := History}, _, _Opts) ->
Bodies = lists:map(fun({_ID, _CreatedAt, Body}) -> Body end, History),
{Bodies, #{}};
process_call(remove, _Machine, _, _Opts) ->
{removed, #{action => [remove]}};
process_call(fail, _Machine, _, _Opts) ->
erlang:error(fail).
-spec process_repair(_Args, machine(), undefined, handler_opts()) ->
no_return().
process_repair(_Args, _Machine, _, _Opts) ->
erlang:error({not_implemented, process_repair}).
%% Helpers
start(ID, Args, C) ->
machinery:start(namespace(), ID, Args, get_backend(C)).
call(Ref, Args, C) ->
machinery:call(namespace(), Ref, Args, get_backend(C)).
call(Ref, Args, Range, C) ->
machinery:call(namespace(), Ref, Range, Args, get_backend(C)).
namespace() ->
general.
unique() ->
genlib:unique().
start_backend(C) ->
{ok, _PID} = supervisor:start_child(
?config(group_sup, C),
child_spec(C)
).
-spec child_spec(config()) ->
supervisor:child_spec().
child_spec(C) ->
child_spec(?config(backend, C), C).
-spec child_spec(atom(), config()) ->
supervisor:child_spec().
child_spec(machinery_mg_backend, _C) ->
BackendConfig = #{
path => <<"/v1/stateproc">>,
backend_config => #{
schema => machinery_mg_schema_generic
}
},
Handler = {?MODULE, BackendConfig},
Routes = machinery_mg_backend:get_routes(
[Handler],
#{event_handler => woody_event_handler_default}
),
ServerConfig = #{
ip => {0, 0, 0, 0},
port => 8022
},
machinery_utils:woody_child_spec(machinery_mg_backend, Routes, ServerConfig).
-spec get_backend(config()) ->
machinery_mg_backend:backend().
get_backend(C) ->
get_backend(?config(backend, C), C).
-spec get_backend(atom(), config()) ->
machinery_mg_backend:backend().
get_backend(machinery_mg_backend, C) ->
machinery_mg_backend:new(
ct_helper:get_woody_ctx(C),
#{
client => #{
url => <<"http://machinegun:8022/v1/automaton">>,
event_handler => woody_event_handler_default
},
schema => machinery_mg_schema_generic
}
).

View File

@ -50,7 +50,7 @@ init_per_suite(C) ->
% _ = dbg:tracer(),
% _ = dbg:p(all, c),
% _ = dbg:tpl({'woody_client', '_', '_'}, x),
{StartedApps, _StartupCtx} = start_apps([lager, machinery]),
{StartedApps, _StartupCtx} = start_apps([machinery]),
SuiteSup = ct_sup:start(),
start_woody_server([
{started_apps , StartedApps},

View File

@ -0,0 +1,250 @@
-module(machinery_repair_SUITE).
-behaviour(machinery).
-include_lib("stdlib/include/assert.hrl").
-include_lib("common_test/include/ct.hrl").
%% Common Tests callbacks
-export([all/0]).
-export([groups/0]).
-export([init_per_suite/1]).
-export([end_per_suite/1]).
-export([init_per_group/2]).
-export([end_per_group/2]).
-export([init_per_testcase/2]).
%% Tests
-export([simple_repair_test/1]).
-export([complex_repair_test/1]).
-export([ranged_repair_test/1]).
-export([notfound_repair_test/1]).
-export([failed_repair_test/1]).
-export([unknown_namespace_repair_test/1]).
-export([working_repair_test/1]).
%% Machinery callbacks
-export([init/4]).
-export([process_timeout/3]).
-export([process_repair/4]).
-export([process_call/4]).
%% Internal types
-type config() :: ct_helper:config().
-type test_case_name() :: ct_helper:test_case_name().
-type group_name() :: ct_helper:group_name().
-type test_return() :: _ | no_return().
-spec all() -> [test_case_name() | {group, group_name()}].
all() ->
[
{group, machinery_mg_backend}
].
-spec groups() ->
[{group_name(), list(), test_case_name()}].
groups() ->
[
{machinery_mg_backend, [], [{group, all}]},
{all, [parallel], [
simple_repair_test,
complex_repair_test,
ranged_repair_test,
notfound_repair_test,
failed_repair_test,
unknown_namespace_repair_test,
working_repair_test
]}
].
-spec init_per_suite(config()) -> config().
init_per_suite(C) ->
{StartedApps, _StartupCtx} = ct_helper:start_apps([machinery]),
[{started_apps, StartedApps}| C].
-spec end_per_suite(config()) -> _.
end_per_suite(C) ->
ok = ct_helper:stop_apps(?config(started_apps, C)),
ok.
-spec init_per_group(group_name(), config()) -> config().
init_per_group(machinery_mg_backend = Name, C0) ->
C1 = [{backend, Name}, {group_sup, ct_sup:start()} | C0],
{ok, _Pid} = start_backend(C1),
C1;
init_per_group(_Name, C) ->
C.
-spec end_per_group(group_name(), config()) -> config().
end_per_group(machinery_mg_backend, C) ->
ok = ct_sup:stop(?config(group_sup, C)),
C;
end_per_group(_Name, C) ->
C.
-spec init_per_testcase(test_case_name(), config()) -> config().
init_per_testcase(TestCaseName, C) ->
ct_helper:makeup_cfg([ct_helper:test_case_name(TestCaseName), ct_helper:woody_ctx()], C).
%% Tests
-spec simple_repair_test(config()) -> test_return().
simple_repair_test(C) ->
ID = unique(),
?assertEqual(ok, start(ID, init_numbers, C)),
?assertError({failed, general, ID}, call(ID, fail, C)),
?assertEqual({ok, <<"ok">>}, repair(ID, simple, C)),
?assertEqual({ok, lists:seq(1, 100)}, call(ID, get_events, C)).
-spec complex_repair_test(config()) -> test_return().
complex_repair_test(C) ->
ID = unique(),
?assertEqual(ok, start(ID, init_numbers, C)),
?assertError({failed, general, ID}, call(ID, fail, C)),
?assertEqual({ok, <<"ok">>}, repair(ID, {add_events, [repair_event]}, C)),
?assertEqual({ok, lists:seq(1, 100) ++ [repair_event]}, call(ID, get_events, C)).
-spec ranged_repair_test(config()) -> test_return().
ranged_repair_test(C) ->
ID = unique(),
?assertEqual(ok, start(ID, init_numbers, C)),
?assertError({failed, general, ID}, call(ID, fail, C)),
?assertEqual({ok, <<"ok">>}, repair(ID, count_events, {20, 10, forward}, C)),
?assertEqual({ok, lists:seq(1, 100) ++ [{count_events, 10}]}, call(ID, get_events, C)).
-spec notfound_repair_test(config()) -> test_return().
notfound_repair_test(C) ->
ID = unique(),
?assertEqual({error, notfound}, repair(ID, simple, C)).
-spec failed_repair_test(config()) -> test_return().
failed_repair_test(C) ->
ID = unique(),
?assertEqual(ok, start(ID, init_numbers, C)),
?assertError({failed, general, ID}, call(ID, fail, C)),
?assertError({failed, general, ID}, repair(ID, fail, C)),
?assertError({failed, general, ID}, call(ID, get_events, C)).
-spec unknown_namespace_repair_test(config()) -> test_return().
unknown_namespace_repair_test(C) ->
ID = unique(),
?assertError({namespace_not_found, mmm}, machinery:repair(mmm, ID, simple, get_backend(C))).
-spec working_repair_test(config()) -> test_return().
working_repair_test(C) ->
ID = unique(),
?assertEqual(ok, start(ID, init_numbers, C)),
?assertEqual({error, working}, repair(ID, simple, C)).
%% Machinery handler
-type event() :: any().
-type aux_st() :: any().
-type machine() :: machinery:machine(event(), aux_st()).
-type handler_opts() :: machinery:handler_opts(_).
-type result() :: machinery:result(event(), aux_st()).
-type response() :: machinery:response(_).
-spec init(_Args, machine(), undefined, handler_opts()) ->
result().
init(init_numbers, _Machine, _, _Opts) ->
#{
events => lists:seq(1, 100)
}.
-spec process_timeout(machine(), undefined, handler_opts()) ->
result().
process_timeout(#{}, _, _Opts) ->
erlang:error({not_implemented, process_timeout}).
-spec process_call(_Args, machine(), undefined, handler_opts()) ->
{response(), result()}.
process_call(get_events, #{history := History}, _, _Opts) ->
Bodies = lists:map(fun({_ID, _CreatedAt, Body}) -> Body end, History),
{Bodies, #{}};
process_call(fail, _Machine, _, _Opts) ->
erlang:error(fail).
-spec process_repair(_Args, machine(), undefined, handler_opts()) ->
no_return().
process_repair(simple, _Machine, _, _Opts) ->
{done, #{}};
process_repair({add_events, Events}, _Machine, _, _Opts) ->
{done, #{events => Events}};
process_repair(count_events, #{history := History}, _, _Opts) ->
{done, #{events => [{count_events, erlang:length(History)}]}};
process_repair(fail, _Machine, _, _Opts) ->
erlang:error(fail).
%% Helpers
start(ID, Args, C) ->
machinery:start(namespace(), ID, Args, get_backend(C)).
call(Ref, Args, C) ->
machinery:call(namespace(), Ref, Args, get_backend(C)).
repair(Ref, Args, C) ->
machinery:repair(namespace(), Ref, Args, get_backend(C)).
repair(Ref, Args, Range, C) ->
machinery:repair(namespace(), Ref, Range, Args, get_backend(C)).
namespace() ->
general.
unique() ->
genlib:unique().
start_backend(C) ->
{ok, _PID} = supervisor:start_child(
?config(group_sup, C),
child_spec(C)
).
-spec child_spec(config()) ->
supervisor:child_spec().
child_spec(C) ->
child_spec(?config(backend, C), C).
-spec child_spec(atom(), config()) ->
supervisor:child_spec().
child_spec(machinery_mg_backend, _C) ->
BackendConfig = #{
path => <<"/v1/stateproc">>,
backend_config => #{
schema => machinery_mg_schema_generic
}
},
Handler = {?MODULE, BackendConfig},
Routes = machinery_mg_backend:get_routes(
[Handler],
#{event_handler => woody_event_handler_default}
),
ServerConfig = #{
ip => {0, 0, 0, 0},
port => 8022
},
machinery_utils:woody_child_spec(machinery_mg_backend, Routes, ServerConfig).
-spec get_backend(config()) ->
machinery_mg_backend:backend().
get_backend(C) ->
get_backend(?config(backend, C), C).
-spec get_backend(atom(), config()) ->
machinery_mg_backend:backend().
get_backend(machinery_mg_backend, C) ->
machinery_mg_backend:new(
ct_helper:get_woody_ctx(C),
#{
client => #{
url => <<"http://machinegun:8022/v1/automaton">>,
event_handler => woody_event_handler_default
},
schema => machinery_mg_schema_generic
}
).

View File

@ -0,0 +1,205 @@
-module(machinery_start_SUITE).
-behaviour(machinery).
-include_lib("stdlib/include/assert.hrl").
-include_lib("common_test/include/ct.hrl").
%% Common Tests callbacks
-export([all/0]).
-export([groups/0]).
-export([init_per_suite/1]).
-export([end_per_suite/1]).
-export([init_per_group/2]).
-export([end_per_group/2]).
-export([init_per_testcase/2]).
%% Tests
-export([ordinary_start_test/1]).
-export([exists_start_test/1]).
-export([unknown_namespace_start_test/1]).
-export([failed_start_test/1]).
%% Machinery callbacks
-export([init/4]).
-export([process_timeout/3]).
-export([process_repair/4]).
-export([process_call/4]).
%% Internal types
-type config() :: ct_helper:config().
-type test_case_name() :: ct_helper:test_case_name().
-type group_name() :: ct_helper:group_name().
-type test_return() :: _ | no_return().
-spec all() -> [test_case_name() | {group, group_name()}].
all() ->
[
{group, machinery_mg_backend}
].
-spec groups() ->
[{group_name(), list(), test_case_name()}].
groups() ->
[
{machinery_mg_backend, [], [{group, all}]},
{all, [parallel], [
ordinary_start_test,
exists_start_test,
unknown_namespace_start_test,
failed_start_test
]}
].
-spec init_per_suite(config()) -> config().
init_per_suite(C) ->
{StartedApps, _StartupCtx} = ct_helper:start_apps([machinery]),
[{started_apps, StartedApps}| C].
-spec end_per_suite(config()) -> _.
end_per_suite(C) ->
ok = ct_helper:stop_apps(?config(started_apps, C)),
ok.
-spec init_per_group(group_name(), config()) -> config().
init_per_group(machinery_mg_backend = Name, C0) ->
C1 = [{backend, Name}, {group_sup, ct_sup:start()} | C0],
{ok, _Pid} = start_backend(C1),
C1;
init_per_group(_Name, C) ->
C.
-spec end_per_group(group_name(), config()) -> config().
end_per_group(machinery_mg_backend, C) ->
ok = ct_sup:stop(?config(group_sup, C)),
C;
end_per_group(_Name, C) ->
C.
-spec init_per_testcase(test_case_name(), config()) -> config().
init_per_testcase(TestCaseName, C) ->
ct_helper:makeup_cfg([ct_helper:test_case_name(TestCaseName), ct_helper:woody_ctx()], C).
%% Tests
-spec ordinary_start_test(config()) -> test_return().
ordinary_start_test(C) ->
ID = unique(),
?assertEqual(ok, start(ID, init_something, C)).
-spec exists_start_test(config()) -> test_return().
exists_start_test(C) ->
ID = unique(),
?assertEqual(ok, start(ID, init_something, C)),
?assertEqual({error, exists}, start(ID, init_something, C)).
-spec unknown_namespace_start_test(config()) -> test_return().
unknown_namespace_start_test(C) ->
ID = unique(),
?assertError({namespace_not_found, mmm}, machinery:start(mmm, ID, init_something, get_backend(C))).
-spec failed_start_test(config()) -> test_return().
failed_start_test(C) ->
ID = unique(),
?assertError({failed, general, ID}, start(ID, fail, C)),
?assertError({failed, general, ID}, start(ID, fail, C)),
?assertEqual(ok, start(ID, init_something, C)),
?assertEqual({error, exists}, start(ID, fail, C)).
%% Machinery handler
-type event() :: any().
-type aux_st() :: any().
-type machine() :: machinery:machine(event(), aux_st()).
-type handler_opts() :: machinery:handler_opts(_).
-type result() :: machinery:result(event(), aux_st()).
-type response() :: machinery:response(_).
-spec init(_Args, machine(), undefined, handler_opts()) ->
result().
init(init_something, _Machine, _, _Opts) ->
#{
events => [init_event],
aux_state => #{some => <<"complex">>, aux_state => 1}
};
init(fail, _Machine, _, _Opts) ->
erlang:error(fail).
-spec process_timeout(machine(), undefined, handler_opts()) ->
result().
process_timeout(#{}, _, _Opts) ->
#{}.
-spec process_call(_Args, machine(), undefined, handler_opts()) ->
{response(), result()}.
process_call(_Args, _Machine, _, _Opts) ->
erlang:error({not_implemented, process_call}).
-spec process_repair(_Args, machine(), undefined, handler_opts()) ->
no_return().
process_repair(_Args, _Machine, _, _Opts) ->
erlang:error({not_implemented, process_repair}).
%% Helpers
start(ID, Args, C) ->
machinery:start(namespace(), ID, Args, get_backend(C)).
namespace() ->
general.
unique() ->
genlib:unique().
start_backend(C) ->
{ok, _PID} = supervisor:start_child(
?config(group_sup, C),
child_spec(C)
).
-spec child_spec(config()) ->
supervisor:child_spec().
child_spec(C) ->
child_spec(?config(backend, C), C).
-spec child_spec(atom(), config()) ->
supervisor:child_spec().
child_spec(machinery_mg_backend, _C) ->
BackendConfig = #{
path => <<"/v1/stateproc">>,
backend_config => #{
schema => machinery_mg_schema_generic
}
},
Handler = {?MODULE, BackendConfig},
Routes = machinery_mg_backend:get_routes(
[Handler],
#{event_handler => woody_event_handler_default}
),
ServerConfig = #{
ip => {0, 0, 0, 0},
port => 8022
},
machinery_utils:woody_child_spec(machinery_mg_backend, Routes, ServerConfig).
-spec get_backend(config()) ->
machinery_mg_backend:backend().
get_backend(C) ->
get_backend(?config(backend, C), C).
-spec get_backend(atom(), config()) ->
machinery_mg_backend:backend().
get_backend(machinery_mg_backend, C) ->
machinery_mg_backend:new(
ct_helper:get_woody_ctx(C),
#{
client => #{
url => <<"http://machinegun:8022/v1/automaton">>,
event_handler => woody_event_handler_default
},
schema => machinery_mg_schema_generic
}
).

View File

@ -0,0 +1,270 @@
-module(machinery_timeout_SUITE).
-behaviour(machinery).
-include_lib("stdlib/include/assert.hrl").
-include_lib("common_test/include/ct.hrl").
%% Common Tests callbacks
-export([all/0]).
-export([groups/0]).
-export([init_per_suite/1]).
-export([end_per_suite/1]).
-export([init_per_group/2]).
-export([end_per_group/2]).
-export([init_per_testcase/2]).
%% Tests
-export([start_with_timer_test/1]).
-export([start_with_continue_test/1]).
-export([start_with_ranged_timer_test/1]).
-export([call_with_timer_test/1]).
-export([call_with_continue_test/1]).
-export([call_with_ranged_timer_test/1]).
%% Machinery callbacks
-export([init/4]).
-export([process_timeout/3]).
-export([process_repair/4]).
-export([process_call/4]).
%% Internal types
-type config() :: ct_helper:config().
-type test_case_name() :: ct_helper:test_case_name().
-type group_name() :: ct_helper:group_name().
-type test_return() :: _ | no_return().
-spec all() -> [test_case_name() | {group, group_name()}].
all() ->
[
{group, machinery_mg_backend}
].
-spec groups() ->
[{group_name(), list(), test_case_name()}].
groups() ->
[
{machinery_mg_backend, [], [{group, all}]},
{all, [parallel], [
start_with_timer_test,
start_with_continue_test,
start_with_ranged_timer_test,
call_with_timer_test,
call_with_continue_test,
call_with_ranged_timer_test
]}
].
-spec init_per_suite(config()) -> config().
init_per_suite(C) ->
{StartedApps, _StartupCtx} = ct_helper:start_apps([machinery]),
[{started_apps, StartedApps}| C].
-spec end_per_suite(config()) -> _.
end_per_suite(C) ->
ok = ct_helper:stop_apps(?config(started_apps, C)),
ok.
-spec init_per_group(group_name(), config()) -> config().
init_per_group(machinery_mg_backend = Name, C0) ->
C1 = [{backend, Name}, {group_sup, ct_sup:start()} | C0],
{ok, _Pid} = start_backend(C1),
C1;
init_per_group(_Name, C) ->
C.
-spec end_per_group(group_name(), config()) -> config().
end_per_group(machinery_mg_backend, C) ->
ok = ct_sup:stop(?config(group_sup, C)),
C;
end_per_group(_Name, C) ->
C.
-spec init_per_testcase(test_case_name(), config()) -> config().
init_per_testcase(TestCaseName, C) ->
ct_helper:makeup_cfg([ct_helper:test_case_name(TestCaseName), ct_helper:woody_ctx()], C).
%% Tests
-spec start_with_timer_test(config()) -> test_return().
start_with_timer_test(C) ->
ID = unique(),
?assertEqual(ok, start(ID, init_timer, C)),
timer:sleep(timer:seconds(5)),
Expected = lists:seq(1, 10),
?assertMatch({ok, #{aux_state := Expected}}, get(ID, C)).
-spec start_with_continue_test(config()) -> test_return().
start_with_continue_test(C) ->
ID = unique(),
?assertEqual(ok, start(ID, init_continue, C)),
timer:sleep(timer:seconds(5)),
Expected = lists:seq(1, 10),
?assertMatch({ok, #{aux_state := Expected}}, get(ID, C)).
-spec start_with_ranged_timer_test(config()) -> test_return().
start_with_ranged_timer_test(C) ->
ID = unique(),
?assertEqual(ok, start(ID, init_timer_with_range, C)),
timer:sleep(timer:seconds(5)),
Expected = lists:seq(2, 3),
?assertMatch({ok, #{aux_state := Expected}}, get(ID, C)).
-spec call_with_timer_test(config()) -> test_return().
call_with_timer_test(C) ->
ID = unique(),
?assertEqual(ok, start(ID, nop, C)),
?assertEqual({ok, done}, call(ID, timer, C)),
timer:sleep(timer:seconds(5)),
Expected = lists:seq(1, 10),
?assertMatch({ok, #{aux_state := Expected}}, get(ID, C)).
-spec call_with_continue_test(config()) -> test_return().
call_with_continue_test(C) ->
ID = unique(),
?assertEqual(ok, start(ID, nop, C)),
?assertEqual({ok, done}, call(ID, continue, C)),
timer:sleep(timer:seconds(5)),
Expected = lists:seq(1, 10),
?assertMatch({ok, #{aux_state := Expected}}, get(ID, C)).
-spec call_with_ranged_timer_test(config()) -> test_return().
call_with_ranged_timer_test(C) ->
ID = unique(),
?assertEqual(ok, start(ID, nop, C)),
?assertEqual({ok, done}, call(ID, timer_with_range, C)),
timer:sleep(timer:seconds(5)),
Expected = lists:seq(2, 3),
?assertMatch({ok, #{aux_state := Expected}}, get(ID, C)).
%% Machinery handler
-type event() :: any().
-type aux_st() :: any().
-type machine() :: machinery:machine(event(), aux_st()).
-type handler_opts() :: machinery:handler_opts(_).
-type result() :: machinery:result(event(), aux_st()).
-type response() :: machinery:response(_).
-spec init(_Args, machine(), undefined, handler_opts()) ->
result().
init(nop, _Machine, _, _Opts) ->
#{};
init(init_timer, _Machine, _, _Opts) ->
#{
events => lists:seq(1, 10),
action => {set_timer, {timeout, 0}}
};
init(init_continue, _Machine, _, _Opts) ->
#{
events => lists:seq(1, 10),
action => continue
};
init(init_timer_with_range, _Machine, _, _Opts) ->
#{
events => lists:seq(1, 10),
action => {set_timer, {deadline, {{{1990, 01, 01}, {0, 0, 0}}, 0}}, {1, 2, forward}, 15}
}.
-spec process_timeout(machine(), undefined, handler_opts()) ->
result().
process_timeout(#{history := History}, _, _Opts) ->
Bodies = lists:map(fun({_ID, _CreatedAt, Body}) -> Body end, History),
#{
events => [timer_fired],
action => unset_timer, % why not
aux_state => Bodies
}.
-spec process_call(_Args, machine(), undefined, handler_opts()) ->
{response(), result()}.
process_call(timer, _Machine, _, _Opts) ->
{done, #{
events => lists:seq(1, 10),
action => {set_timer, {timeout, 0}}
}};
process_call(continue, _Machine, _, _Opts) ->
{done, #{
events => lists:seq(1, 10),
action => continue
}};
process_call(timer_with_range, _Machine, _, _Opts) ->
{done, #{
events => lists:seq(1, 10),
action => {set_timer, {deadline, {{{1990, 01, 01}, {0, 0, 0}}, 0}}, {1, 2, forward}, 15}
}}.
-spec process_repair(_Args, machine(), undefined, handler_opts()) ->
no_return().
process_repair(_Args, _Machine, _, _Opts) ->
erlang:error({not_implemented, process_repair}).
%% Helpers
start(ID, Args, C) ->
machinery:start(namespace(), ID, Args, get_backend(C)).
call(Ref, Args, C) ->
machinery:call(namespace(), Ref, Args, get_backend(C)).
get(Ref, C) ->
machinery:get(namespace(), Ref, get_backend(C)).
namespace() ->
general.
unique() ->
genlib:unique().
start_backend(C) ->
{ok, _PID} = supervisor:start_child(
?config(group_sup, C),
child_spec(C)
).
-spec child_spec(config()) ->
supervisor:child_spec().
child_spec(C) ->
child_spec(?config(backend, C), C).
-spec child_spec(atom(), config()) ->
supervisor:child_spec().
child_spec(machinery_mg_backend, _C) ->
BackendConfig = #{
path => <<"/v1/stateproc">>,
backend_config => #{
schema => machinery_mg_schema_generic
}
},
Handler = {?MODULE, BackendConfig},
Routes = machinery_mg_backend:get_routes(
[Handler],
#{event_handler => woody_event_handler_default}
),
ServerConfig = #{
ip => {0, 0, 0, 0},
port => 8022
},
machinery_utils:woody_child_spec(machinery_mg_backend, Routes, ServerConfig).
-spec get_backend(config()) ->
machinery_mg_backend:backend().
get_backend(C) ->
get_backend(?config(backend, C), C).
-spec get_backend(atom(), config()) ->
machinery_mg_backend:backend().
get_backend(machinery_mg_backend, C) ->
machinery_mg_backend:new(
ct_helper:get_woody_ctx(C),
#{
client => #{
url => <<"http://machinegun:8022/v1/automaton">>,
event_handler => woody_event_handler_default
},
schema => machinery_mg_schema_generic
}
).