TD-333: Support for notifications (#10)

This commit is contained in:
Alexey S 2022-08-24 17:07:39 +03:00 committed by GitHub
parent 4c0382f6cb
commit dc899e245b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 331 additions and 8 deletions

View File

@ -17,7 +17,7 @@ services:
command: /sbin/init
machinegun:
image: ghcr.io/valitydev/machinegun:sha-7f0a21a
image: ghcr.io/valitydev/machinegun:sha-00fe6d6
command: /opt/machinegun/bin/machinegun foreground
volumes:
- ./test/machinegun/config.yaml:/opt/machinegun/etc/config.yaml

View File

@ -28,7 +28,7 @@
{deps, [
{genlib, {git, "https://github.com/valitydev/genlib.git", {branch, "master"}}},
{woody, {git, "https://github.com/valitydev/woody_erlang.git", {branch, "master"}}},
{mg_proto, {git, "https://github.com/valitydev/machinegun-proto.git", {branch, "master"}}}
{mg_proto, {git, "https://github.com/valitydev/machinegun-proto", {branch, "master"}}}
]}.
{xref_checks, [

View File

@ -12,8 +12,8 @@
{<<"idna">>,{pkg,<<"idna">>,<<"6.1.1">>},2},
{<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},2},
{<<"mg_proto">>,
{git,"https://github.com/valitydev/machinegun-proto.git",
{ref,"a411c7d5d779389c70d2594eb4a28a916dce1721"}},
{git,"https://github.com/valitydev/machinegun-proto",
{ref,"96f7f11b184c29d8b7e83cd7646f3f2c13662bda"}},
0},
{<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.2.0">>},2},
{<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.3.1">>},2},

View File

@ -29,7 +29,7 @@
-type limit() :: undefined | non_neg_integer().
-type direction() :: forward | backward.
-type range() :: {event_cursor(), limit(), direction()}.
-type signal(T) :: {init, args(T)} | timeout.
-type signal(T) :: {init, args(T)} | timeout | {notification, args(T)}.
-type machine(E, A) :: #{
namespace := namespace(),
id := id(),
@ -82,6 +82,8 @@
-export([repair/5]).
-export([get/3]).
-export([get/4]).
-export([notify/4]).
-export([notify/5]).
%% Internal API
-export([dispatch_signal/4]).
@ -123,6 +125,8 @@
-callback process_call(args(_), machine(E, A), handler_args(_), handler_opts(_)) -> {response(_), result(E, A)}.
-callback process_notification(args(_), machine(E, A), handler_args(_), handler_opts(_)) -> result(E, A).
%% API
-spec start(namespace(), id(), args(_), backend(_)) -> ok | {error, exists}.
@ -159,13 +163,24 @@ get(NS, ID, Range, Backend) ->
{Module, Opts} = machinery_utils:get_backend(Backend),
machinery_backend:get(Module, NS, ID, Range, Opts).
-spec notify(namespace(), id(), args(_), backend(_)) -> ok | {error, notfound} | no_return().
notify(NS, ID, Args, Backend) ->
notify(NS, ID, {undefined, undefined, forward}, Args, Backend).
-spec notify(namespace(), id(), range(), args(_), backend(_)) -> ok | {error, notfound} | no_return().
notify(NS, ID, Range, Args, Backend) ->
{Module, Opts} = machinery_utils:get_backend(Backend),
machinery_backend:notify(Module, NS, ID, Range, Args, Opts).
%% Internal API
-spec dispatch_signal(signal(_), machine(E, A), logic_handler(_), handler_opts(_)) -> result(E, A).
dispatch_signal({init, Args}, Machine, {Handler, HandlerArgs}, Opts) ->
Handler:init(Args, Machine, HandlerArgs, Opts);
dispatch_signal(timeout, Machine, {Handler, HandlerArgs}, Opts) ->
Handler:process_timeout(Machine, HandlerArgs, Opts).
Handler:process_timeout(Machine, HandlerArgs, Opts);
dispatch_signal({notification, Args}, Machine, {Handler, HandlerArgs}, Opts) ->
Handler:process_notification(Args, Machine, HandlerArgs, Opts).
-spec dispatch_call(args(_), machine(E, A), logic_handler(_), handler_opts(_)) -> {response(_), result(E, A)}.
dispatch_call(Args, Machine, {Handler, HandlerArgs}, Opts) ->

View File

@ -8,6 +8,7 @@
-export([call/6]).
-export([repair/6]).
-export([get/5]).
-export([notify/6]).
%% Behaviour definition
@ -26,6 +27,8 @@
-callback get(namespace(), id(), range(), backend_opts()) -> {ok, machinery:machine(_, _)} | {error, notfound}.
-callback notify(namespace(), id(), range(), args(), backend_opts()) -> ok | {error, notfound} | no_return().
%% API
-type backend() :: module().
@ -47,3 +50,7 @@ repair(Backend, Namespace, Id, Range, Args, Opts) ->
-spec get(backend(), namespace(), id(), range(), backend_opts()) -> {ok, machinery:machine(_, _)} | {error, notfound}.
get(Backend, Namespace, Id, Range, Opts) ->
Backend:get(Namespace, Id, Range, Opts).
-spec notify(backend(), namespace(), id(), range(), args(), backend_opts()) -> ok | {error, notfound} | no_return().
notify(Backend, Namespace, Id, Range, Args, Opts) ->
Backend:notify(Namespace, Id, Range, Args, Opts).

View File

@ -25,6 +25,7 @@
-export([process_timeout/3]).
-export([process_repair/4]).
-export([process_call/4]).
-export([process_notification/4]).
%%
@ -124,10 +125,14 @@ process_call({untag, ID}, Machine, _, _Opts) ->
{{error, IDWas}, #{}}
end.
-spec process_repair({untag, id()}, machine(), undefined, handler_opts()) -> no_return().
-spec process_repair(machinery:args(_), machine(), undefined, handler_opts()) -> no_return().
process_repair(_Args, _Machine, _, _Opts) ->
erlang:error({not_implemented, repair}).
-spec process_notification(machinery:args(_), machine(), undefined, handler_opts()) -> no_return().
process_notification(_Args, _Machine, _, _Opts) ->
erlang:error({not_implemented, process_notification}).
%%
get_machine_st(#{history := History}) ->

View File

@ -81,6 +81,7 @@
-export([call/5]).
-export([repair/5]).
-export([get/4]).
-export([notify/5]).
%% Woody handler
-behaviour(woody_server_thrift_handler).
@ -185,6 +186,23 @@ get(NS, Id, Range, Opts) ->
error({namespace_not_found, NS})
end.
-spec notify(namespace(), id(), range(), args(_), backend_opts()) -> ok | {error, notfound} | no_return().
notify(NS, Id, Range, Args, Opts) ->
Client = get_client(Opts),
Schema = get_schema(Opts),
SContext0 = build_schema_context(NS, Id),
Descriptor = {NS, Id, Range},
{NotificationArgs, _SContext1} = marshal({schema, Schema, {args, notification}, SContext0}, Args),
case machinery_mg_client:notify(marshal(descriptor, Descriptor), NotificationArgs, Client) of
{ok, _Response0} ->
%% Response contains the notification id but it's not like we can do anything with that information
ok;
{exception, #mg_stateproc_MachineNotFound{}} ->
{error, notfound};
{exception, #mg_stateproc_NamespaceNotFound{}} ->
error({namespace_not_found, NS})
end.
%% Woody handler
-spec handle_function
@ -424,6 +442,9 @@ unmarshal(
unmarshal({signal, Schema, Context0}, {init, #mg_stateproc_InitSignal{arg = Args0}}) ->
{Args1, Context1} = unmarshal({schema, Schema, {args, init}, Context0}, Args0),
{{init, Args1}, Context1};
unmarshal({signal, Schema, Context0}, {notification, #mg_stateproc_NotificationSignal{arg = Args0}}) ->
{Args1, Context1} = unmarshal({schema, Schema, {args, notification}, Context0}, Args0),
{{notification, Args1}, Context1};
unmarshal({signal, _Schema, Context}, {timeout, #mg_stateproc_TimeoutSignal{}}) ->
{timeout, Context};
unmarshal({list, T}, V) when is_list(V) ->

View File

@ -14,6 +14,7 @@
-export([repair/3]).
-export([get_machine/2]).
-export([modernize/2]).
-export([notify/3]).
-type woody_client() :: #{
url := woody:url(),
@ -40,6 +41,7 @@ new(WoodyClient = #{url := _, event_handler := _}, WoodyCtx) ->
-type descriptor() :: mg_proto_state_processing_thrift:'MachineDescriptor'().
-type call_response() :: mg_proto_state_processing_thrift:'CallResponse'().
-type repair_response() :: mg_proto_state_processing_thrift:'RepairResponse'().
-type notify_response() :: mg_proto_state_processing_thrift:'NotifyResponse'().
-type machine() :: mg_proto_state_processing_thrift:'Machine'().
-type namespace_not_found() :: mg_proto_state_processing_thrift:'NamespaceNotFound'().
-type machine_not_found() :: mg_proto_state_processing_thrift:'MachineNotFound'().
@ -73,6 +75,10 @@ new(WoodyClient = #{url := _, event_handler := _}, WoodyCtx) ->
namespace_not_found()
| machine_not_found().
-type notify_errors() ::
namespace_not_found()
| machine_not_found().
-spec start(namespace(), id(), args(), client()) ->
{ok, ok}
| {exception, start_errors()}.
@ -103,6 +109,12 @@ get_machine(Descriptor, Client) ->
modernize(Descriptor, Client) ->
issue_call('Modernize', [Descriptor], Client).
-spec notify(descriptor(), args(), client()) ->
{ok, notify_response()}
| {exception, notify_errors()}.
notify(Descriptor, Args, Client) ->
issue_call('Notify', [Descriptor, Args], Client).
%% Internal functions
issue_call(Function, Args, {WoodyClient, WoodyCtx}) when is_list(Args) ->

View File

@ -11,7 +11,8 @@
{args,
init
| repair
| call}
| call
| notification}
| {response,
call
| {repair,

View File

@ -14,6 +14,8 @@ namespaces:
current_format_version: 1
handler:
url: http://machinery:8022/v1/modernizer
notification:
capacity: 1000
storage:
type: memory

View File

@ -29,6 +29,7 @@
-export([process_timeout/3]).
-export([process_repair/4]).
-export([process_call/4]).
-export([process_notification/4]).
%% Internal types
@ -161,6 +162,10 @@ process_call(fail, _Machine, _, _Opts) ->
process_repair(_Args, _Machine, _, _Opts) ->
erlang:error({not_implemented, process_repair}).
-spec process_notification(_, machine(), undefined, handler_opts()) -> no_return().
process_notification(_Args, _Machine, _, _Opts) ->
erlang:error({not_implemented, process_notification}).
%% Helpers
start(ID, Args, C) ->

View File

@ -25,6 +25,7 @@
-export([process_timeout/3]).
-export([process_repair/4]).
-export([process_call/4]).
-export([process_notification/4]).
-behaviour(machinery_mg_schema).
@ -143,6 +144,10 @@ process_call(_Args, _Machine, _, _Opts) ->
process_repair(_Args, _Machine, _, _Opts) ->
erlang:error({not_implemented, process_repair}).
-spec process_notification(_, machine(), undefined, handler_opts()) -> no_return().
process_notification(_Args, _Machine, _, _Opts) ->
erlang:error({not_implemented, process_notification}).
%% machinery_mg_schema callbacks
-spec marshal(machinery_mg_schema:t(), any(), machinery_mg_schema:context()) ->

View File

@ -22,6 +22,7 @@
-export([process_timeout/3]).
-export([process_repair/4]).
-export([process_call/4]).
-export([process_notification/4]).
%% machinery_mg_schema callbacks
@ -118,6 +119,10 @@ process_call(fail, _Machine, _, _Opts) ->
process_repair(repair_something, #{history := History}, _, _Opts) ->
{ok, {done, #{events => [{count_events, erlang:length(History)}]}}}.
-spec process_notification(_, machine(), undefined, handler_opts()) -> no_return().
process_notification(_Args, _Machine, _, _Opts) ->
erlang:error({not_implemented, process_notification}).
%% machinery_mg_schema callbacks
-spec marshal(machinery_mg_schema:t(), any(), machinery_mg_schema:context()) ->

View File

@ -0,0 +1,230 @@
-module(machinery_notify_SUITE).
-behaviour(machinery).
-include_lib("stdlib/include/assert.hrl").
-include_lib("common_test/include/ct.hrl").
%% Common Tests callbacks
-export([all/0]).
-export([groups/0]).
-export([init_per_suite/1]).
-export([end_per_suite/1]).
-export([init_per_group/2]).
-export([end_per_group/2]).
-export([init_per_testcase/2]).
%% Tests
-export([ordinary_notify_test/1]).
-export([unknown_id_notify_test/1]).
-export([unknown_namespace_notify_test/1]).
-export([ranged_notify_test/1]).
%% Machinery callbacks
-export([init/4]).
-export([process_timeout/3]).
-export([process_repair/4]).
-export([process_call/4]).
-export([process_notification/4]).
%% Internal types
-type config() :: ct_helper:config().
-type test_case_name() :: ct_helper:test_case_name().
-type group_name() :: ct_helper:group_name().
-type test_return() :: _ | no_return().
-spec all() -> [test_case_name() | {group, group_name()}].
all() ->
[
{group, machinery_mg_backend}
].
-spec groups() -> [{group_name(), list(), [test_case_name() | {group, group_name()}]}].
groups() ->
[
{machinery_mg_backend, [], [{group, all}]},
{all, [sequence], [
ordinary_notify_test,
unknown_id_notify_test,
unknown_namespace_notify_test,
ranged_notify_test
]}
].
-spec init_per_suite(config()) -> config().
init_per_suite(C) ->
{StartedApps, _StartupCtx} = ct_helper:start_apps([machinery]),
[{started_apps, StartedApps} | C].
-spec end_per_suite(config()) -> _.
end_per_suite(C) ->
ok = ct_helper:stop_apps(?config(started_apps, C)),
ok.
-spec init_per_group(group_name(), config()) -> config().
init_per_group(machinery_mg_backend = Name, C0) ->
C1 = [{backend, Name}, {group_sup, ct_sup:start()} | C0],
{ok, _Pid} = start_backend(C1),
C1;
init_per_group(_Name, C) ->
C.
-spec end_per_group(group_name(), config()) -> config().
end_per_group(machinery_mg_backend, C) ->
ok = ct_sup:stop(?config(group_sup, C)),
C;
end_per_group(_Name, C) ->
C.
-spec init_per_testcase(test_case_name(), config()) -> config().
init_per_testcase(TestCaseName, C) ->
ct_helper:makeup_cfg([ct_helper:test_case_name(TestCaseName), ct_helper:woody_ctx()], C).
%% Tests
-spec ordinary_notify_test(config()) -> test_return().
ordinary_notify_test(C) ->
ID = unique(),
?assertEqual(ok, start(ID, init_numbers, C)),
?assertEqual(ok, notify(ID, do_something, C)),
_ = timer:sleep(1000),
{ok, #{history := History}} = get(ID, C),
?assertMatch([{_, _, something} | _], lists:reverse(History)).
-spec unknown_id_notify_test(config()) -> test_return().
unknown_id_notify_test(C) ->
ID = unique(),
?assertEqual({error, notfound}, notify(ID, do_something, C)).
-spec unknown_namespace_notify_test(config()) -> test_return().
unknown_namespace_notify_test(C) ->
ID = unique(),
?assertError({namespace_not_found, mmm}, machinery:notify(mmm, ID, do_something, get_backend(C))).
-spec ranged_notify_test(config()) -> test_return().
ranged_notify_test(C) ->
ID = unique(),
?assertEqual(ok, start(ID, init_numbers, C)),
?assertEqual(ok, notify(ID, sum_numbers, {10, 9, backward}, C)),
_ = timer:sleep(1000),
{ok, #{history := History1}} = get(ID, C),
?assertMatch([{_, _, {sum, 45}} | _], lists:reverse(History1)),
?assertEqual(ok, notify(ID, sum_numbers, {2, 9, forward}, C)),
_ = timer:sleep(1000),
{ok, #{history := History2}} = get(ID, C),
?assertMatch([{_, _, {sum, 63}} | _], lists:reverse(History2)).
%% Machinery handler
-type event() :: any().
-type aux_st() :: any().
-type machine() :: machinery:machine(event(), aux_st()).
-type handler_opts() :: machinery:handler_opts(_).
-type result() :: machinery:result(event(), aux_st()).
-spec init(_Args, machine(), undefined, handler_opts()) -> result().
init(init_numbers, _Machine, _, _Opts) ->
#{
events => lists:seq(1, 100)
}.
-spec process_timeout(machine(), undefined, handler_opts()) -> no_return().
process_timeout(#{}, _, _Opts) ->
erlang:error({not_implemented, process_timeout}).
-spec process_call(_Args, machine(), undefined, handler_opts()) -> no_return().
process_call(_Args, _Machine, _, _Opts) ->
erlang:error({not_implemented, process_call}).
-spec process_repair(_Args, machine(), undefined, handler_opts()) -> no_return().
process_repair(_Args, _Machine, _, _Opts) ->
erlang:error({not_implemented, process_repair}).
-spec process_notification(_Args, machine(), undefined, handler_opts()) -> result().
process_notification(do_something, _Machine, _, _Opts) ->
#{
events => [something]
};
process_notification(sum_numbers, #{history := History}, _, _Opts) ->
EventsSum = lists:foldr(
fun
({_, _, Num}, Acc) when is_number(Num) ->
Num + Acc;
({_, _, _}, Acc) ->
Acc
end,
0,
History
),
#{
events => [{sum, EventsSum}]
}.
%% Helpers
start(ID, Args, C) ->
machinery:start(namespace(), ID, Args, get_backend(C)).
notify(ID, Args, C) ->
machinery:notify(namespace(), ID, Args, get_backend(C)).
notify(ID, Args, Range, C) ->
machinery:notify(namespace(), ID, Range, Args, get_backend(C)).
get(ID, C) ->
machinery:get(namespace(), ID, get_backend(C)).
namespace() ->
general.
unique() ->
genlib:unique().
start_backend(C) ->
{ok, _PID} = supervisor:start_child(
?config(group_sup, C),
child_spec(C)
).
-spec child_spec(config()) -> supervisor:child_spec().
child_spec(C) ->
child_spec(?config(backend, C), C).
-spec child_spec(atom(), config()) -> supervisor:child_spec().
child_spec(machinery_mg_backend, _C) ->
BackendConfig = #{
path => <<"/v1/stateproc">>,
backend_config => #{
schema => machinery_mg_schema_generic
}
},
Handler = {?MODULE, BackendConfig},
Routes = machinery_mg_backend:get_routes(
[Handler],
#{event_handler => woody_event_handler_default}
),
ServerConfig = #{
ip => {0, 0, 0, 0},
port => 8022
},
machinery_utils:woody_child_spec(machinery_mg_backend, Routes, ServerConfig).
-spec get_backend(config()) -> machinery_mg_backend:backend().
get_backend(C) ->
get_backend(?config(backend, C), C).
-spec get_backend(atom(), config()) -> machinery_mg_backend:backend().
get_backend(machinery_mg_backend, C) ->
machinery_mg_backend:new(
ct_helper:get_woody_ctx(C),
#{
client => #{
url => <<"http://machinegun:8022/v1/automaton">>,
event_handler => woody_event_handler_default
},
schema => machinery_mg_schema_generic
}
).

View File

@ -31,6 +31,7 @@
-export([process_timeout/3]).
-export([process_repair/4]).
-export([process_call/4]).
-export([process_notification/4]).
%% Internal types
@ -189,6 +190,10 @@ process_repair(fail, _Machine, _, _Opts) ->
process_repair(unexpected_fail, _Machine, _, _Opts) ->
erlang:error(unexpected_fail).
-spec process_notification(_, machine(), undefined, handler_opts()) -> no_return().
process_notification(_Args, _Machine, _, _Opts) ->
erlang:error({not_implemented, process_notification}).
%% Helpers
start(ID, Args, C) ->

View File

@ -27,6 +27,7 @@
-export([process_timeout/3]).
-export([process_repair/4]).
-export([process_call/4]).
-export([process_notification/4]).
%% Internal types
@ -137,6 +138,10 @@ process_call(_Args, _Machine, _, _Opts) ->
process_repair(_Args, _Machine, _, _Opts) ->
erlang:error({not_implemented, process_repair}).
-spec process_notification(_, machine(), undefined, handler_opts()) -> no_return().
process_notification(_Args, _Machine, _, _Opts) ->
erlang:error({not_implemented, process_notification}).
%% Helpers
start(ID, Args, C) ->

View File

@ -29,6 +29,7 @@
-export([process_timeout/3]).
-export([process_repair/4]).
-export([process_call/4]).
-export([process_notification/4]).
%% Internal types
@ -198,6 +199,10 @@ process_call(timer_with_range, _Machine, _, _Opts) ->
process_repair(_Args, _Machine, _, _Opts) ->
erlang:error({not_implemented, process_repair}).
-spec process_notification(_, machine(), undefined, handler_opts()) -> no_return().
process_notification(_Args, _Machine, _, _Opts) ->
erlang:error({not_implemented, process_notification}).
%% Helpers
start(ID, Args, C) ->