From 9d7e4b28cda2a83d1b93eab1f392e40ec0a53018 Mon Sep 17 00:00:00 2001 From: Alexey Date: Thu, 24 Sep 2020 11:10:39 +0300 Subject: [PATCH] MG-183: Modernizer support (#28) --- docker-compose.sh | 2 +- src/machinery_mg_backend.erl | 164 +---------- src/machinery_mg_client.erl | 11 + src/machinery_mg_codec.erl | 196 +++++++++++++ src/machinery_modernizer.erl | 28 ++ src/machinery_modernizer_backend.erl | 28 ++ src/machinery_modernizer_mg_backend.erl | 194 +++++++++++++ test/machinegun/config.yaml | 4 + test/machinery_mg_modernizer_flow_SUITE.erl | 294 ++++++++++++++++++++ 9 files changed, 759 insertions(+), 162 deletions(-) create mode 100644 src/machinery_mg_codec.erl create mode 100644 src/machinery_modernizer.erl create mode 100644 src/machinery_modernizer_backend.erl create mode 100644 src/machinery_modernizer_mg_backend.erl create mode 100644 test/machinery_mg_modernizer_flow_SUITE.erl diff --git a/docker-compose.sh b/docker-compose.sh index acb5b03..24f2b65 100755 --- a/docker-compose.sh +++ b/docker-compose.sh @@ -15,7 +15,7 @@ services: condition: service_healthy machinegun: - image: dr2.rbkmoney.com/rbkmoney/machinegun:9b160a5f39fa54b1a20ca9cc8a9a881cbcc9ed4f + image: dr2.rbkmoney.com/rbkmoney/machinegun:c89c2cecc6fa72895492c718f042a7ec90fd2442 command: /opt/machinegun/bin/machinegun foreground volumes: - ./test/machinegun/config.yaml:/opt/machinegun/etc/config.yaml diff --git a/src/machinery_mg_backend.erl b/src/machinery_mg_backend.erl index fe3b76b..108676e 100644 --- a/src/machinery_mg_backend.erl +++ b/src/machinery_mg_backend.erl @@ -282,23 +282,9 @@ build_schema_context(NS, Ref) -> %% Marshalling -marshal(descriptor, {NS, Ref, Range}) -> - #mg_stateproc_MachineDescriptor{ - 'ns' = marshal(namespace, NS), - 'ref' = marshal(ref, Ref), - 'range' = marshal(range, Range) - }; - -marshal(range, {Cursor, Limit, Direction}) -> - #mg_stateproc_HistoryRange{ - 'after' = marshal({maybe, event_id}, Cursor), - 'limit' = marshal(limit, Limit), - 'direction' = marshal(direction, Direction) - }; - marshal({signal_result, Schema, Context}, #{} = V) -> #mg_stateproc_SignalResult{ - change = marshal({state_change, Schema, Context}, V), + change = marshal({state_change, Schema, Context}, V), action = marshal(action, maps:get(action, V, [])) }; @@ -365,62 +351,11 @@ marshal(timer, {timeout, V}) -> marshal(timer, {deadline, V}) -> {deadline, marshal(timestamp, V)}; -marshal(namespace, V) -> - marshal(atom, V); - -marshal(ref, V) when is_binary(V) -> - {id, marshal(id, V)}; - -marshal(ref, {tag, V}) -> - {tag, marshal(tag, V)}; - -marshal(id, V) -> - marshal(string, V); - -marshal(tag, V) -> - marshal(string, V); - -marshal(event_id, V) -> - marshal(integer, V); - -marshal(limit, V) -> - marshal({maybe, integer}, V); - -marshal(direction, V) -> - marshal({enum, [forward, backward]}, V); - -marshal({schema, Schema, T, Context}, V) -> - machinery_mg_schema:marshal(Schema, T, V, Context); - -marshal(timestamp, {DateTime, USec}) -> - Ts = genlib_time:daytime_to_unixtime(DateTime) * ?MICROS_PER_SEC + USec, - Str = calendar:system_time_to_rfc3339(Ts, [{unit, microsecond}, {offset, "Z"}]), - erlang:list_to_binary(Str); - marshal({list, T}, V) when is_list(V) -> [marshal(T, E) || E <- V]; -marshal({maybe, _}, undefined) -> - undefined; - -marshal({maybe, T}, V) -> - marshal(T, V); - -marshal({enum, Choices = [_ | _]} = T, V) when is_atom(V) -> - _ = lists:member(V, Choices) orelse erlang:error(badarg, [T, V]), - V; - -marshal(atom, V) when is_atom(V) -> - atom_to_binary(V, utf8); - -marshal(string, V) when is_binary(V) -> - V; - -marshal(integer, V) when is_integer(V) -> - V; - marshal(T, V) -> - erlang:error(badarg, [T, V]). + machinery_mg_codec:marshal(T, V). apply_action({set_timer, V}, CA) -> CA#mg_stateproc_ComplexAction{ @@ -466,29 +401,6 @@ apply_action({tag, Tag}, CA) -> } }. -%% -%% No unmarshalling for the decriptor required by the protocol so far. -%% -%% unmarshal( -%% descriptor, -%% #mg_stateproc_MachineDescriptor{ -%% ns = NS, -%% ref = {'id', ID}, -%% range = Range -%% } -%% ) -> -%% {unmarshal(namespace, NS), unmarshal(id, ID), unmarshal(range, Range)}; - -unmarshal( - range, - #mg_stateproc_HistoryRange{ - 'after' = Cursor, - 'limit' = Limit, - 'direction' = Direction - } -) -> - {unmarshal({maybe, event_id}, Cursor), unmarshal(limit, Limit), unmarshal(direction, Direction)}; - unmarshal( {machine, Schema}, #mg_stateproc_Machine{ @@ -545,78 +457,8 @@ unmarshal({signal, Schema, Context0}, {repair, #mg_stateproc_RepairSignal{arg = {Args1, Context1} = unmarshal({schema, Schema, {args, repair}, Context0}, Args0), {{repair, Args1}, Context1}; -unmarshal(namespace, V) -> - unmarshal(atom, V); - -unmarshal(id, V) -> - unmarshal(string, V); - -unmarshal(event_id, V) -> - unmarshal(integer, V); - -unmarshal(limit, V) -> - unmarshal({maybe, integer}, V); - -unmarshal(direction, V) -> - unmarshal({enum, [forward, backward]}, V); - -unmarshal({schema, Schema, T, Context}, V) -> - machinery_mg_schema:unmarshal(Schema, T, V, Context); - -unmarshal(timestamp, V) when is_binary(V) -> - ok = assert_is_utc(V), - Str = erlang:binary_to_list(V), - try - Micros = calendar:rfc3339_to_system_time(Str, [{unit, microsecond}]), - Datetime = calendar:system_time_to_universal_time(Micros, microsecond), - {Datetime, Micros rem ?MICROS_PER_SEC} - catch - error:Reason -> - erlang:error(badarg, [timestamp, V, Reason]) - end; - unmarshal({list, T}, V) when is_list(V) -> [unmarshal(T, E) || E <- V]; -unmarshal({maybe, _}, undefined) -> - undefined; - -unmarshal({maybe, T}, V) -> - unmarshal(T, V); - -unmarshal({enum, Choices = [_ | _]} = T, V) when is_atom(V) -> - case lists:member(V, Choices) of - true -> - V; - false -> - erlang:error(badarg, [T, V]) - end; - -unmarshal(atom, V) when is_binary(V) -> - binary_to_existing_atom(V, utf8); - -unmarshal(string, V) when is_binary(V) -> - V; - -unmarshal(integer, V) when is_integer(V) -> - V; - unmarshal(T, V) -> - erlang:error(badarg, [T, V]). - --spec assert_is_utc(binary()) -> - ok | no_return(). -assert_is_utc(Rfc3339) -> - Size0 = erlang:byte_size(Rfc3339), - Size1 = Size0 - 1, - Size6 = Size0 - 6, - case Rfc3339 of - <<_:Size1/bytes, "Z">> -> - ok; - <<_:Size6/bytes, "+00:00">> -> - ok; - <<_:Size6/bytes, "-00:00">> -> - ok; - _ -> - erlang:error(badarg, [timestamp, Rfc3339, badoffset]) - end. + machinery_mg_codec:unmarshal(T, V). diff --git a/src/machinery_mg_client.erl b/src/machinery_mg_client.erl index 2373aef..0db1be7 100644 --- a/src/machinery_mg_client.erl +++ b/src/machinery_mg_client.erl @@ -12,6 +12,7 @@ -export([call/3]). -export([repair/3]). -export([get_machine/2]). +-export([modernize/2]). -type woody_client() :: #{ url := woody:url(), @@ -68,6 +69,10 @@ new(WoodyClient = #{url := _, event_handler := _}, WoodyCtx) -> namespace_not_found() | machine_not_found(). +-type modernize_errors() :: + namespace_not_found() | + machine_not_found(). + -spec start(namespace(), id(), args(), client()) -> {ok, ok} | {exception, start_errors()}. @@ -92,6 +97,12 @@ repair(Descriptor, Args, Client) -> get_machine(Descriptor, Client) -> issue_call('GetMachine', [Descriptor], Client). +-spec modernize(descriptor(), client()) -> + {ok, ok} | + {exception, modernize_errors()}. +modernize(Descriptor, Client) -> + issue_call('Modernize', [Descriptor], Client). + %% Internal functions issue_call(Function, Args, {WoodyClient, WoodyCtx}) -> diff --git a/src/machinery_mg_codec.erl b/src/machinery_mg_codec.erl new file mode 100644 index 0000000..97a07a7 --- /dev/null +++ b/src/machinery_mg_codec.erl @@ -0,0 +1,196 @@ +-module(machinery_mg_codec). +-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl"). + +-export([marshal/2]). +-export([unmarshal/2]). + +-define(MICROS_PER_SEC, (1000 * 1000)). + +%% + +-type type() :: atom() | {enum, list(atom())} | {schema, module(), any(), any()}. + +-type encoded_value() :: encoded_value(any()). +-type encoded_value(T) :: T. + +-type decoded_value() :: decoded_value(any()). +-type decoded_value(T) :: T. + +-spec marshal(type(), decoded_value()) -> + encoded_value(). + +marshal(id, V) -> + marshal(string, V); + +marshal(event_id, V) -> + marshal(integer, V); + +marshal(namespace, V) -> + marshal(atom, V); + +marshal(tag, V) -> + marshal(string, V); + +marshal(ref, V) when is_binary(V) -> + {id, marshal(id, V)}; + +marshal(ref, {tag, V}) -> + {tag, marshal(tag, V)}; + +marshal(descriptor, {NS, Ref, Range}) -> + #mg_stateproc_MachineDescriptor{ + 'ns' = marshal(namespace, NS), + 'ref' = marshal(ref, Ref), + 'range' = marshal(range, Range) + }; + +marshal(range, {Cursor, Limit, Direction}) -> + #mg_stateproc_HistoryRange{ + 'after' = maybe_marshal(event_id, Cursor), + 'limit' = maybe_marshal(limit, Limit), + 'direction' = marshal(direction, Direction) + }; + +marshal(limit, V) -> + marshal(integer, V); + +marshal(direction, V) -> + marshal({enum, [forward, backward]}, V); + +marshal({enum, Choices = [_ | _]} = T, V) when is_atom(V) -> + _ = lists:member(V, Choices) orelse erlang:error(badarg, [T, V]), + V; + +marshal({schema, Schema, T, Context}, V) -> + machinery_mg_schema:marshal(Schema, T, V, Context); + +marshal(timestamp, {DateTime, USec}) -> + Ts = genlib_time:daytime_to_unixtime(DateTime) * ?MICROS_PER_SEC + USec, + Str = calendar:system_time_to_rfc3339(Ts, [{unit, microsecond}, {offset, "Z"}]), + erlang:list_to_binary(Str); + +marshal(atom, V) when is_atom(V) -> + atom_to_binary(V, utf8); + +marshal(string, V) when is_binary(V) -> + V; + +marshal(integer, V) when is_integer(V) -> + V; + +marshal(T, V) -> + erlang:error(badarg, [T, V]). + +%% + +-spec unmarshal(type(), encoded_value()) -> + decoded_value(). + +unmarshal(id, V) -> + unmarshal(string, V); + +unmarshal(event_id, V) -> + unmarshal(integer, V); + +unmarshal(namespace, V) -> + unmarshal(atom, V); + +unmarshal(tag, V) -> + unmarshal(string, V); + +%% +%% No unmarshalling for the decriptor required by the protocol so far. +%% +%% unmarshal( +%% descriptor, +%% #mg_stateproc_MachineDescriptor{ +%% ns = NS, +%% ref = {'id', ID}, +%% range = Range +%% } +%% ) -> +%% {unmarshal(namespace, NS), unmarshal(id, ID), unmarshal(range, Range)}; + +unmarshal( + range, + #mg_stateproc_HistoryRange{ + 'after' = Cursor, + 'limit' = Limit, + 'direction' = Direction + } +) -> + { + maybe_unmarshal(event_id, Cursor), + maybe_unmarshal(limit, Limit), + unmarshal(direction, Direction) + }; + +unmarshal(limit, V) -> + unmarshal(integer, V); + +unmarshal(direction, V) -> + unmarshal({enum, [forward, backward]}, V); + +unmarshal({enum, Choices = [_ | _]} = T, V) when is_atom(V) -> + case lists:member(V, Choices) of + true -> + V; + false -> + erlang:error(badarg, [T, V]) + end; + +unmarshal({schema, Schema, T, Context}, V) -> + machinery_mg_schema:unmarshal(Schema, T, V, Context); + +unmarshal(timestamp, V) when is_binary(V) -> + ok = assert_is_utc(V), + Str = erlang:binary_to_list(V), + try + Micros = calendar:rfc3339_to_system_time(Str, [{unit, microsecond}]), + Datetime = calendar:system_time_to_universal_time(Micros, microsecond), + {Datetime, Micros rem ?MICROS_PER_SEC} + catch + error:Reason -> + erlang:error(badarg, [timestamp, V, Reason]) + end; + +unmarshal(atom, V) when is_binary(V) -> + binary_to_existing_atom(V, utf8); + +unmarshal(string, V) when is_binary(V) -> + V; + +unmarshal(integer, V) when is_integer(V) -> + V; + +unmarshal(T, V) -> + erlang:error(badarg, [T, V]). + +%% + +maybe_unmarshal(_Type, undefined) -> + undefined; +maybe_unmarshal(Type, Value) -> + unmarshal(Type, Value). + +maybe_marshal(_Type, undefined) -> + undefined; +maybe_marshal(Type, Value) -> + marshal(Type, Value). + +-spec assert_is_utc(binary()) -> + ok | no_return(). +assert_is_utc(Rfc3339) -> + Size0 = erlang:byte_size(Rfc3339), + Size1 = Size0 - 1, + Size6 = Size0 - 6, + case Rfc3339 of + <<_:Size1/bytes, "Z">> -> + ok; + <<_:Size6/bytes, "+00:00">> -> + ok; + <<_:Size6/bytes, "-00:00">> -> + ok; + _ -> + erlang:error(badarg, [timestamp, Rfc3339, badoffset]) + end. diff --git a/src/machinery_modernizer.erl b/src/machinery_modernizer.erl new file mode 100644 index 0000000..cbb8ac0 --- /dev/null +++ b/src/machinery_modernizer.erl @@ -0,0 +1,28 @@ +%%% +%%% Modernizer API abstraction. +%%% Behaviour and API. +%%% + +-module(machinery_modernizer). + +% API +-type namespace() :: machinery:namespace(). +-type ref() :: machinery:ref(). +-type range() :: machinery:range(). +-type backend() :: machinery:backend(_). + +-export([modernize/3]). +-export([modernize/4]). + +%% API + +-spec modernize(namespace(), ref(), backend()) -> + ok| {error, notfound}. +modernize(NS, Ref, Backend) -> + modernize(NS, Ref, {undefined, undefined, forward}, Backend). + +-spec modernize(namespace(), ref(), range(), backend()) -> + ok | {error, notfound}. +modernize(NS, Ref, Range, Backend) -> + {Module, Opts} = machinery_utils:get_backend(Backend), + machinery_modernizer_backend:modernize(Module, NS, Ref, Range, Opts). diff --git a/src/machinery_modernizer_backend.erl b/src/machinery_modernizer_backend.erl new file mode 100644 index 0000000..4edcec5 --- /dev/null +++ b/src/machinery_modernizer_backend.erl @@ -0,0 +1,28 @@ +%%% +%%% Modernizer backend behaviour. +%%% + +-module(machinery_modernizer_backend). + +%% API +-export([modernize/5]). + +%% Behaviour definition + +-type namespace() :: machinery:namespace(). +-type id() :: machinery:id(). +-type ref() :: machinery:ref(). +-type range() :: machinery:range(). +-type backend_opts() :: machinery:backend_opts(_). + +-callback modernize(namespace(), id(), range(), backend_opts()) -> + ok | {error, notfound}. + +%% API + +-type backend() :: module(). + +-spec modernize(backend(), namespace(), ref(), range(), backend_opts()) -> + ok | {error, notfound}. +modernize(Backend, Namespace, Ref, Range, Opts) -> + Backend:modernize(Namespace, Ref, Range, Opts). diff --git a/src/machinery_modernizer_mg_backend.erl b/src/machinery_modernizer_mg_backend.erl new file mode 100644 index 0000000..c1443f6 --- /dev/null +++ b/src/machinery_modernizer_mg_backend.erl @@ -0,0 +1,194 @@ +%%% +%%% Modernizer machinegun backend +%%% + +-module(machinery_modernizer_mg_backend). +-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl"). + +-type namespace() :: machinery:namespace(). +-type ref() :: machinery:ref(). +-type range() :: machinery:range(). +-type logic_handler(T) :: machinery:logic_handler(T). + +-define(BACKEND_CORE_OPTS, + schema := machinery_mg_schema:schema() +). + +%% Server types +-type backend_config() :: #{ + ?BACKEND_CORE_OPTS +}. + +-type handler_config() :: #{ + path := woody:path(), + backend_config := backend_config() +}. + +-type handler( ) :: handler_config(). %% handler server spec + +-type handler_opts() :: machinery:handler_opts(#{ + woody_ctx := woody_context:ctx() +}). + +-type backend_handler_opts() :: #{ + ?BACKEND_CORE_OPTS +}. + +%% Client types +-type backend_opts() :: machinery:backend_opts(#{ + woody_ctx := woody_context:ctx(), + client := machinery_mg_client:woody_client(), + ?BACKEND_CORE_OPTS +}). + +-type backend() :: {?MODULE, backend_opts()}. + +-type backend_opts_static() :: #{ + client := machinery_mg_client:woody_client(), + ?BACKEND_CORE_OPTS +}. + +-export_type([backend_config/0]). +-export_type([handler_config/0]). +-export_type([logic_handler/1]). +-export_type([handler/0]). +-export_type([handler_opts/0]). +-export_type([backend_opts/0]). +-export_type([backend/0]). + +%% API +-export([new/2]). +-export([get_routes/2]). +-export([get_handler/2]). + +%% Machinery backend +-behaviour(machinery_modernizer_backend). + +-export([modernize/4]). + +%% Woody handler +-behaviour(woody_server_thrift_handler). + +-export([handle_function/4]). + +%% API + +-spec get_routes([handler()], machinery_utils:route_opts()) -> + machinery_utils:woody_routes(). +get_routes(Handlers, Opts) -> + machinery_utils:get_woody_routes(Handlers, fun get_handler/2, Opts). + +-spec get_handler(handler(), machinery_utils:route_opts()) -> + machinery_utils:woody_handler(). +get_handler(#{path := Path, backend_config := Config}, _) -> + {Path, { + {mg_proto_state_processing_thrift, 'Modernizer'}, + {?MODULE, Config} + }}. + +-spec new(woody_context:ctx(), backend_opts_static()) -> + backend(). +new(WoodyCtx, Opts = #{client := _, schema := _}) -> + {?MODULE, Opts#{woody_ctx => WoodyCtx}}. + +%% Machinery backend + +-spec modernize(namespace(), ref(), range(), backend_opts()) -> + ok | {error, notfound}. +modernize(NS, Ref, Range, Opts) -> + Client = get_client(Opts), + Descriptor = {NS, Ref, Range}, + case machinery_mg_client:modernize(marshal(descriptor, Descriptor), Client) of + {ok, ok} -> + ok; + {exception, #mg_stateproc_MachineNotFound{}} -> + {error, notfound}; + {exception, #mg_stateproc_NamespaceNotFound{}} -> + error({namespace_not_found, NS}) + end. + +%% Woody handler + +-spec handle_function('ModernizeEvent', woody:args(), woody_context:ctx(), backend_handler_opts()) -> + {ok, mg_proto_state_processing_thrift:'ModernizeEventResult'()}. +handle_function('ModernizeEvent', [MachineEvent0], _WoodyCtx, #{schema := Schema}) -> + {MachineEvent, Context} = unmarshal_machine_event(Schema, MachineEvent0), + TargetVersion = machinery_mg_schema:get_version(Schema, event), + EventPayload = marshal_event_content(Schema, TargetVersion, Context, MachineEvent), + {ok, marshal(modernize_event_result, EventPayload)}. + +%% Utils + + +unmarshal_machine_event(Schema, #mg_stateproc_MachineEvent{ + ns = MachineNS, + id = MachineID, + event = Event +}) -> + ID = unmarshal(id, MachineID), + NS = unmarshal(namespace, MachineNS), + Context = build_schema_context(NS, ID), + {unmarshal({event, Schema, Context}, Event), Context}. + +marshal_event_content(Schema, Version, Context0, _Event = #{data := EventData0}) -> + {EventData1, Context0} = marshal({schema, Schema, {event, Version}, Context0}, EventData0), + #mg_stateproc_Content{ + format_version = maybe_marshal(format_version, Version), + data = EventData1 + }. + +get_client(#{client := Client, woody_ctx := WoodyCtx}) -> + machinery_mg_client:new(Client, WoodyCtx). + +build_schema_context(NS, Ref) -> + #{ + machine_ns => NS, + machine_ref => Ref + }. + +%% Marshalling + +marshal(modernize_event_result, Content) -> + #mg_stateproc_ModernizeEventResult{ + event_payload = Content + }; + +marshal(format_version, V) -> + marshal(integer, V); + +marshal(T, V) -> + machinery_mg_codec:marshal(T, V). + +%% Unmarshalling + +unmarshal({event, Schema, Context0}, #mg_stateproc_Event{ + id = EventID, + created_at = CreatedAt0, + format_version = Version, + data = EventData0 +}) -> + CreatedAt1 = unmarshal(timestamp, CreatedAt0), + Context1 = Context0#{created_at => CreatedAt1}, + {EventData1, Context1} = unmarshal({schema, Schema, {event, Version}, Context1}, EventData0), + #{ + id => unmarshal(event_id, EventID), + created_at => CreatedAt1, + format_version => maybe_unmarshal(format_version, Version), + data => EventData1 + }; + +unmarshal(format_version, V) -> + unmarshal(integer, V); + +unmarshal(T, V) -> + machinery_mg_codec:unmarshal(T, V). + +maybe_unmarshal(_Type, undefined) -> + undefined; +maybe_unmarshal(Type, Value) -> + unmarshal(Type, Value). + +maybe_marshal(_Type, undefined) -> + undefined; +maybe_marshal(Type, Value) -> + marshal(Type, Value). diff --git a/test/machinegun/config.yaml b/test/machinegun/config.yaml index 3df1b47..1e08d08 100644 --- a/test/machinegun/config.yaml +++ b/test/machinegun/config.yaml @@ -10,6 +10,10 @@ namespaces: general: processor: url: http://machinery:8022/v1/stateproc + modernizer: + current_format_version: 1 + handler: + url: http://machinery:8022/v1/modernizer storage: type: memory diff --git a/test/machinery_mg_modernizer_flow_SUITE.erl b/test/machinery_mg_modernizer_flow_SUITE.erl new file mode 100644 index 0000000..8b2f54e --- /dev/null +++ b/test/machinery_mg_modernizer_flow_SUITE.erl @@ -0,0 +1,294 @@ +-module(machinery_mg_modernizer_flow_SUITE). + +-include_lib("stdlib/include/assert.hrl"). +-include_lib("common_test/include/ct.hrl"). + +%% Common Tests callbacks +-export([all/0]). +-export([groups/0]). +-export([init_per_suite/1]). +-export([end_per_suite/1]). +-export([init_per_group/2]). +-export([end_per_group/2]). +-export([init_per_testcase/2]). + +%% Tests + +-export([modernizer_test/1]). +-export([skip_upgrading_test/1]). + +%% Machinery callbacks + +-behaviour(machinery). + +-export([init/4]). +-export([process_timeout/3]). +-export([process_repair/4]). +-export([process_call/4]). + +-behaviour(machinery_mg_schema). + +-export([marshal/3]). +-export([unmarshal/3]). +-export([get_version/1]). + +%% Internal types + +-type config() :: ct_helper:config(). +-type test_case_name() :: ct_helper:test_case_name(). +-type group_name() :: ct_helper:group_name(). +-type test_return() :: _ | no_return(). + +-spec all() -> [test_case_name() | {group, group_name()}]. +all() -> + [ + {group, machinery_mg_backend} + ]. + +-spec groups() -> + [{group_name(), list(), test_case_name()}]. +groups() -> + [ + {machinery_mg_backend, [], [{group, all}]}, + {all, [], [ + modernizer_test, + skip_upgrading_test + ]} + ]. + +-spec init_per_suite(config()) -> config(). +init_per_suite(C) -> + {StartedApps, _StartupCtx} = ct_helper:start_apps([machinery]), + [{started_apps, StartedApps}| C]. + +-spec end_per_suite(config()) -> _. +end_per_suite(C) -> + ok = ct_helper:stop_apps(?config(started_apps, C)), + ok. + +-spec init_per_group(group_name(), config()) -> config(). +init_per_group(machinery_mg_backend = Name, C0) -> + C1 = [ + {backend, Name}, + {modernizer_backend, machinery_modernizer_mg_backend}, + {group_sup, ct_sup:start()} | C0 + ], + {ok, _Pid} = start_backend(C1), + C1; +init_per_group(_Name, C) -> + C. + +-spec end_per_group(group_name(), config()) -> config(). +end_per_group(machinery_mg_backend, C) -> + ok = ct_sup:stop(?config(group_sup, C)), + C; +end_per_group(_Name, C) -> + C. + +-spec init_per_testcase(test_case_name(), config()) -> config(). +init_per_testcase(TestCaseName, C) -> + ct_helper:makeup_cfg([ct_helper:test_case_name(TestCaseName), ct_helper:woody_ctx()], C). + +%% Tests + +-spec modernizer_test(config()) -> test_return(). +modernizer_test(C) -> + _ = setup_test_ets(), + ID = unique(), + true = set_event_version(undefined), + ?assertEqual(ok, start(ID, init_something, C)), + [{EventID, Timestamp, Event}] = get_history(ID, C), + true = set_event_version(1), + ?assertEqual(ok, modernize(ID, C)), + ?assertEqual([{EventID, Timestamp, {v1, Event}}], get_history(ID, C)), + _ = delete_test_ets(). + +-spec skip_upgrading_test(config()) -> test_return(). +skip_upgrading_test(C) -> + _ = setup_test_ets(), + ID = unique(), + true = set_event_version(1), + ?assertEqual(ok, start(ID, init_something, C)), + [{EventID, Timestamp, {v1, Event}}] = get_history(ID, C), + true = set_event_version(2), + ?assertEqual(ok, modernize(ID, C)), + ?assertEqual([{EventID, Timestamp, {v1, Event}}], get_history(ID, C)), + _ = delete_test_ets(). + +%% Machinery handler + +-type event() :: any(). +-type aux_st() :: any(). +-type machine() :: machinery:machine(event(), aux_st()). +-type handler_opts() :: machinery:handler_opts(_). +-type result() :: machinery:result(event(), aux_st()). + +-spec init(_Args, machine(), undefined, handler_opts()) -> + result(). +init(init_something, _Machine, _, _Opts) -> + #{ + events => [init_event], + aux_state => #{} + }. + +-spec process_timeout(machine(), undefined, handler_opts()) -> + result(). +process_timeout(_Args, _, _Opts) -> + erlang:error({not_implemented, process_timeout}). + +-spec process_call(_Args, machine(), undefined, handler_opts()) -> + no_return(). +process_call(_Args, _Machine, _, _Opts) -> + erlang:error({not_implemented, process_call}). + +-spec process_repair(_Args, machine(), undefined, handler_opts()) -> + no_return(). +process_repair(_Args, _Machine, _, _Opts) -> + erlang:error({not_implemented, process_repair}). + +%% machinery_mg_schema callbacks + +-spec marshal(machinery_mg_schema:t(), any(), machinery_mg_schema:context()) -> + {machinery_msgpack:t(), machinery_mg_schema:context()}. +marshal({event, 2}, V, C) -> + {{bin, erlang:term_to_binary({v2, V})}, C}; +marshal({event, 1}, V, C) -> + {{bin, erlang:term_to_binary({v1, V})}, C}; +marshal(T, V, C) -> + machinery_mg_schema_generic:marshal(T, V, C). + +-spec unmarshal(machinery_mg_schema:t(), machinery_msgpack:t(), machinery_mg_schema:context()) -> + {any(), machinery_mg_schema:context()}. +unmarshal({event, 2}, V, C) -> + {bin, EncodedV} = V, + {erlang:binary_to_term(EncodedV), C}; +unmarshal({event, 1}, V, C) -> + {bin, EncodedV} = V, + {erlang:binary_to_term(EncodedV), C}; +unmarshal(T, V, C) -> + machinery_mg_schema_generic:unmarshal(T, V, C). + +-spec get_version(machinery_mg_schema:vt()) -> + machinery_mg_schema:version(). +get_version(aux_state) -> + undefined; +get_version(event) -> + get_event_version(). + +%% Helpers + +start(ID, Args, C) -> + machinery:start(namespace(), ID, Args, get_backend(C)). + +modernize(ID, C) -> + machinery_modernizer:modernize(namespace(), ID, get_modernizer_backend(C)). + +get(ID, C) -> + machinery:get(namespace(), ID, get_backend(C)). + +get_history(ID, C) -> + {ok, #{history := History}} = get(ID, C), + History. + +namespace() -> + general. + +unique() -> + genlib:unique(). + +start_backend(C) -> + {ok, _PID} = supervisor:start_child( + ?config(group_sup, C), + child_spec(C) + ). + +-define(ETS, test_ets). + +setup_test_ets() -> + _ = ets:new(?ETS, [set, named_table]). + +delete_test_ets() -> + _ = ets:delete(?ETS). + +set_event_version(Version) -> + ets:insert(?ETS, {event_version, Version}). + +get_event_version() -> + [{event_version, Version}] = ets:lookup(?ETS, event_version), + Version. + +-spec child_spec(config()) -> + supervisor:child_spec(). +child_spec(C) -> + child_spec(?config(backend, C), C). + +-spec child_spec(atom(), config()) -> + supervisor:child_spec(). +child_spec(machinery_mg_backend, _C) -> + Routes = backend_mg_routes() ++ modernizer_mg_routes(), + ServerConfig = #{ + ip => {0, 0, 0, 0}, + port => 8022 + }, + machinery_utils:woody_child_spec(machinery_mg_backend, Routes, ServerConfig). + +backend_mg_routes() -> + BackendConfig = #{ + path => <<"/v1/stateproc">>, + backend_config => #{ + schema => ?MODULE + } + }, + Handler = {?MODULE, BackendConfig}, + machinery_mg_backend:get_routes( + [Handler], + #{event_handler => woody_event_handler_default} + ). + +modernizer_mg_routes() -> + ModernizerConfig = #{ + path => <<"/v1/modernizer">>, + backend_config => #{ + schema => ?MODULE + } + }, + machinery_modernizer_mg_backend:get_routes( + [ModernizerConfig], + #{event_handler => woody_event_handler_default} + ). + +-spec get_backend(config()) -> + machinery_mg_backend:backend(). +get_backend(C) -> + get_backend(?config(backend, C), C). + +-spec get_modernizer_backend(config()) -> + machinery_mg_backend:backend(). +get_modernizer_backend(C) -> + get_backend(?config(modernizer_backend, C), C). + +-spec get_backend(atom(), config()) -> + machinery_mg_backend:backend() | + machinery_modernizer_mg_backend:backend(). +get_backend(machinery_mg_backend, C) -> + machinery_mg_backend:new( + ct_helper:get_woody_ctx(C), + #{ + client => #{ + url => <<"http://machinegun:8022/v1/automaton">>, + event_handler => woody_event_handler_default + }, + schema => ?MODULE + } + ); +get_backend(machinery_modernizer_mg_backend, C) -> + machinery_modernizer_mg_backend:new( + ct_helper:get_woody_ctx(C), + #{ + client => #{ + url => <<"http://machinegun:8022/v1/automaton">>, + event_handler => woody_event_handler_default + }, + schema => ?MODULE + } + ).