Add machinery schema context (#26)

This commit is contained in:
Andrey Fadeev 2020-06-10 18:43:17 +03:00 committed by GitHub
parent 4e8245860a
commit 032ee30bf8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 191 additions and 152 deletions

View File

@ -52,6 +52,7 @@
-export_type([error/1]).
-export_type([machine/2]).
-export_type([event/1]).
-export_type([aux_state/1]).
-type modopts(O) :: module() | {module(), O}.

View File

@ -112,7 +112,8 @@ new(WoodyCtx, Opts = #{client := _, schema := _}) ->
start(NS, ID, Args, Opts) ->
Client = get_client(Opts),
Schema = get_schema(Opts),
InitArgs = marshal({schema, Schema, {args, init}}, Args),
SContext0 = build_schema_context(NS, ID),
{InitArgs, _SContext1} = marshal({schema, Schema, {args, init}, SContext0}, Args),
case machinery_mg_client:start(marshal(namespace, NS), marshal(id, ID), InitArgs, Client) of
{ok, ok} ->
ok;
@ -129,11 +130,13 @@ start(NS, ID, Args, Opts) ->
call(NS, Ref, Range, Args, Opts) ->
Client = get_client(Opts),
Schema = get_schema(Opts),
SContext0 = build_schema_context(NS, Ref),
Descriptor = {NS, Ref, Range},
CallArgs = marshal({schema, Schema, {args, call}}, Args),
{CallArgs, SContext1} = marshal({schema, Schema, {args, call}, SContext0}, Args),
case machinery_mg_client:call(marshal(descriptor, Descriptor), CallArgs, Client) of
{ok, Response} ->
{ok, unmarshal({schema, Schema, {response, call}}, Response)};
{ok, Response0} ->
{Response1, _SContext2} = unmarshal({schema, Schema, {response, call}, SContext1}, Response0),
{ok, Response1};
{exception, #mg_stateproc_MachineNotFound{}} ->
{error, notfound};
{exception, #mg_stateproc_NamespaceNotFound{}} ->
@ -147,13 +150,15 @@ call(NS, Ref, Range, Args, Opts) ->
repair(NS, Ref, Range, Args, Opts) ->
Client = get_client(Opts),
Schema = get_schema(Opts),
SContext0 = build_schema_context(NS, Ref),
Descriptor = {NS, Ref, Range},
CallArgs = marshal({schema, Schema, {args, repair}}, Args),
case machinery_mg_client:repair(marshal(descriptor, Descriptor), CallArgs, Client) of
{ok, Response} ->
{ok, unmarshal({schema, Schema, {response, {repair, success}}}, Response)};
{RepairArgs, SContext1} = marshal({schema, Schema, {args, repair}, SContext0}, Args),
case machinery_mg_client:repair(marshal(descriptor, Descriptor), RepairArgs, Client) of
{ok, Response0} ->
{Response1, _SContext2} = unmarshal({schema, Schema, {response, {repair, success}}, SContext1}, Response0),
{ok, Response1};
{exception, #mg_stateproc_RepairFailed{reason = Reason}} ->
{error, {failed, unmarshal({schema, Schema, {response, {repair, failure}}}, Reason)}};
{error, {failed, unmarshal({schema, Schema, {response, {repair, failure}}, SContext1}, Reason)}};
{exception, #mg_stateproc_MachineNotFound{}} ->
{error, notfound};
{exception, #mg_stateproc_MachineAlreadyWorking{}} ->
@ -171,8 +176,9 @@ get(NS, Ref, Range, Opts) ->
Schema = get_schema(Opts),
Descriptor = {NS, Ref, Range},
case machinery_mg_client:get_machine(marshal(descriptor, Descriptor), Client) of
{ok, Machine} ->
{ok, unmarshal({machine, Schema}, Machine)};
{ok, Machine0} ->
{Machine1, _Context} = unmarshal({machine, Schema}, Machine0),
{ok, Machine1};
{exception, #mg_stateproc_MachineNotFound{}} ->
{error, notfound};
{exception, #mg_stateproc_NamespaceNotFound{}} ->
@ -189,42 +195,45 @@ get(NS, Ref, Range, Opts) ->
('ProcessRepair', woody:args(), woody_context:ctx(), backend_handler_opts()) ->
{ok, mg_proto_state_processing_thrift:'RepairResult'()}.
handle_function('ProcessSignal', FunctionArgs, WoodyCtx, Opts) ->
[#mg_stateproc_SignalArgs{signal = Signal, machine = Machine}] = FunctionArgs,
[#mg_stateproc_SignalArgs{signal = MarshaledSignal, machine = MarshaledMachine}] = FunctionArgs,
#{handler := Handler, schema := Schema} = Opts,
Machine1 = unmarshal({machine, Schema}, Machine),
{Machine, SContext0} = unmarshal({machine, Schema}, MarshaledMachine),
{Signal, SContext1} = unmarshal({signal, Schema, SContext0}, MarshaledSignal),
Result = dispatch_signal(
unmarshal({signal, Schema}, Signal),
Machine1,
Signal,
Machine,
machinery_utils:get_handler(Handler),
get_handler_opts(WoodyCtx)
),
{ok, marshal({signal_result, Schema}, handle_result(Result, Machine1))};
{ok, marshal({signal_result, Schema, SContext1}, handle_result(Result, Machine))};
handle_function('ProcessCall', FunctionArgs, WoodyCtx, Opts) ->
[#mg_stateproc_CallArgs{arg = Args, machine = Machine}] = FunctionArgs,
[#mg_stateproc_CallArgs{arg = MarshaledArgs, machine = MarshaledMachine}] = FunctionArgs,
#{handler := Handler, schema := Schema} = Opts,
Machine1 = unmarshal({machine, Schema}, Machine),
{Machine, SContext0} = unmarshal({machine, Schema}, MarshaledMachine),
{Args, SContext1} = unmarshal({schema, Schema, {args, call}, SContext0}, MarshaledArgs),
{Response, Result} = dispatch_call(
unmarshal({schema, Schema, {args, call}}, Args),
Machine1,
Args,
Machine,
machinery_utils:get_handler(Handler),
get_handler_opts(WoodyCtx)
),
{ok, marshal({call_result, Schema}, {Response, handle_result(Result, Machine1)})};
{ok, marshal({call_result, Schema, SContext1}, {Response, handle_result(Result, Machine)})};
handle_function('ProcessRepair', FunctionArgs, WoodyCtx, Opts) ->
[#mg_stateproc_RepairArgs{arg = Args, machine = Machine}] = FunctionArgs,
[#mg_stateproc_RepairArgs{arg = MarshaledArgs, machine = MarshaledMachine}] = FunctionArgs,
#{handler := Handler, schema := Schema} = Opts,
Machine1 = unmarshal({machine, Schema}, Machine),
{Machine, SContext0} = unmarshal({machine, Schema}, MarshaledMachine),
{Args, SContext1} = unmarshal({schema, Schema, {args, repair}, SContext0}, MarshaledArgs),
RepairResult = dispatch_repair(
unmarshal({schema, Schema, {args, repair}}, Args),
Machine1,
Args,
Machine,
machinery_utils:get_handler(Handler),
get_handler_opts(WoodyCtx)
),
case RepairResult of
{ok, {Response, Result}} ->
{ok, marshal({repair_result, Schema}, {Response, handle_result(Result, Machine1)})};
{ok, marshal({repair_result, Schema, SContext1}, {Response, handle_result(Result, Machine)})};
{error, Reason} ->
erlang:throw(marshal({repair_fail, Schema}, Reason))
erlang:throw(marshal({repair_fail, Schema, SContext1}, Reason))
end.
%% Utils
@ -263,26 +272,15 @@ set_aux_state(undefined, ReceivedState) ->
set_aux_state(NewState, _) ->
NewState.
%% Marshalling
-spec build_schema_context(namespace(), ref()) ->
machinery_mg_schema:context().
build_schema_context(NS, Ref) ->
#{
machine_ns => NS,
machine_ref => Ref
}.
%% No marshalling for the machine required by the protocol so far.
%%
%% marshal(
%% {machine, Schema},
%% #{
%% ns := NS,
%% id := ID,
%% history := History
%% }
%% ) ->
%% #mg_stateproc_Machine{
%% 'ns' = marshal(namespace, NS),
%% 'id' = marshal(id, ID),
%% 'history' = marshal({history, Schema}, History),
%% % TODO
%% % There are required fields left
%% 'history_range' = marshal(range, {undefined, undefined, forward})
%% };
%% Marshalling
marshal(descriptor, {NS, Ref, Range}) ->
#mg_stateproc_MachineDescriptor{
@ -298,64 +296,61 @@ marshal(range, {Cursor, Limit, Direction}) ->
'direction' = marshal(direction, Direction)
};
marshal({history, Schema}, V) ->
marshal({list, {event, Schema}}, V);
marshal({event, Schema}, {EventID, CreatedAt, Body}) ->
Version = machinery_mg_schema:get_version(Schema, event),
#mg_stateproc_Event{
'id' = marshal(event_id, EventID),
'created_at' = marshal(timestamp, CreatedAt),
'data' = marshal({schema, Schema, {event, Version}}, Body)
};
marshal({signal, Schema}, {init, Args}) ->
{init, #mg_stateproc_InitSignal{arg = marshal({schema, Schema, {args, init}}, Args)}};
marshal({signal, _Schema}, timeout) ->
{timeout, #mg_stateproc_TimeoutSignal{}};
marshal({signal, Schema}, {repair, Args}) ->
{repair, #mg_stateproc_RepairSignal{arg = marshal({maybe, {schema, Schema, {args, repair}}}, Args)}};
marshal({signal_result, Schema}, #{} = V) ->
marshal({signal_result, Schema, Context}, #{} = V) ->
#mg_stateproc_SignalResult{
change = marshal({state_change, Schema}, V),
change = marshal({state_change, Schema, Context}, V),
action = marshal(action, maps:get(action, V, []))
};
marshal({call_result, Schema}, {Response, #{} = V}) ->
marshal({call_result, Schema, Context}, {Response0, #{} = V}) ->
% It is expected that schema doesn't want to save anything in the context here.
% The main reason for this is the intention to simplify the code.
% So, feel free to change the behavior it is needed for you.
{Response1, Context} = marshal({schema, Schema, {response, call}, Context}, Response0),
#mg_stateproc_CallResult{
response = marshal({schema, Schema, {response, call}}, Response),
change = marshal({state_change, Schema}, V),
response = Response1,
change = marshal({state_change, Schema, Context}, V),
action = marshal(action, maps:get(action, V, []))
};
marshal({repair_result, Schema}, {Response, #{} = V}) ->
marshal({repair_result, Schema, Context}, {Response0, #{} = V}) ->
% It is expected that schema doesn't want to save anything in the context here.
{Response1, Context} = marshal({schema, Schema, {response, {repair, success}}, Context}, Response0),
#mg_stateproc_RepairResult{
response = marshal({schema, Schema, {response, {repair, success}}}, Response),
change = marshal({state_change, Schema}, V),
response = Response1,
change = marshal({state_change, Schema, Context}, V),
action = marshal(action, maps:get(action, V, []))
};
marshal({repair_fail, Schema}, Reason) ->
marshal({repair_fail, Schema, Context}, Reason) ->
% It is expected that schema doesn't want to save anything in the context here.
{Reason1, Context} = marshal({schema, Schema, {response, {repair, failure}}, Context}, Reason),
#mg_stateproc_RepairFailed{
reason = marshal({schema, Schema, {response, {repair, failure}}}, Reason)
reason = Reason1
};
marshal({state_change, Schema}, #{} = V) ->
marshal({state_change, Schema, Context}, #{} = V) ->
AuxStateVersion = machinery_mg_schema:get_version(Schema, aux_state),
EventVersion = machinery_mg_schema:get_version(Schema, event),
#mg_stateproc_MachineStateChange{
events = [
#mg_stateproc_Content{data = Event, format_version = EventVersion}
|| Event <- marshal({list, {schema, Schema, {event, EventVersion}}}, maps:get(events, V, []))
],
% TODO
% Provide this to logic handlers as well
aux_state = #mg_stateproc_Content{
data = marshal({schema, Schema, {aux_state, AuxStateVersion}}, maps:get(aux_state, V, undefined)),
events = marshal({list, {new_event_change, EventVersion, Schema, Context}}, maps:get(events, V, [])),
aux_state = marshal({aux_state_change, AuxStateVersion, Schema, Context}, maps:get(aux_state, V, undefined))
};
marshal({new_event_change, EventVersion, Schema, Context}, V) ->
% It is expected that schema doesn't want to save anything in the context here.
{Event, Context} = marshal({schema, Schema, {event, EventVersion}, Context}, V),
#mg_stateproc_Content{
data = Event,
format_version = EventVersion
};
marshal({aux_state_change, AuxStateVersion, Schema, Context}, V) ->
% It is expected that schema doesn't want to save anything in the context here.
{AuxState, Context} = marshal({schema, Schema, {aux_state, AuxStateVersion}, Context}, V),
#mg_stateproc_Content{
data = AuxState,
format_version = AuxStateVersion
}
};
marshal(action, V) when is_list(V) ->
@ -394,10 +389,8 @@ marshal(limit, V) ->
marshal(direction, V) ->
marshal({enum, [forward, backward]}, V);
marshal({schema, Schema, T}, V) ->
% TODO
% Marshal properly
machinery_mg_schema:marshal(Schema, T, V);
marshal({schema, Schema, T, Context}, V) ->
machinery_mg_schema:marshal(Schema, T, V, Context);
marshal(timestamp, {DateTime, USec}) ->
Ts = genlib_time:daytime_to_unixtime(DateTime) * ?MICROS_PER_SEC + USec,
@ -506,40 +499,51 @@ unmarshal(
'aux_state' = #mg_stateproc_Content{format_version = Version, data = AuxState}
}
) ->
#{
ns => unmarshal(namespace, NS),
id => unmarshal(id, ID),
history => unmarshal({history, Schema}, History),
ID1 = unmarshal(id, ID),
NS1 = unmarshal(namespace, NS),
Context0 = build_schema_context(NS1, ID1),
{AuxState1, Context1} = unmarshal({schema, Schema, {aux_state, Version}, Context0}, AuxState),
Machine = #{
ns => ID1,
id => NS1,
history => unmarshal({history, Schema, Context1}, History),
range => unmarshal(range, Range),
aux_state => unmarshal({maybe, {schema, Schema, {aux_state, Version}}}, AuxState)
};
aux_state => AuxState1
},
{Machine, Context1};
unmarshal({history, Schema}, V) ->
unmarshal({list, {event, Schema}}, V);
unmarshal({history, Schema, Context}, V) ->
unmarshal({list, {event, Schema, Context}}, V);
unmarshal(
{event, Schema},
{event, Schema, Context0},
#mg_stateproc_Event{
'id' = EventID,
'created_at' = CreatedAt,
'created_at' = CreatedAt0,
'format_version' = Version,
'data' = Payload
'data' = Payload0
}
) ->
CreatedAt1 = unmarshal(timestamp, CreatedAt0),
Context1 = Context0#{created_at => CreatedAt1},
% It is expected that schema doesn't want to save anything in the context here.
{Payload1, Context1} = unmarshal({schema, Schema, {event, Version}, Context1}, Payload0),
{
unmarshal(event_id, EventID),
unmarshal(timestamp, CreatedAt),
unmarshal({schema, Schema, {event, Version}}, Payload)
CreatedAt1,
Payload1
};
unmarshal({signal, Schema}, {init, #mg_stateproc_InitSignal{arg = Args}}) ->
{init, unmarshal({schema, Schema, {args, init}}, Args)};
unmarshal({signal, Schema, Context0}, {init, #mg_stateproc_InitSignal{arg = Args0}}) ->
{Args1, Context1} = unmarshal({schema, Schema, {args, init}, Context0}, Args0),
{{init, Args1}, Context1};
unmarshal({signal, _Schema}, {timeout, #mg_stateproc_TimeoutSignal{}}) ->
timeout;
unmarshal({signal, _Schema, Context}, {timeout, #mg_stateproc_TimeoutSignal{}}) ->
{timeout, Context};
unmarshal({signal, Schema}, {repair, #mg_stateproc_RepairSignal{arg = Args}}) ->
{repair, unmarshal({maybe, {schema, Schema, {args, repair}}}, Args)};
unmarshal({signal, Schema, Context0}, {repair, #mg_stateproc_RepairSignal{arg = Args0}}) ->
{Args1, Context1} = unmarshal({schema, Schema, {args, repair}, Context0}, Args0),
{{repair, Args1}, Context1};
unmarshal(namespace, V) ->
unmarshal(atom, V);
@ -556,8 +560,8 @@ unmarshal(limit, V) ->
unmarshal(direction, V) ->
unmarshal({enum, [forward, backward]}, V);
unmarshal({schema, Schema, T}, V) ->
machinery_mg_schema:unmarshal(Schema, T, V);
unmarshal({schema, Schema, T, Context}, V) ->
machinery_mg_schema:unmarshal(Schema, T, V, Context);
unmarshal(timestamp, V) when is_binary(V) ->
ok = assert_is_utc(V),

View File

@ -29,11 +29,18 @@
-type vt() :: aux_state | event.
-type version() :: undefined | integer().
-callback marshal(t(), v(_)) ->
machinery_msgpack:t().
-type context() :: #{
machine_ref := machinery:ref(),
machine_ns := machinery:namespace(),
created_at => machinery:timestamp(),
atom() => term()
}.
-callback unmarshal(t(), machinery_msgpack:t()) ->
v(_).
-callback marshal(t(), v(_), context()) ->
{machinery_msgpack:t(), context()}.
-callback unmarshal(t(), machinery_msgpack:t(), context()) ->
{v(_), context()}.
-callback get_version(vt()) ->
version().
@ -43,22 +50,23 @@
-export_type([v/1]).
-export_type([vt/0]).
-export_type([version/0]).
-export_type([context/0]).
%% API
-export([marshal/3]).
-export([unmarshal/3]).
-export([marshal/4]).
-export([unmarshal/4]).
-export([get_version/2]).
-spec marshal(schema(), t(), v(_)) ->
machinery_msgpack:t().
marshal(Schema, T, V) ->
Schema:marshal(T, V).
-spec marshal(schema(), t(), v(_), context()) ->
{machinery_msgpack:t(), context()}.
marshal(Schema, T, V, C) ->
Schema:marshal(T, V, C).
-spec unmarshal(schema(), t(), machinery_msgpack:t()) ->
v(_).
unmarshal(Schema, T, V) ->
Schema:unmarshal(T, V).
-spec unmarshal(schema(), t(), machinery_msgpack:t(), context()) ->
{v(_), context()}.
unmarshal(Schema, T, V, C) ->
Schema:unmarshal(T, V, C).
-spec get_version(schema(), vt()) ->
version().

View File

@ -12,8 +12,8 @@
%% Storage schema behaviour
-behaviour(machinery_mg_schema).
-export([marshal/2]).
-export([unmarshal/2]).
-export([marshal/3]).
-export([unmarshal/3]).
-export([get_version/1]).
-import(machinery_msgpack, [
@ -26,16 +26,17 @@
-type t() :: machinery_mg_schema:t().
-type v(T) :: machinery_mg_schema:v(T).
-type context() :: machinery_mg_schema:context().
-spec marshal(t(), v(eterm())) ->
machinery_msgpack:t().
marshal(_T, V) ->
marshal(V).
-spec marshal(t(), v(eterm()), context()) ->
{machinery_msgpack:t(), context()}.
marshal(_T, V, C) ->
{marshal(V), C}.
-spec unmarshal(t(), machinery_msgpack:t()) ->
v(eterm()).
unmarshal(_T, V) ->
unmarshal(V).
-spec unmarshal(t(), machinery_msgpack:t(), context()) ->
{v(eterm()), context()}.
unmarshal(_T, V, C) ->
{unmarshal(V), C}.
-spec get_version(machinery_mg_schema:vt()) ->
machinery_mg_schema:version().

View File

@ -1,4 +1,4 @@
-module(machinery_mg_schema_versions_SUITE).
-module(machinery_mg_schema_flow_SUITE).
-behaviour(machinery).
@ -25,8 +25,8 @@
%% machinery_mg_schema callbacks
-export([marshal/2]).
-export([unmarshal/2]).
-export([marshal/3]).
-export([unmarshal/3]).
-export([get_version/1]).
%% Internal types
@ -124,9 +124,9 @@ process_repair(repair_something, #{history := History}, _, _Opts) ->
%% machinery_mg_schema callbacks
-spec marshal(machinery_mg_schema:t(), any()) ->
machinery_msgpack:t().
marshal(T, V) when
-spec marshal(machinery_mg_schema:t(), any(), machinery_mg_schema:context()) ->
{machinery_msgpack:t(), machinery_mg_schema:context()}.
marshal(T, V, C) when
T =:= {aux_state, 1} orelse
T =:= {event, 2} orelse
T =:= {args, init} orelse
@ -136,11 +136,11 @@ marshal(T, V) when
T =:= {response, {repair, success}} orelse
T =:= {response, {repair, failure}}
->
{bin, erlang:term_to_binary(V)}.
{{bin, erlang:term_to_binary(V)}, process_context(T, C)}.
-spec unmarshal(machinery_mg_schema:t(), machinery_msgpack:t()) ->
any().
unmarshal(T, V) when
-spec unmarshal(machinery_mg_schema:t(), machinery_msgpack:t(), machinery_mg_schema:context()) ->
{any(), machinery_mg_schema:context()}.
unmarshal(T, V, C) when
T =:= {aux_state, 1} orelse
T =:= {event, 2} orelse
T =:= {args, init} orelse
@ -150,13 +150,13 @@ unmarshal(T, V) when
T =:= {response, {repair, failure}}
->
{bin, EncodedV} = V,
erlang:binary_to_term(EncodedV);
unmarshal({aux_state, undefined}, {bin, <<>>}) ->
{erlang:binary_to_term(EncodedV), process_context(T, C)};
unmarshal({aux_state, undefined} = T, {bin, <<>>}, C) ->
% initial aux_state
undefined;
unmarshal({response, {repair, success}}, {bin, <<"ok">>}) ->
{undefined, process_context(T, C)};
unmarshal({response, {repair, success}} = T, {bin, <<"ok">>}, C) ->
% mg repair migration artefact
done.
{done, process_context(T, C)}.
-spec get_version(machinery_mg_schema:vt()) ->
machinery_mg_schema:version().
@ -165,6 +165,31 @@ get_version(aux_state) ->
get_version(event) ->
2.
-spec process_context(machinery_mg_schema:t(), machinery_mg_schema:context()) ->
machinery_mg_schema:context() | no_return().
process_context(T, C) ->
?assertMatch(#{machine_ref := _, machine_ns := general}, C),
do_process_context(T, C).
-spec do_process_context(machinery_mg_schema:t(), machinery_mg_schema:context()) ->
machinery_mg_schema:context() | no_return().
do_process_context({response, call}, C) ->
?assertMatch(#{{args, call} := ok}, C),
C;
do_process_context({response, {repair, success}}, C) ->
?assertMatch(#{{args, repair} := ok}, C),
C;
do_process_context({response, {repair, failure}}, C) ->
?assertMatch(#{{args, repair} := ok}, C),
C;
do_process_context({args, _} = T, C) ->
C#{T => ok};
do_process_context({event, _}, C) ->
?assertMatch(#{my_key := test}, C),
C;
do_process_context({aux_state, _}, C) ->
C#{my_key => test}.
%% Helpers
start(ID, Args, C) ->