mirror of
https://github.com/valitydev/machinery-erlang.git
synced 2024-11-06 00:35:19 +00:00
MG-183: Modernizer support (#28)
This commit is contained in:
parent
a85d56d1d5
commit
9d7e4b28cd
@ -15,7 +15,7 @@ services:
|
||||
condition: service_healthy
|
||||
|
||||
machinegun:
|
||||
image: dr2.rbkmoney.com/rbkmoney/machinegun:9b160a5f39fa54b1a20ca9cc8a9a881cbcc9ed4f
|
||||
image: dr2.rbkmoney.com/rbkmoney/machinegun:c89c2cecc6fa72895492c718f042a7ec90fd2442
|
||||
command: /opt/machinegun/bin/machinegun foreground
|
||||
volumes:
|
||||
- ./test/machinegun/config.yaml:/opt/machinegun/etc/config.yaml
|
||||
|
@ -282,23 +282,9 @@ build_schema_context(NS, Ref) ->
|
||||
|
||||
%% Marshalling
|
||||
|
||||
marshal(descriptor, {NS, Ref, Range}) ->
|
||||
#mg_stateproc_MachineDescriptor{
|
||||
'ns' = marshal(namespace, NS),
|
||||
'ref' = marshal(ref, Ref),
|
||||
'range' = marshal(range, Range)
|
||||
};
|
||||
|
||||
marshal(range, {Cursor, Limit, Direction}) ->
|
||||
#mg_stateproc_HistoryRange{
|
||||
'after' = marshal({maybe, event_id}, Cursor),
|
||||
'limit' = marshal(limit, Limit),
|
||||
'direction' = marshal(direction, Direction)
|
||||
};
|
||||
|
||||
marshal({signal_result, Schema, Context}, #{} = V) ->
|
||||
#mg_stateproc_SignalResult{
|
||||
change = marshal({state_change, Schema, Context}, V),
|
||||
change = marshal({state_change, Schema, Context}, V),
|
||||
action = marshal(action, maps:get(action, V, []))
|
||||
};
|
||||
|
||||
@ -365,62 +351,11 @@ marshal(timer, {timeout, V}) ->
|
||||
marshal(timer, {deadline, V}) ->
|
||||
{deadline, marshal(timestamp, V)};
|
||||
|
||||
marshal(namespace, V) ->
|
||||
marshal(atom, V);
|
||||
|
||||
marshal(ref, V) when is_binary(V) ->
|
||||
{id, marshal(id, V)};
|
||||
|
||||
marshal(ref, {tag, V}) ->
|
||||
{tag, marshal(tag, V)};
|
||||
|
||||
marshal(id, V) ->
|
||||
marshal(string, V);
|
||||
|
||||
marshal(tag, V) ->
|
||||
marshal(string, V);
|
||||
|
||||
marshal(event_id, V) ->
|
||||
marshal(integer, V);
|
||||
|
||||
marshal(limit, V) ->
|
||||
marshal({maybe, integer}, V);
|
||||
|
||||
marshal(direction, V) ->
|
||||
marshal({enum, [forward, backward]}, V);
|
||||
|
||||
marshal({schema, Schema, T, Context}, V) ->
|
||||
machinery_mg_schema:marshal(Schema, T, V, Context);
|
||||
|
||||
marshal(timestamp, {DateTime, USec}) ->
|
||||
Ts = genlib_time:daytime_to_unixtime(DateTime) * ?MICROS_PER_SEC + USec,
|
||||
Str = calendar:system_time_to_rfc3339(Ts, [{unit, microsecond}, {offset, "Z"}]),
|
||||
erlang:list_to_binary(Str);
|
||||
|
||||
marshal({list, T}, V) when is_list(V) ->
|
||||
[marshal(T, E) || E <- V];
|
||||
|
||||
marshal({maybe, _}, undefined) ->
|
||||
undefined;
|
||||
|
||||
marshal({maybe, T}, V) ->
|
||||
marshal(T, V);
|
||||
|
||||
marshal({enum, Choices = [_ | _]} = T, V) when is_atom(V) ->
|
||||
_ = lists:member(V, Choices) orelse erlang:error(badarg, [T, V]),
|
||||
V;
|
||||
|
||||
marshal(atom, V) when is_atom(V) ->
|
||||
atom_to_binary(V, utf8);
|
||||
|
||||
marshal(string, V) when is_binary(V) ->
|
||||
V;
|
||||
|
||||
marshal(integer, V) when is_integer(V) ->
|
||||
V;
|
||||
|
||||
marshal(T, V) ->
|
||||
erlang:error(badarg, [T, V]).
|
||||
machinery_mg_codec:marshal(T, V).
|
||||
|
||||
apply_action({set_timer, V}, CA) ->
|
||||
CA#mg_stateproc_ComplexAction{
|
||||
@ -466,29 +401,6 @@ apply_action({tag, Tag}, CA) ->
|
||||
}
|
||||
}.
|
||||
|
||||
%%
|
||||
%% No unmarshalling for the decriptor required by the protocol so far.
|
||||
%%
|
||||
%% unmarshal(
|
||||
%% descriptor,
|
||||
%% #mg_stateproc_MachineDescriptor{
|
||||
%% ns = NS,
|
||||
%% ref = {'id', ID},
|
||||
%% range = Range
|
||||
%% }
|
||||
%% ) ->
|
||||
%% {unmarshal(namespace, NS), unmarshal(id, ID), unmarshal(range, Range)};
|
||||
|
||||
unmarshal(
|
||||
range,
|
||||
#mg_stateproc_HistoryRange{
|
||||
'after' = Cursor,
|
||||
'limit' = Limit,
|
||||
'direction' = Direction
|
||||
}
|
||||
) ->
|
||||
{unmarshal({maybe, event_id}, Cursor), unmarshal(limit, Limit), unmarshal(direction, Direction)};
|
||||
|
||||
unmarshal(
|
||||
{machine, Schema},
|
||||
#mg_stateproc_Machine{
|
||||
@ -545,78 +457,8 @@ unmarshal({signal, Schema, Context0}, {repair, #mg_stateproc_RepairSignal{arg =
|
||||
{Args1, Context1} = unmarshal({schema, Schema, {args, repair}, Context0}, Args0),
|
||||
{{repair, Args1}, Context1};
|
||||
|
||||
unmarshal(namespace, V) ->
|
||||
unmarshal(atom, V);
|
||||
|
||||
unmarshal(id, V) ->
|
||||
unmarshal(string, V);
|
||||
|
||||
unmarshal(event_id, V) ->
|
||||
unmarshal(integer, V);
|
||||
|
||||
unmarshal(limit, V) ->
|
||||
unmarshal({maybe, integer}, V);
|
||||
|
||||
unmarshal(direction, V) ->
|
||||
unmarshal({enum, [forward, backward]}, V);
|
||||
|
||||
unmarshal({schema, Schema, T, Context}, V) ->
|
||||
machinery_mg_schema:unmarshal(Schema, T, V, Context);
|
||||
|
||||
unmarshal(timestamp, V) when is_binary(V) ->
|
||||
ok = assert_is_utc(V),
|
||||
Str = erlang:binary_to_list(V),
|
||||
try
|
||||
Micros = calendar:rfc3339_to_system_time(Str, [{unit, microsecond}]),
|
||||
Datetime = calendar:system_time_to_universal_time(Micros, microsecond),
|
||||
{Datetime, Micros rem ?MICROS_PER_SEC}
|
||||
catch
|
||||
error:Reason ->
|
||||
erlang:error(badarg, [timestamp, V, Reason])
|
||||
end;
|
||||
|
||||
unmarshal({list, T}, V) when is_list(V) ->
|
||||
[unmarshal(T, E) || E <- V];
|
||||
|
||||
unmarshal({maybe, _}, undefined) ->
|
||||
undefined;
|
||||
|
||||
unmarshal({maybe, T}, V) ->
|
||||
unmarshal(T, V);
|
||||
|
||||
unmarshal({enum, Choices = [_ | _]} = T, V) when is_atom(V) ->
|
||||
case lists:member(V, Choices) of
|
||||
true ->
|
||||
V;
|
||||
false ->
|
||||
erlang:error(badarg, [T, V])
|
||||
end;
|
||||
|
||||
unmarshal(atom, V) when is_binary(V) ->
|
||||
binary_to_existing_atom(V, utf8);
|
||||
|
||||
unmarshal(string, V) when is_binary(V) ->
|
||||
V;
|
||||
|
||||
unmarshal(integer, V) when is_integer(V) ->
|
||||
V;
|
||||
|
||||
unmarshal(T, V) ->
|
||||
erlang:error(badarg, [T, V]).
|
||||
|
||||
-spec assert_is_utc(binary()) ->
|
||||
ok | no_return().
|
||||
assert_is_utc(Rfc3339) ->
|
||||
Size0 = erlang:byte_size(Rfc3339),
|
||||
Size1 = Size0 - 1,
|
||||
Size6 = Size0 - 6,
|
||||
case Rfc3339 of
|
||||
<<_:Size1/bytes, "Z">> ->
|
||||
ok;
|
||||
<<_:Size6/bytes, "+00:00">> ->
|
||||
ok;
|
||||
<<_:Size6/bytes, "-00:00">> ->
|
||||
ok;
|
||||
_ ->
|
||||
erlang:error(badarg, [timestamp, Rfc3339, badoffset])
|
||||
end.
|
||||
machinery_mg_codec:unmarshal(T, V).
|
||||
|
@ -12,6 +12,7 @@
|
||||
-export([call/3]).
|
||||
-export([repair/3]).
|
||||
-export([get_machine/2]).
|
||||
-export([modernize/2]).
|
||||
|
||||
-type woody_client() :: #{
|
||||
url := woody:url(),
|
||||
@ -68,6 +69,10 @@ new(WoodyClient = #{url := _, event_handler := _}, WoodyCtx) ->
|
||||
namespace_not_found() |
|
||||
machine_not_found().
|
||||
|
||||
-type modernize_errors() ::
|
||||
namespace_not_found() |
|
||||
machine_not_found().
|
||||
|
||||
-spec start(namespace(), id(), args(), client()) ->
|
||||
{ok, ok} |
|
||||
{exception, start_errors()}.
|
||||
@ -92,6 +97,12 @@ repair(Descriptor, Args, Client) ->
|
||||
get_machine(Descriptor, Client) ->
|
||||
issue_call('GetMachine', [Descriptor], Client).
|
||||
|
||||
-spec modernize(descriptor(), client()) ->
|
||||
{ok, ok} |
|
||||
{exception, modernize_errors()}.
|
||||
modernize(Descriptor, Client) ->
|
||||
issue_call('Modernize', [Descriptor], Client).
|
||||
|
||||
%% Internal functions
|
||||
|
||||
issue_call(Function, Args, {WoodyClient, WoodyCtx}) ->
|
||||
|
196
src/machinery_mg_codec.erl
Normal file
196
src/machinery_mg_codec.erl
Normal file
@ -0,0 +1,196 @@
|
||||
-module(machinery_mg_codec).
|
||||
-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl").
|
||||
|
||||
-export([marshal/2]).
|
||||
-export([unmarshal/2]).
|
||||
|
||||
-define(MICROS_PER_SEC, (1000 * 1000)).
|
||||
|
||||
%%
|
||||
|
||||
-type type() :: atom() | {enum, list(atom())} | {schema, module(), any(), any()}.
|
||||
|
||||
-type encoded_value() :: encoded_value(any()).
|
||||
-type encoded_value(T) :: T.
|
||||
|
||||
-type decoded_value() :: decoded_value(any()).
|
||||
-type decoded_value(T) :: T.
|
||||
|
||||
-spec marshal(type(), decoded_value()) ->
|
||||
encoded_value().
|
||||
|
||||
marshal(id, V) ->
|
||||
marshal(string, V);
|
||||
|
||||
marshal(event_id, V) ->
|
||||
marshal(integer, V);
|
||||
|
||||
marshal(namespace, V) ->
|
||||
marshal(atom, V);
|
||||
|
||||
marshal(tag, V) ->
|
||||
marshal(string, V);
|
||||
|
||||
marshal(ref, V) when is_binary(V) ->
|
||||
{id, marshal(id, V)};
|
||||
|
||||
marshal(ref, {tag, V}) ->
|
||||
{tag, marshal(tag, V)};
|
||||
|
||||
marshal(descriptor, {NS, Ref, Range}) ->
|
||||
#mg_stateproc_MachineDescriptor{
|
||||
'ns' = marshal(namespace, NS),
|
||||
'ref' = marshal(ref, Ref),
|
||||
'range' = marshal(range, Range)
|
||||
};
|
||||
|
||||
marshal(range, {Cursor, Limit, Direction}) ->
|
||||
#mg_stateproc_HistoryRange{
|
||||
'after' = maybe_marshal(event_id, Cursor),
|
||||
'limit' = maybe_marshal(limit, Limit),
|
||||
'direction' = marshal(direction, Direction)
|
||||
};
|
||||
|
||||
marshal(limit, V) ->
|
||||
marshal(integer, V);
|
||||
|
||||
marshal(direction, V) ->
|
||||
marshal({enum, [forward, backward]}, V);
|
||||
|
||||
marshal({enum, Choices = [_ | _]} = T, V) when is_atom(V) ->
|
||||
_ = lists:member(V, Choices) orelse erlang:error(badarg, [T, V]),
|
||||
V;
|
||||
|
||||
marshal({schema, Schema, T, Context}, V) ->
|
||||
machinery_mg_schema:marshal(Schema, T, V, Context);
|
||||
|
||||
marshal(timestamp, {DateTime, USec}) ->
|
||||
Ts = genlib_time:daytime_to_unixtime(DateTime) * ?MICROS_PER_SEC + USec,
|
||||
Str = calendar:system_time_to_rfc3339(Ts, [{unit, microsecond}, {offset, "Z"}]),
|
||||
erlang:list_to_binary(Str);
|
||||
|
||||
marshal(atom, V) when is_atom(V) ->
|
||||
atom_to_binary(V, utf8);
|
||||
|
||||
marshal(string, V) when is_binary(V) ->
|
||||
V;
|
||||
|
||||
marshal(integer, V) when is_integer(V) ->
|
||||
V;
|
||||
|
||||
marshal(T, V) ->
|
||||
erlang:error(badarg, [T, V]).
|
||||
|
||||
%%
|
||||
|
||||
-spec unmarshal(type(), encoded_value()) ->
|
||||
decoded_value().
|
||||
|
||||
unmarshal(id, V) ->
|
||||
unmarshal(string, V);
|
||||
|
||||
unmarshal(event_id, V) ->
|
||||
unmarshal(integer, V);
|
||||
|
||||
unmarshal(namespace, V) ->
|
||||
unmarshal(atom, V);
|
||||
|
||||
unmarshal(tag, V) ->
|
||||
unmarshal(string, V);
|
||||
|
||||
%%
|
||||
%% No unmarshalling for the decriptor required by the protocol so far.
|
||||
%%
|
||||
%% unmarshal(
|
||||
%% descriptor,
|
||||
%% #mg_stateproc_MachineDescriptor{
|
||||
%% ns = NS,
|
||||
%% ref = {'id', ID},
|
||||
%% range = Range
|
||||
%% }
|
||||
%% ) ->
|
||||
%% {unmarshal(namespace, NS), unmarshal(id, ID), unmarshal(range, Range)};
|
||||
|
||||
unmarshal(
|
||||
range,
|
||||
#mg_stateproc_HistoryRange{
|
||||
'after' = Cursor,
|
||||
'limit' = Limit,
|
||||
'direction' = Direction
|
||||
}
|
||||
) ->
|
||||
{
|
||||
maybe_unmarshal(event_id, Cursor),
|
||||
maybe_unmarshal(limit, Limit),
|
||||
unmarshal(direction, Direction)
|
||||
};
|
||||
|
||||
unmarshal(limit, V) ->
|
||||
unmarshal(integer, V);
|
||||
|
||||
unmarshal(direction, V) ->
|
||||
unmarshal({enum, [forward, backward]}, V);
|
||||
|
||||
unmarshal({enum, Choices = [_ | _]} = T, V) when is_atom(V) ->
|
||||
case lists:member(V, Choices) of
|
||||
true ->
|
||||
V;
|
||||
false ->
|
||||
erlang:error(badarg, [T, V])
|
||||
end;
|
||||
|
||||
unmarshal({schema, Schema, T, Context}, V) ->
|
||||
machinery_mg_schema:unmarshal(Schema, T, V, Context);
|
||||
|
||||
unmarshal(timestamp, V) when is_binary(V) ->
|
||||
ok = assert_is_utc(V),
|
||||
Str = erlang:binary_to_list(V),
|
||||
try
|
||||
Micros = calendar:rfc3339_to_system_time(Str, [{unit, microsecond}]),
|
||||
Datetime = calendar:system_time_to_universal_time(Micros, microsecond),
|
||||
{Datetime, Micros rem ?MICROS_PER_SEC}
|
||||
catch
|
||||
error:Reason ->
|
||||
erlang:error(badarg, [timestamp, V, Reason])
|
||||
end;
|
||||
|
||||
unmarshal(atom, V) when is_binary(V) ->
|
||||
binary_to_existing_atom(V, utf8);
|
||||
|
||||
unmarshal(string, V) when is_binary(V) ->
|
||||
V;
|
||||
|
||||
unmarshal(integer, V) when is_integer(V) ->
|
||||
V;
|
||||
|
||||
unmarshal(T, V) ->
|
||||
erlang:error(badarg, [T, V]).
|
||||
|
||||
%%
|
||||
|
||||
maybe_unmarshal(_Type, undefined) ->
|
||||
undefined;
|
||||
maybe_unmarshal(Type, Value) ->
|
||||
unmarshal(Type, Value).
|
||||
|
||||
maybe_marshal(_Type, undefined) ->
|
||||
undefined;
|
||||
maybe_marshal(Type, Value) ->
|
||||
marshal(Type, Value).
|
||||
|
||||
-spec assert_is_utc(binary()) ->
|
||||
ok | no_return().
|
||||
assert_is_utc(Rfc3339) ->
|
||||
Size0 = erlang:byte_size(Rfc3339),
|
||||
Size1 = Size0 - 1,
|
||||
Size6 = Size0 - 6,
|
||||
case Rfc3339 of
|
||||
<<_:Size1/bytes, "Z">> ->
|
||||
ok;
|
||||
<<_:Size6/bytes, "+00:00">> ->
|
||||
ok;
|
||||
<<_:Size6/bytes, "-00:00">> ->
|
||||
ok;
|
||||
_ ->
|
||||
erlang:error(badarg, [timestamp, Rfc3339, badoffset])
|
||||
end.
|
28
src/machinery_modernizer.erl
Normal file
28
src/machinery_modernizer.erl
Normal file
@ -0,0 +1,28 @@
|
||||
%%%
|
||||
%%% Modernizer API abstraction.
|
||||
%%% Behaviour and API.
|
||||
%%%
|
||||
|
||||
-module(machinery_modernizer).
|
||||
|
||||
% API
|
||||
-type namespace() :: machinery:namespace().
|
||||
-type ref() :: machinery:ref().
|
||||
-type range() :: machinery:range().
|
||||
-type backend() :: machinery:backend(_).
|
||||
|
||||
-export([modernize/3]).
|
||||
-export([modernize/4]).
|
||||
|
||||
%% API
|
||||
|
||||
-spec modernize(namespace(), ref(), backend()) ->
|
||||
ok| {error, notfound}.
|
||||
modernize(NS, Ref, Backend) ->
|
||||
modernize(NS, Ref, {undefined, undefined, forward}, Backend).
|
||||
|
||||
-spec modernize(namespace(), ref(), range(), backend()) ->
|
||||
ok | {error, notfound}.
|
||||
modernize(NS, Ref, Range, Backend) ->
|
||||
{Module, Opts} = machinery_utils:get_backend(Backend),
|
||||
machinery_modernizer_backend:modernize(Module, NS, Ref, Range, Opts).
|
28
src/machinery_modernizer_backend.erl
Normal file
28
src/machinery_modernizer_backend.erl
Normal file
@ -0,0 +1,28 @@
|
||||
%%%
|
||||
%%% Modernizer backend behaviour.
|
||||
%%%
|
||||
|
||||
-module(machinery_modernizer_backend).
|
||||
|
||||
%% API
|
||||
-export([modernize/5]).
|
||||
|
||||
%% Behaviour definition
|
||||
|
||||
-type namespace() :: machinery:namespace().
|
||||
-type id() :: machinery:id().
|
||||
-type ref() :: machinery:ref().
|
||||
-type range() :: machinery:range().
|
||||
-type backend_opts() :: machinery:backend_opts(_).
|
||||
|
||||
-callback modernize(namespace(), id(), range(), backend_opts()) ->
|
||||
ok | {error, notfound}.
|
||||
|
||||
%% API
|
||||
|
||||
-type backend() :: module().
|
||||
|
||||
-spec modernize(backend(), namespace(), ref(), range(), backend_opts()) ->
|
||||
ok | {error, notfound}.
|
||||
modernize(Backend, Namespace, Ref, Range, Opts) ->
|
||||
Backend:modernize(Namespace, Ref, Range, Opts).
|
194
src/machinery_modernizer_mg_backend.erl
Normal file
194
src/machinery_modernizer_mg_backend.erl
Normal file
@ -0,0 +1,194 @@
|
||||
%%%
|
||||
%%% Modernizer machinegun backend
|
||||
%%%
|
||||
|
||||
-module(machinery_modernizer_mg_backend).
|
||||
-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl").
|
||||
|
||||
-type namespace() :: machinery:namespace().
|
||||
-type ref() :: machinery:ref().
|
||||
-type range() :: machinery:range().
|
||||
-type logic_handler(T) :: machinery:logic_handler(T).
|
||||
|
||||
-define(BACKEND_CORE_OPTS,
|
||||
schema := machinery_mg_schema:schema()
|
||||
).
|
||||
|
||||
%% Server types
|
||||
-type backend_config() :: #{
|
||||
?BACKEND_CORE_OPTS
|
||||
}.
|
||||
|
||||
-type handler_config() :: #{
|
||||
path := woody:path(),
|
||||
backend_config := backend_config()
|
||||
}.
|
||||
|
||||
-type handler( ) :: handler_config(). %% handler server spec
|
||||
|
||||
-type handler_opts() :: machinery:handler_opts(#{
|
||||
woody_ctx := woody_context:ctx()
|
||||
}).
|
||||
|
||||
-type backend_handler_opts() :: #{
|
||||
?BACKEND_CORE_OPTS
|
||||
}.
|
||||
|
||||
%% Client types
|
||||
-type backend_opts() :: machinery:backend_opts(#{
|
||||
woody_ctx := woody_context:ctx(),
|
||||
client := machinery_mg_client:woody_client(),
|
||||
?BACKEND_CORE_OPTS
|
||||
}).
|
||||
|
||||
-type backend() :: {?MODULE, backend_opts()}.
|
||||
|
||||
-type backend_opts_static() :: #{
|
||||
client := machinery_mg_client:woody_client(),
|
||||
?BACKEND_CORE_OPTS
|
||||
}.
|
||||
|
||||
-export_type([backend_config/0]).
|
||||
-export_type([handler_config/0]).
|
||||
-export_type([logic_handler/1]).
|
||||
-export_type([handler/0]).
|
||||
-export_type([handler_opts/0]).
|
||||
-export_type([backend_opts/0]).
|
||||
-export_type([backend/0]).
|
||||
|
||||
%% API
|
||||
-export([new/2]).
|
||||
-export([get_routes/2]).
|
||||
-export([get_handler/2]).
|
||||
|
||||
%% Machinery backend
|
||||
-behaviour(machinery_modernizer_backend).
|
||||
|
||||
-export([modernize/4]).
|
||||
|
||||
%% Woody handler
|
||||
-behaviour(woody_server_thrift_handler).
|
||||
|
||||
-export([handle_function/4]).
|
||||
|
||||
%% API
|
||||
|
||||
-spec get_routes([handler()], machinery_utils:route_opts()) ->
|
||||
machinery_utils:woody_routes().
|
||||
get_routes(Handlers, Opts) ->
|
||||
machinery_utils:get_woody_routes(Handlers, fun get_handler/2, Opts).
|
||||
|
||||
-spec get_handler(handler(), machinery_utils:route_opts()) ->
|
||||
machinery_utils:woody_handler().
|
||||
get_handler(#{path := Path, backend_config := Config}, _) ->
|
||||
{Path, {
|
||||
{mg_proto_state_processing_thrift, 'Modernizer'},
|
||||
{?MODULE, Config}
|
||||
}}.
|
||||
|
||||
-spec new(woody_context:ctx(), backend_opts_static()) ->
|
||||
backend().
|
||||
new(WoodyCtx, Opts = #{client := _, schema := _}) ->
|
||||
{?MODULE, Opts#{woody_ctx => WoodyCtx}}.
|
||||
|
||||
%% Machinery backend
|
||||
|
||||
-spec modernize(namespace(), ref(), range(), backend_opts()) ->
|
||||
ok | {error, notfound}.
|
||||
modernize(NS, Ref, Range, Opts) ->
|
||||
Client = get_client(Opts),
|
||||
Descriptor = {NS, Ref, Range},
|
||||
case machinery_mg_client:modernize(marshal(descriptor, Descriptor), Client) of
|
||||
{ok, ok} ->
|
||||
ok;
|
||||
{exception, #mg_stateproc_MachineNotFound{}} ->
|
||||
{error, notfound};
|
||||
{exception, #mg_stateproc_NamespaceNotFound{}} ->
|
||||
error({namespace_not_found, NS})
|
||||
end.
|
||||
|
||||
%% Woody handler
|
||||
|
||||
-spec handle_function('ModernizeEvent', woody:args(), woody_context:ctx(), backend_handler_opts()) ->
|
||||
{ok, mg_proto_state_processing_thrift:'ModernizeEventResult'()}.
|
||||
handle_function('ModernizeEvent', [MachineEvent0], _WoodyCtx, #{schema := Schema}) ->
|
||||
{MachineEvent, Context} = unmarshal_machine_event(Schema, MachineEvent0),
|
||||
TargetVersion = machinery_mg_schema:get_version(Schema, event),
|
||||
EventPayload = marshal_event_content(Schema, TargetVersion, Context, MachineEvent),
|
||||
{ok, marshal(modernize_event_result, EventPayload)}.
|
||||
|
||||
%% Utils
|
||||
|
||||
|
||||
unmarshal_machine_event(Schema, #mg_stateproc_MachineEvent{
|
||||
ns = MachineNS,
|
||||
id = MachineID,
|
||||
event = Event
|
||||
}) ->
|
||||
ID = unmarshal(id, MachineID),
|
||||
NS = unmarshal(namespace, MachineNS),
|
||||
Context = build_schema_context(NS, ID),
|
||||
{unmarshal({event, Schema, Context}, Event), Context}.
|
||||
|
||||
marshal_event_content(Schema, Version, Context0, _Event = #{data := EventData0}) ->
|
||||
{EventData1, Context0} = marshal({schema, Schema, {event, Version}, Context0}, EventData0),
|
||||
#mg_stateproc_Content{
|
||||
format_version = maybe_marshal(format_version, Version),
|
||||
data = EventData1
|
||||
}.
|
||||
|
||||
get_client(#{client := Client, woody_ctx := WoodyCtx}) ->
|
||||
machinery_mg_client:new(Client, WoodyCtx).
|
||||
|
||||
build_schema_context(NS, Ref) ->
|
||||
#{
|
||||
machine_ns => NS,
|
||||
machine_ref => Ref
|
||||
}.
|
||||
|
||||
%% Marshalling
|
||||
|
||||
marshal(modernize_event_result, Content) ->
|
||||
#mg_stateproc_ModernizeEventResult{
|
||||
event_payload = Content
|
||||
};
|
||||
|
||||
marshal(format_version, V) ->
|
||||
marshal(integer, V);
|
||||
|
||||
marshal(T, V) ->
|
||||
machinery_mg_codec:marshal(T, V).
|
||||
|
||||
%% Unmarshalling
|
||||
|
||||
unmarshal({event, Schema, Context0}, #mg_stateproc_Event{
|
||||
id = EventID,
|
||||
created_at = CreatedAt0,
|
||||
format_version = Version,
|
||||
data = EventData0
|
||||
}) ->
|
||||
CreatedAt1 = unmarshal(timestamp, CreatedAt0),
|
||||
Context1 = Context0#{created_at => CreatedAt1},
|
||||
{EventData1, Context1} = unmarshal({schema, Schema, {event, Version}, Context1}, EventData0),
|
||||
#{
|
||||
id => unmarshal(event_id, EventID),
|
||||
created_at => CreatedAt1,
|
||||
format_version => maybe_unmarshal(format_version, Version),
|
||||
data => EventData1
|
||||
};
|
||||
|
||||
unmarshal(format_version, V) ->
|
||||
unmarshal(integer, V);
|
||||
|
||||
unmarshal(T, V) ->
|
||||
machinery_mg_codec:unmarshal(T, V).
|
||||
|
||||
maybe_unmarshal(_Type, undefined) ->
|
||||
undefined;
|
||||
maybe_unmarshal(Type, Value) ->
|
||||
unmarshal(Type, Value).
|
||||
|
||||
maybe_marshal(_Type, undefined) ->
|
||||
undefined;
|
||||
maybe_marshal(Type, Value) ->
|
||||
marshal(Type, Value).
|
@ -10,6 +10,10 @@ namespaces:
|
||||
general:
|
||||
processor:
|
||||
url: http://machinery:8022/v1/stateproc
|
||||
modernizer:
|
||||
current_format_version: 1
|
||||
handler:
|
||||
url: http://machinery:8022/v1/modernizer
|
||||
|
||||
storage:
|
||||
type: memory
|
||||
|
294
test/machinery_mg_modernizer_flow_SUITE.erl
Normal file
294
test/machinery_mg_modernizer_flow_SUITE.erl
Normal file
@ -0,0 +1,294 @@
|
||||
-module(machinery_mg_modernizer_flow_SUITE).
|
||||
|
||||
-include_lib("stdlib/include/assert.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
%% Common Tests callbacks
|
||||
-export([all/0]).
|
||||
-export([groups/0]).
|
||||
-export([init_per_suite/1]).
|
||||
-export([end_per_suite/1]).
|
||||
-export([init_per_group/2]).
|
||||
-export([end_per_group/2]).
|
||||
-export([init_per_testcase/2]).
|
||||
|
||||
%% Tests
|
||||
|
||||
-export([modernizer_test/1]).
|
||||
-export([skip_upgrading_test/1]).
|
||||
|
||||
%% Machinery callbacks
|
||||
|
||||
-behaviour(machinery).
|
||||
|
||||
-export([init/4]).
|
||||
-export([process_timeout/3]).
|
||||
-export([process_repair/4]).
|
||||
-export([process_call/4]).
|
||||
|
||||
-behaviour(machinery_mg_schema).
|
||||
|
||||
-export([marshal/3]).
|
||||
-export([unmarshal/3]).
|
||||
-export([get_version/1]).
|
||||
|
||||
%% Internal types
|
||||
|
||||
-type config() :: ct_helper:config().
|
||||
-type test_case_name() :: ct_helper:test_case_name().
|
||||
-type group_name() :: ct_helper:group_name().
|
||||
-type test_return() :: _ | no_return().
|
||||
|
||||
-spec all() -> [test_case_name() | {group, group_name()}].
|
||||
all() ->
|
||||
[
|
||||
{group, machinery_mg_backend}
|
||||
].
|
||||
|
||||
-spec groups() ->
|
||||
[{group_name(), list(), test_case_name()}].
|
||||
groups() ->
|
||||
[
|
||||
{machinery_mg_backend, [], [{group, all}]},
|
||||
{all, [], [
|
||||
modernizer_test,
|
||||
skip_upgrading_test
|
||||
]}
|
||||
].
|
||||
|
||||
-spec init_per_suite(config()) -> config().
|
||||
init_per_suite(C) ->
|
||||
{StartedApps, _StartupCtx} = ct_helper:start_apps([machinery]),
|
||||
[{started_apps, StartedApps}| C].
|
||||
|
||||
-spec end_per_suite(config()) -> _.
|
||||
end_per_suite(C) ->
|
||||
ok = ct_helper:stop_apps(?config(started_apps, C)),
|
||||
ok.
|
||||
|
||||
-spec init_per_group(group_name(), config()) -> config().
|
||||
init_per_group(machinery_mg_backend = Name, C0) ->
|
||||
C1 = [
|
||||
{backend, Name},
|
||||
{modernizer_backend, machinery_modernizer_mg_backend},
|
||||
{group_sup, ct_sup:start()} | C0
|
||||
],
|
||||
{ok, _Pid} = start_backend(C1),
|
||||
C1;
|
||||
init_per_group(_Name, C) ->
|
||||
C.
|
||||
|
||||
-spec end_per_group(group_name(), config()) -> config().
|
||||
end_per_group(machinery_mg_backend, C) ->
|
||||
ok = ct_sup:stop(?config(group_sup, C)),
|
||||
C;
|
||||
end_per_group(_Name, C) ->
|
||||
C.
|
||||
|
||||
-spec init_per_testcase(test_case_name(), config()) -> config().
|
||||
init_per_testcase(TestCaseName, C) ->
|
||||
ct_helper:makeup_cfg([ct_helper:test_case_name(TestCaseName), ct_helper:woody_ctx()], C).
|
||||
|
||||
%% Tests
|
||||
|
||||
-spec modernizer_test(config()) -> test_return().
|
||||
modernizer_test(C) ->
|
||||
_ = setup_test_ets(),
|
||||
ID = unique(),
|
||||
true = set_event_version(undefined),
|
||||
?assertEqual(ok, start(ID, init_something, C)),
|
||||
[{EventID, Timestamp, Event}] = get_history(ID, C),
|
||||
true = set_event_version(1),
|
||||
?assertEqual(ok, modernize(ID, C)),
|
||||
?assertEqual([{EventID, Timestamp, {v1, Event}}], get_history(ID, C)),
|
||||
_ = delete_test_ets().
|
||||
|
||||
-spec skip_upgrading_test(config()) -> test_return().
|
||||
skip_upgrading_test(C) ->
|
||||
_ = setup_test_ets(),
|
||||
ID = unique(),
|
||||
true = set_event_version(1),
|
||||
?assertEqual(ok, start(ID, init_something, C)),
|
||||
[{EventID, Timestamp, {v1, Event}}] = get_history(ID, C),
|
||||
true = set_event_version(2),
|
||||
?assertEqual(ok, modernize(ID, C)),
|
||||
?assertEqual([{EventID, Timestamp, {v1, Event}}], get_history(ID, C)),
|
||||
_ = delete_test_ets().
|
||||
|
||||
%% Machinery handler
|
||||
|
||||
-type event() :: any().
|
||||
-type aux_st() :: any().
|
||||
-type machine() :: machinery:machine(event(), aux_st()).
|
||||
-type handler_opts() :: machinery:handler_opts(_).
|
||||
-type result() :: machinery:result(event(), aux_st()).
|
||||
|
||||
-spec init(_Args, machine(), undefined, handler_opts()) ->
|
||||
result().
|
||||
init(init_something, _Machine, _, _Opts) ->
|
||||
#{
|
||||
events => [init_event],
|
||||
aux_state => #{}
|
||||
}.
|
||||
|
||||
-spec process_timeout(machine(), undefined, handler_opts()) ->
|
||||
result().
|
||||
process_timeout(_Args, _, _Opts) ->
|
||||
erlang:error({not_implemented, process_timeout}).
|
||||
|
||||
-spec process_call(_Args, machine(), undefined, handler_opts()) ->
|
||||
no_return().
|
||||
process_call(_Args, _Machine, _, _Opts) ->
|
||||
erlang:error({not_implemented, process_call}).
|
||||
|
||||
-spec process_repair(_Args, machine(), undefined, handler_opts()) ->
|
||||
no_return().
|
||||
process_repair(_Args, _Machine, _, _Opts) ->
|
||||
erlang:error({not_implemented, process_repair}).
|
||||
|
||||
%% machinery_mg_schema callbacks
|
||||
|
||||
-spec marshal(machinery_mg_schema:t(), any(), machinery_mg_schema:context()) ->
|
||||
{machinery_msgpack:t(), machinery_mg_schema:context()}.
|
||||
marshal({event, 2}, V, C) ->
|
||||
{{bin, erlang:term_to_binary({v2, V})}, C};
|
||||
marshal({event, 1}, V, C) ->
|
||||
{{bin, erlang:term_to_binary({v1, V})}, C};
|
||||
marshal(T, V, C) ->
|
||||
machinery_mg_schema_generic:marshal(T, V, C).
|
||||
|
||||
-spec unmarshal(machinery_mg_schema:t(), machinery_msgpack:t(), machinery_mg_schema:context()) ->
|
||||
{any(), machinery_mg_schema:context()}.
|
||||
unmarshal({event, 2}, V, C) ->
|
||||
{bin, EncodedV} = V,
|
||||
{erlang:binary_to_term(EncodedV), C};
|
||||
unmarshal({event, 1}, V, C) ->
|
||||
{bin, EncodedV} = V,
|
||||
{erlang:binary_to_term(EncodedV), C};
|
||||
unmarshal(T, V, C) ->
|
||||
machinery_mg_schema_generic:unmarshal(T, V, C).
|
||||
|
||||
-spec get_version(machinery_mg_schema:vt()) ->
|
||||
machinery_mg_schema:version().
|
||||
get_version(aux_state) ->
|
||||
undefined;
|
||||
get_version(event) ->
|
||||
get_event_version().
|
||||
|
||||
%% Helpers
|
||||
|
||||
start(ID, Args, C) ->
|
||||
machinery:start(namespace(), ID, Args, get_backend(C)).
|
||||
|
||||
modernize(ID, C) ->
|
||||
machinery_modernizer:modernize(namespace(), ID, get_modernizer_backend(C)).
|
||||
|
||||
get(ID, C) ->
|
||||
machinery:get(namespace(), ID, get_backend(C)).
|
||||
|
||||
get_history(ID, C) ->
|
||||
{ok, #{history := History}} = get(ID, C),
|
||||
History.
|
||||
|
||||
namespace() ->
|
||||
general.
|
||||
|
||||
unique() ->
|
||||
genlib:unique().
|
||||
|
||||
start_backend(C) ->
|
||||
{ok, _PID} = supervisor:start_child(
|
||||
?config(group_sup, C),
|
||||
child_spec(C)
|
||||
).
|
||||
|
||||
-define(ETS, test_ets).
|
||||
|
||||
setup_test_ets() ->
|
||||
_ = ets:new(?ETS, [set, named_table]).
|
||||
|
||||
delete_test_ets() ->
|
||||
_ = ets:delete(?ETS).
|
||||
|
||||
set_event_version(Version) ->
|
||||
ets:insert(?ETS, {event_version, Version}).
|
||||
|
||||
get_event_version() ->
|
||||
[{event_version, Version}] = ets:lookup(?ETS, event_version),
|
||||
Version.
|
||||
|
||||
-spec child_spec(config()) ->
|
||||
supervisor:child_spec().
|
||||
child_spec(C) ->
|
||||
child_spec(?config(backend, C), C).
|
||||
|
||||
-spec child_spec(atom(), config()) ->
|
||||
supervisor:child_spec().
|
||||
child_spec(machinery_mg_backend, _C) ->
|
||||
Routes = backend_mg_routes() ++ modernizer_mg_routes(),
|
||||
ServerConfig = #{
|
||||
ip => {0, 0, 0, 0},
|
||||
port => 8022
|
||||
},
|
||||
machinery_utils:woody_child_spec(machinery_mg_backend, Routes, ServerConfig).
|
||||
|
||||
backend_mg_routes() ->
|
||||
BackendConfig = #{
|
||||
path => <<"/v1/stateproc">>,
|
||||
backend_config => #{
|
||||
schema => ?MODULE
|
||||
}
|
||||
},
|
||||
Handler = {?MODULE, BackendConfig},
|
||||
machinery_mg_backend:get_routes(
|
||||
[Handler],
|
||||
#{event_handler => woody_event_handler_default}
|
||||
).
|
||||
|
||||
modernizer_mg_routes() ->
|
||||
ModernizerConfig = #{
|
||||
path => <<"/v1/modernizer">>,
|
||||
backend_config => #{
|
||||
schema => ?MODULE
|
||||
}
|
||||
},
|
||||
machinery_modernizer_mg_backend:get_routes(
|
||||
[ModernizerConfig],
|
||||
#{event_handler => woody_event_handler_default}
|
||||
).
|
||||
|
||||
-spec get_backend(config()) ->
|
||||
machinery_mg_backend:backend().
|
||||
get_backend(C) ->
|
||||
get_backend(?config(backend, C), C).
|
||||
|
||||
-spec get_modernizer_backend(config()) ->
|
||||
machinery_mg_backend:backend().
|
||||
get_modernizer_backend(C) ->
|
||||
get_backend(?config(modernizer_backend, C), C).
|
||||
|
||||
-spec get_backend(atom(), config()) ->
|
||||
machinery_mg_backend:backend() |
|
||||
machinery_modernizer_mg_backend:backend().
|
||||
get_backend(machinery_mg_backend, C) ->
|
||||
machinery_mg_backend:new(
|
||||
ct_helper:get_woody_ctx(C),
|
||||
#{
|
||||
client => #{
|
||||
url => <<"http://machinegun:8022/v1/automaton">>,
|
||||
event_handler => woody_event_handler_default
|
||||
},
|
||||
schema => ?MODULE
|
||||
}
|
||||
);
|
||||
get_backend(machinery_modernizer_mg_backend, C) ->
|
||||
machinery_modernizer_mg_backend:new(
|
||||
ct_helper:get_woody_ctx(C),
|
||||
#{
|
||||
client => #{
|
||||
url => <<"http://machinegun:8022/v1/automaton">>,
|
||||
event_handler => woody_event_handler_default
|
||||
},
|
||||
schema => ?MODULE
|
||||
}
|
||||
).
|
Loading…
Reference in New Issue
Block a user