HG-48: Switch to the new stateproc protocol (#14)

* HG-48: Linting should not require compilation

* HG-48: Switch to the new stateproc protocol + introduce dynamic dispatch

* HG-47: Update TODOs

* HG-48: Fix process_signal contract

* HG-48: Update Payer construction in tests

* HG-48: Fix context handling

* HG-48: Bump damsel dep and update invoice machine accordingly

* HG-48: Bump mg_prototype service dep

* HG-48: Consolidate service specs in one place
This commit is contained in:
Andrew Mayorov 2016-08-11 15:52:58 +00:00 committed by GitHub
parent b78c1af62d
commit d5662dd9dc
16 changed files with 343 additions and 248 deletions

3
.gitmodules vendored
View File

@ -1,7 +1,6 @@
[submodule "apps/hg_proto/damsel"]
path = apps/hg_proto/damsel
url = git@github.com:keynslug/damsel.git
branch = HG-40/fix/events
url = git@github.com:rbkmoney/damsel.git
[submodule "build_utils"]
path = build_utils
url = git@github.com:rbkmoney/build_utils.git

View File

@ -46,7 +46,7 @@ compile: submodules rebar-update
xref: submodules
$(REBAR) xref
lint: compile
lint:
elvis rock
dialyze:

View File

@ -5,11 +5,9 @@
* More familiar flow control handling of machines, e.g. catching and wrapping thrown exceptions.
* Explicit stage denotion in the invoice machine?
* __Submachine abstraction and payment submachine implementation__.
* __Properly pass woody contexts around__.
* __Invoice access control__.
* __Proper behaviours around machines w/ internal datastructures marshalling, event sources and dispatching.__
# Tests
* Fix excess `localhost` definitions (as soon as service discovery strategy will be finalized, hopefully).
* __Add generic albeit more complex test suite which covers as many state transitions with expected effects as possible__.
* Employ macros to minimize pattern matching boilerplate.

View File

@ -35,12 +35,18 @@ stop() ->
{ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
init([]) ->
MachineHandlers = [
hg_invoice
],
{ok, {
#{strategy => one_for_all, intensity => 6, period => 30},
[get_api_child_spec()]
[
hg_machine:get_child_spec(MachineHandlers),
get_api_child_spec(MachineHandlers)
]
}}.
get_api_child_spec() ->
get_api_child_spec(MachineHandlers) ->
woody_server:child_spec(
?MODULE,
#{
@ -48,16 +54,15 @@ get_api_child_spec() ->
port => genlib_app:env(?MODULE, port, 8022),
net_opts => [],
event_handler => hg_woody_event_handler,
handlers => [
construct_service_handler(eventsink, hg_event_sink, []),
handlers => hg_machine:get_service_handlers(MachineHandlers) ++ [
construct_service_handler(invoicing, hg_invoice, []),
construct_service_handler(processor, hg_machine, [])
construct_service_handler(eventsink, hg_event_sink, [])
]
}
).
construct_service_handler(Name, Module, Opts) ->
{Name, Path, Service} = hg_proto:get_service_spec(Name),
{Path, Service} = hg_proto:get_service_spec(Name),
{Path, {Service, Module, Opts}}.
%% Application callbacks

View File

@ -18,9 +18,8 @@
{{ok, term()}, woody_client:context()} | no_return().
handle_function('GetEvents', {#payproc_EventRange{'after' = After, limit = Limit}}, Context0, _Opts) ->
HistoryRange = #'HistoryRange'{'after' = After, limit = Limit},
try
{History, Context} = get_public_history(HistoryRange, Context0),
{History, Context} = get_public_history(After, Limit, Context0),
{{ok, History}, Context}
catch
{{exception, #'EventNotFound'{}}, Context1} ->
@ -35,31 +34,22 @@ handle_function('GetLastEventID', {}, Context0, _Opts) ->
throw({#payproc_NoLastEvent{}, Context})
end.
get_public_history(HistoryRange = #'HistoryRange'{limit = undefined}, Context0) ->
{History0, Context} = get_history_range(HistoryRange, Context0),
{_LastID, History} = hg_machine:map_history(History0),
{History, Context};
get_public_history(After, Limit, Context) ->
hg_history:get_public_history(
fun get_history_range/3,
fun publish_event/1,
After, Limit,
Context
).
get_public_history(#'HistoryRange'{limit = 0}, Context) ->
{[], Context};
get_public_history(HistoryRange = #'HistoryRange'{limit = N}, Context0) ->
{History0, Context1} = get_history_range(HistoryRange, Context0),
{LastID, History} = hg_machine:map_history(History0),
case length(History0) of
N when length(History) =:= N ->
{History, Context1};
N ->
NextRange = #'HistoryRange'{'after' = LastID, limit = N - length(History)},
{HistoryRest, Context2} = get_public_history(NextRange, Context1),
{History ++ HistoryRest, Context2};
M when M < N ->
{History, Context1}
end.
get_history_range(HistoryRange, Context0) ->
get_history_range(After, Limit, Context0) ->
HistoryRange = #'HistoryRange'{'after' = After, limit = Limit},
{{ok, History}, Context} = call_event_sink('GetHistory', [HistoryRange], Context0),
{History, Context}.
publish_event(#'SinkEvent'{source_ns = Ns, source_id = SourceID, event = Event}) ->
hg_machine:publish_event(Ns, SourceID, Event).
call_event_sink(Function, Args, Context) ->
Url = genlib_app:env(hellgate, eventsink_service_url),
Service = {hg_state_processing_thrift, 'EventSink'},

View File

@ -0,0 +1,50 @@
-module(hg_history).
-export([get_public_history/5]).
%%
-spec get_public_history(GetFun, PublishFun, EventID, N, Context) -> {PublicHistory, Context} when
GetFun :: fun((EventID, N, Context) -> {History, Context}),
PublishFun :: fun((Event) -> {true, EventID, PublicEvent} | {false, EventID}),
EventID :: integer(),
N :: non_neg_integer(),
History :: [Event],
PublicHistory :: [PublicEvent],
Event :: term(),
PublicEvent :: term(),
Context :: woody_client:context().
get_public_history(GetFun, PublishFun, AfterID, undefined, Context0) ->
{History0, Context} = GetFun(AfterID, undefined, Context0),
{_LastID, History} = publish_history(PublishFun, AfterID, History0),
{History, Context};
get_public_history(_GetFun, _PublishFun, _AfterID, 0, Context) ->
{[], Context};
get_public_history(GetFun, PublishFun, AfterID, N, Context0) ->
{History0, Context1} = GetFun(AfterID, N, Context0),
{LastID, History} = publish_history(PublishFun, AfterID, History0),
case length(History0) of
N when length(History) =:= N ->
{History, Context1};
N ->
Left = N - length(History),
{HistoryRest, Context2} = get_public_history(GetFun, PublishFun, LastID, Left, Context1),
{History ++ HistoryRest, Context2};
M when M < N ->
{History, Context1}
end.
publish_history(PublishFun, LastID, History) ->
publish_history(PublishFun, History, LastID, []).
publish_history(_PublishFun, [], LastID, Evs) ->
{LastID, lists:reverse(Evs)};
publish_history(PublishFun, [Ev0 | Rest], _, Evs) ->
case PublishFun(Ev0) of
{true, ID, Ev} ->
publish_history(PublishFun, Rest, ID, [Ev | Evs]);
{false, ID} ->
publish_history(PublishFun, Rest, ID, Evs)
end.

View File

@ -1,6 +1,8 @@
-module(hg_invoice).
-include_lib("hg_proto/include/hg_payment_processing_thrift.hrl").
-define(NS, <<"invoice">>).
%% Woody handler
-behaviour(woody_server_thrift_handler).
@ -11,13 +13,16 @@
-behaviour(hg_machine).
-export([namespace/0]).
-export([init/3]).
-export([process_signal/3]).
-export([process_call/3]).
-export([map_event/1]).
-export([publish_event/2]).
%%
-record(st, {
invoice :: invoice(),
payments = [] :: [payment()],
@ -26,13 +31,14 @@
}).
-type st() :: #st{}.
%%
-spec handle_function(woody_t:func(), woody_server_thrift_handler:args(), woody_client:context(), []) ->
{{ok, term()}, woody_client:context()} | no_return().
handle_function('Create', {UserInfo, InvoiceParams}, Context0, _Opts) ->
{InvoiceID, Context} = hg_machine:start(?MODULE, {InvoiceParams, UserInfo}, opts(Context0)),
{InvoiceID, Context} = start(hg_utils:unique_id(), {InvoiceParams, UserInfo}, Context0),
{{ok, InvoiceID}, Context};
handle_function('Get', {UserInfo, InvoiceID}, Context0, _Opts) ->
@ -45,7 +51,7 @@ handle_function('GetEvents', {UserInfo, InvoiceID, Range}, Context0, _Opts) ->
{{ok, History}, Context};
handle_function('StartPayment', {UserInfo, InvoiceID, PaymentParams}, Context0, _Opts) ->
{PaymentID, Context} = hg_machine:call(InvoiceID, {start_payment, PaymentParams, UserInfo}, opts(Context0)),
{PaymentID, Context} = call(InvoiceID, {start_payment, PaymentParams, UserInfo}, Context0),
{{ok, PaymentID}, Context};
handle_function('GetPayment', {UserInfo, UserInfo, PaymentID}, Context0, _Opts) ->
@ -58,20 +64,20 @@ handle_function('GetPayment', {UserInfo, UserInfo, PaymentID}, Context0, _Opts)
end;
handle_function('Fulfill', {UserInfo, InvoiceID, Reason}, Context0, _Opts) ->
{Result, Context} = hg_machine:call(InvoiceID, {fulfill, Reason, UserInfo}, opts(Context0)),
{Result, Context} = call(InvoiceID, {fulfill, Reason, UserInfo}, Context0),
{{ok, Result}, Context};
handle_function('Rescind', {UserInfo, InvoiceID, Reason}, Context0, _Opts) ->
{Result, Context} = hg_machine:call(InvoiceID, {rescind, Reason, UserInfo}, opts(Context0)),
{Result, Context} = call(InvoiceID, {rescind, Reason, UserInfo}, Context0),
{{ok, Result}, Context}.
%%
get_history(_UserInfo, InvoiceID, Context) ->
hg_machine:get_history(InvoiceID, opts(Context)).
hg_machine:get_history(?NS, InvoiceID, opts(Context)).
get_history(_UserInfo, InvoiceID, AfterID, Limit, Context) ->
hg_machine:get_history(InvoiceID, AfterID, Limit, opts(Context)).
hg_machine:get_history(?NS, InvoiceID, AfterID, Limit, opts(Context)).
get_state(UserInfo, InvoiceID, Context0) ->
{History, Context} = get_history(UserInfo, InvoiceID, Context0),
@ -79,41 +85,18 @@ get_state(UserInfo, InvoiceID, Context0) ->
{St, Context}.
get_public_history(UserInfo, InvoiceID, #payproc_EventRange{'after' = AfterID, limit = Limit}, Context) ->
get_public_history(UserInfo, InvoiceID, AfterID, Limit, Context).
hg_history:get_public_history(
fun (ID, Lim, Ctx) -> get_history(UserInfo, InvoiceID, ID, Lim, Ctx) end,
fun (Event) -> publish_event(InvoiceID, Event) end,
AfterID, Limit,
Context
).
get_public_history(UserInfo, InvoiceID, AfterID, undefined, Context0) ->
{History0, Context} = get_history(UserInfo, InvoiceID, AfterID, undefined, Context0),
{_LastID, History} = map_history(History0),
{History, Context};
start(ID, Args, Context) ->
hg_machine:start(?NS, ID, Args, opts(Context)).
get_public_history(_UserInfo, _InvoiceID, _AfterID, 0, Context) ->
{[], Context};
get_public_history(UserInfo, InvoiceID, AfterID, N, Context0) ->
{History0, Context1} = get_history(UserInfo, InvoiceID, AfterID, N, Context0),
{LastID, History} = map_history(History0),
case length(History0) of
N when length(History) =:= N ->
{History, Context1};
N ->
NextRange = #payproc_EventRange{'after' = LastID, limit = N - length(History)},
{HistoryRest, Context2} = get_public_history(UserInfo, InvoiceID, NextRange, Context1),
{History ++ HistoryRest, Context2};
M when M < N ->
{History, Context1}
end.
map_history(History) ->
map_history(History, undefined, []).
map_history([], LastID, Evs) ->
{LastID, lists:reverse(Evs)};
map_history([Ev0 = {ID, _, _, _} | Rest], _, Evs) ->
case map_event(Ev0) of
Ev when Ev /= undefined ->
map_history(Rest, ID, [Ev | Evs]);
undefined ->
map_history(Rest, ID, Evs)
end.
call(ID, Args, Context) ->
hg_machine:call(?NS, ID, Args, opts(Context)).
opts(Context) ->
#{client_context => Context}.
@ -148,32 +131,39 @@ opts(Context) ->
-define(payment_pending(PaymentID),
#payproc_InvoicePaymentPending{id = PaymentID}).
-spec map_event(hg_machine:event(ev())) ->
hg_payment_processing_thrift:'Event'() | undefined.
-spec publish_event(invoice_id(), hg_machine:event(ev())) ->
{true, hg_machine:event_id(), hg_payment_processing_thrift:'Event'()} |
{false, hg_machine:event_id()}.
map_event({ID, Source, Dt, {public, Seq, Ev}}) ->
#payproc_Event{
id = ID,
publish_event(InvoiceID, {EventID, Dt, {public, Seq, Ev}}) ->
{true, EventID, #payproc_Event{
id = EventID,
created_at = Dt,
source = {invoice, Source},
source = {invoice, InvoiceID},
sequence = Seq,
payload = Ev
};
map_event(_) ->
undefined.
}};
publish_event(_InvoiceID, {EventID, _, _}) ->
{false, EventID}.
%%
-spec init(invoice_id(), {invoice_params(), user_info()}, hg_machine:context()) ->
{{ok, hg_machine:result(ev())}, woody_client:context()}.
-spec namespace() ->
hg_machine:ns().
init(ID, {InvoiceParams, _UserInfo}, Context) ->
Invoice = create_invoice(ID, InvoiceParams),
namespace() ->
?NS.
-spec init(invoice_id(), {invoice_params(), user_info()}, hg_machine:context()) ->
{hg_machine:result(ev()), woody_client:context()}.
init(ID, {InvoiceParams, UserInfo}, Context) ->
Invoice = create_invoice(ID, InvoiceParams, UserInfo),
Event = {public, ?invoice_ev(?invoice_created(Invoice))},
{ok(Event, #st{}, set_invoice_timer(Invoice)), Context}.
-spec process_signal(hg_machine:signal(), hg_machine:history(ev()), hg_machine:context()) ->
{{ok, hg_machine:result(ev())}, woody_client:context()}.
{hg_machine:result(ev()), woody_client:context()}.
process_signal(timeout, History, Context) ->
St = #st{invoice = Invoice, stage = Stage} = collapse_history(History),
@ -186,7 +176,7 @@ process_signal(timeout, History, Context) ->
% invoice is expired
process_expiration(St, Context);
_ ->
ok()
{ok(), Context}
end;
process_signal({repair, _}, History, Context) ->
@ -237,7 +227,7 @@ construct_payment_events(#domain_InvoicePayment{trx = Trx}, Trx = undefined, Eve
ok | {ok, term()} | {exception, term()}.
-spec process_call(call(), hg_machine:history(ev()), woody_client:context()) ->
{{ok, response(), hg_machine:result(ev())}, woody_client:context()}.
{{response(), hg_machine:result(ev())}, woody_client:context()}.
process_call({start_payment, PaymentParams, _UserInfo}, History, Context) ->
St = #st{invoice = Invoice, stage = Stage} = collapse_history(History),
@ -281,19 +271,19 @@ set_invoice_timer(_Invoice) ->
hg_machine_action:new().
ok() ->
{ok, {[], hg_machine_action:new()}}.
{[], hg_machine_action:new()}.
ok(Event, St) ->
ok(Event, St, hg_machine_action:new()).
ok(Event, St, Action) ->
{ok, {sequence_events(wrap_event_list(Event), St), Action}}.
{sequence_events(wrap_event_list(Event), St), Action}.
respond(Response, Event, St, Action) ->
{ok, Response, {sequence_events(wrap_event_list(Event), St), Action}}.
{Response, {sequence_events(wrap_event_list(Event), St), Action}}.
raise(Exception) ->
raise(Exception, hg_machine_action:new()).
raise(Exception, Action) ->
{ok, {exception, Exception}, {[], Action}}.
{{exception, Exception}, {[], Action}}.
wrap_event_list(Event) when is_tuple(Event) ->
wrap_event_list([Event]);
@ -311,10 +301,12 @@ sequence_event_({private, Ev}, Seq) ->
%%
create_invoice(ID, V = #payproc_InvoiceParams{}) ->
create_invoice(ID, V = #payproc_InvoiceParams{}, #payproc_UserInfo{id = UserID}) ->
Revision = hg_domain:head(),
#domain_Invoice{
id = ID,
shop_id = V#payproc_InvoiceParams.shop_id,
owner = #domain_PartyRef{id = UserID, revision = 1},
created_at = get_datetime_utc(),
status = ?unpaid(),
domain_revision = Revision,
@ -328,14 +320,13 @@ create_invoice(ID, V = #payproc_InvoiceParams{}) ->
}
}.
create_payment(V = #payproc_InvoicePaymentParams{}, Invoice) ->
create_payment(V = #payproc_InvoicePaymentParams{}, Invoice = #domain_Invoice{cost = Cost}) ->
#domain_InvoicePayment{
id = create_payment_id(Invoice),
created_at = get_datetime_utc(),
status = ?pending(),
payer = V#payproc_InvoicePaymentParams.payer,
payment_tool = V#payproc_InvoicePaymentParams.payment_tool,
session = V#payproc_InvoicePaymentParams.session
cost = Cost,
payer = V#payproc_InvoicePaymentParams.payer
}.
create_payment_id(Invoice = #domain_Invoice{}) ->
@ -375,7 +366,7 @@ fulfill_invoice(_Reason, Invoice) ->
-spec collapse_history([ev()]) -> st().
collapse_history(History) ->
lists:foldl(fun ({_ID, _, _, Ev}, St) -> merge_history(Ev, St) end, #st{}, History).
lists:foldl(fun ({_ID, _, Ev}, St) -> merge_history(Ev, St) end, #st{}, History).
merge_history({public, Seq, ?invoice_ev(Event)}, St) ->
merge_invoice_event(Event, St#st{sequence = Seq});

View File

@ -1,10 +1,11 @@
-module(hg_machine).
-type id() :: binary().
-type id() :: hg_base_thrift:'ID'().
-type ns() :: hg_base_thrift:'Namespace'().
-type args() :: _.
-type event() :: event(_).
-type event(T) :: {event_id(), id(), timestamp(), T}.
-type event(T) :: {event_id(), timestamp(), T}.
-type event_id() :: hg_base_thrift:'EventID'().
-type timestamp() :: hg_base_thrift:'Timestamp'().
@ -14,30 +15,36 @@
-type result(T) :: {[T], hg_machine_action:t()}.
-type result() :: result(_).
-callback namespace() ->
ns().
-callback init(id(), args(), context()) ->
{{ok, result()}, context()}.
{result(), context()}.
-type signal() ::
timeout | {repair, args()}.
-callback process_signal(signal(), history(), context()) ->
{{ok, result()}, context()}.
{result(), context()}.
-type call() :: _.
-type response() :: ok | {ok, term()} | {exception, term()}.
-callback process_call(call(), history(), context()) ->
{{ok, response(), result()}, context()}.
{{response(), result()}, context()}.
%% TODO feels like not the right place
-callback publish_event(id(), event()) ->
{true , event_id(), PublicEvent :: term()} |
{false , event_id()}.
-type context() :: #{
client_context => woody_client:context()
}.
%% TODO not the right place for sure
-callback map_event(event()) ->
term().
-export_type([id/0]).
-export_type([ns/0]).
-export_type([event_id/0]).
-export_type([event/0]).
-export_type([event/1]).
-export_type([history/0]).
@ -47,16 +54,20 @@
-export_type([result/1]).
-export_type([context/0]).
-export([start/3]).
-export([call/3]).
-export([get_history/2]).
-export([get_history/4]).
-export([start/4]).
-export([call/4]).
-export([get_history/3]).
-export([get_history/5]).
%% TODO not the right place either
-export([map_history/1]).
-export([publish_event/3]).
-export([dispatch_signal/3]).
-export([dispatch_call/3]).
%% Dispatch
-export([get_child_spec/1]).
-export([get_service_handlers/1]).
-export([start_link/1]).
-export([init/1]).
%% Woody handler
@ -74,17 +85,16 @@
%%
-spec start(module(), term(), opts()) -> {id(), woody_client:context()}.
-spec start(ns(), id(), term(), opts()) -> {id(), woody_client:context()}.
start(Module, Args, #{client_context := Context0}) ->
{{ok, Response}, Context} = call_automaton('start', [#'Args'{arg = wrap_args({Module, Args})}], Context0),
#'StartResult'{id = ID} = Response,
start(Ns, ID, Args, #{client_context := Context0}) ->
{ok, Context} = call_automaton('Start', [Ns, ID, wrap_args(Args)], Context0),
{ID, Context}.
-spec call(id(), term(), opts()) -> {term(), woody_client:context()} | no_return().
-spec call(ns(), id(), term(), opts()) -> {term(), woody_client:context()} | no_return().
call(ID, Args, #{client_context := Context0}) ->
case call_automaton('call', [{id, ID}, wrap_args(Args)], Context0) of
call(Ns, ID, Args, #{client_context := Context0}) ->
case call_automaton('Call', [Ns, {id, ID}, wrap_args(Args)], Context0) of
{{ok, Response}, Context} ->
% should be specific to a processing interface already
case unmarshal_term(Response) of
@ -101,31 +111,36 @@ call(ID, Args, #{client_context := Context0}) ->
error(Reason)
end.
-spec get_history(id(), opts()) ->
-spec get_history(ns(), id(), opts()) ->
{history(), woody_client:context()}.
get_history(ID, Opts) ->
get_history(ID, #'HistoryRange'{}, Opts).
get_history(Ns, ID, Opts) ->
get_history(Ns, ID, #'HistoryRange'{}, Opts).
-spec get_history(id(), event_id(), undefined | non_neg_integer(), opts()) ->
-spec get_history(ns(), id(), event_id(), undefined | non_neg_integer(), opts()) ->
{history(), woody_client:context()}.
get_history(ID, AfterID, Limit, Opts) ->
get_history(ID, #'HistoryRange'{'after' = AfterID, limit = Limit}, Opts).
get_history(Ns, ID, AfterID, Limit, Opts) ->
get_history(Ns, ID, #'HistoryRange'{'after' = AfterID, limit = Limit}, Opts).
get_history(ID, Range, #{client_context := Context0}) ->
case call_automaton('getHistory', [{id, ID}, Range], Context0) of
{{ok, []}, Context} ->
{[], Context};
{{ok, History0}, Context} ->
{_Module, History} = untag_history(unwrap_history(History0)),
{History, Context};
get_history(Ns, ID, Range, #{client_context := Context0}) ->
case call_automaton('GetHistory', [Ns, {id, ID}, Range], Context0) of
{{ok, History}, Context} ->
{unwrap_history(History), Context};
{{exception, Exception}, Context} ->
throw({Exception, Context});
{{error, Reason}, _} ->
error(Reason)
end.
-spec publish_event(ns(), id(), hg_state_processing_thrift:'Event'()) ->
{true , event_id(), PublicEvent :: term()} |
{false , event_id()}.
publish_event(Ns, ID, Event) ->
Module = get_handler_module(Ns),
Module:publish_event(ID, unwrap_event(Event)).
%%
call_automaton(Function, Args, Context) ->
@ -138,124 +153,149 @@ call_automaton(Function, Args, Context) ->
-type func() :: 'processSignal' | 'processCall'.
-spec handle_function(func(), woody_server_thrift_handler:args(), woody_client:context(), []) ->
-spec handle_function(func(), woody_server_thrift_handler:args(), woody_client:context(), [ns()]) ->
{{ok, term()}, woody_client:context()} | no_return().
handle_function('processSignal', {Args}, Context0, _Opts) ->
handle_function('processSignal', {Args}, Context0, [Ns]) ->
_ = hg_utils:logtag_process(namespace, Ns),
#'SignalArgs'{signal = {_Type, Signal}, history = History} = Args,
{Result, Context} = dispatch_signal(Signal, History, opts(Context0)),
{Result, Context} = dispatch_signal(Ns, Signal, History, Context0),
{{ok, Result}, Context};
handle_function('processCall', {Args}, Context0, _Opts) ->
handle_function('processCall', {Args}, Context0, [Ns]) ->
_ = hg_utils:logtag_process(namespace, Ns),
#'CallArgs'{call = Payload, history = History} = Args,
{Result, Context} = dispatch_call(Payload, History, opts(Context0)),
{Result, Context} = dispatch_call(Ns, Payload, History, Context0),
{{ok, Result}, Context}.
opts(Context) ->
#{client_context => Context}.
create_context(ClientContext) ->
#{client_context => ClientContext}.
%%
-spec dispatch_signal(Signal, hg_machine:history(), opts()) -> {Result, woody_client:context()} when
Signal ::
hg_state_processing_thrift:'InitSignal'() |
hg_state_processing_thrift:'TimeoutSignal'() |
hg_state_processing_thrift:'RepairSignal'(),
Result ::
hg_state_processing_thrift:'SignalResult'().
-spec dispatch_signal(ns(), Signal, hg_machine:history(), woody_client:context()) ->
{Result, woody_client:context()} when
Signal ::
hg_state_processing_thrift:'InitSignal'() |
hg_state_processing_thrift:'TimeoutSignal'() |
hg_state_processing_thrift:'RepairSignal'(),
Result ::
hg_state_processing_thrift:'SignalResult'().
dispatch_signal(#'InitSignal'{id = ID, arg = Payload}, [], _Opts = #{client_context := Context0}) ->
{Module, Args} = unwrap_args(Payload),
_ = lager:debug("[machine] [~p] dispatch init (~p: ~p) with history: ~p", [Module, ID, Args, []]),
dispatch_signal(Ns, #'InitSignal'{id = ID, arg = Payload}, [], Context0) ->
Args = unwrap_args(Payload),
_ = lager:debug("dispatch init with id = ~s and args = ~p", [ID, Args]),
Module = get_handler_module(Ns),
{Result, #{client_context := Context}} = Module:init(ID, Args, create_context(Context0)),
{marshal_signal_result(Result, Module), Context};
{marshal_signal_result(Result), Context};
dispatch_signal(#'TimeoutSignal'{}, History0, _Opts = #{client_context := Context0}) ->
% TODO: deducing module from signal payload looks more natural
% opaque payload in every event?
{Module, History} = untag_history(unwrap_history(History0)),
_ = lager:debug("[machine] [~p] dispatch timeout with history: ~p", [Module, History]),
dispatch_signal(Ns, #'TimeoutSignal'{}, History0, Context0) ->
History = unwrap_history(History0),
_ = lager:debug("dispatch timeout with history = ~p", [History]),
Module = get_handler_module(Ns),
{Result, #{client_context := Context}} = Module:process_signal(timeout, History, create_context(Context0)),
{marshal_signal_result(Result, Module), Context};
{marshal_signal_result(Result), Context};
dispatch_signal(#'RepairSignal'{arg = Payload}, History0, _Opts = #{client_context := Context0}) ->
Args = unmarshal_term(Payload),
{Module, History} = untag_history(unwrap_history(History0)),
_ = lager:debug("[machine] [~p] dispatch repair (~p) with history: ~p", [Module, Args, History]),
dispatch_signal(Ns, #'RepairSignal'{arg = Payload}, History0, Context0) ->
Args = unwrap_args(Payload),
History = unwrap_history(History0),
_ = lager:debug("dispatch repair with args = ~p and history: ~p", [Args, History]),
Module = get_handler_module(Ns),
{Result, #{client_context := Context}} = Module:process_signal({repair, Args}, History, create_context(Context0)),
{marshal_signal_result(Result, Module), Context}.
{marshal_signal_result(Result), Context}.
marshal_signal_result({ok, {Events, Action}}, Module) ->
_ = lager:debug("[machine] [~p] result with events = ~p and action = ~p", [Module, Events, Action]),
marshal_signal_result({Events, Action}) ->
_ = lager:debug("signal result with events = ~p and action = ~p", [Events, Action]),
#'SignalResult'{
events = wrap_events(Module, Events),
events = wrap_events(Events),
action = Action
}.
-spec dispatch_call(ns(), Call, hg_machine:history(), woody_client:context()) ->
{Result, woody_client:context()} when
Call :: hg_state_processing_thrift:'Call'(),
Result :: hg_state_processing_thrift:'CallResult'().
-spec dispatch_call(Call, hg_machine:history(), opts()) -> {Result, woody_client:context()} when
Call :: hg_state_processing_thrift:'Call'(),
Result :: hg_state_processing_thrift:'CallResult'().
dispatch_call(Payload, History0, _Opts = #{client_context := Context0}) ->
% TODO: looks suspicious
dispatch_call(Ns, Payload, History0, Context0) ->
Args = unwrap_args(Payload),
{Module, History} = untag_history(unwrap_history(History0)),
_ = lager:debug("[machine] [~p] dispatch call (~p) with history: ~p", [Module, Args, History]),
History = unwrap_history(History0),
_ = lager:debug("dispatch call with args = ~p and history: ~p", [Args, History]),
Module = get_handler_module(Ns),
{Result, #{client_context := Context}} = Module:process_call(Args, History, create_context(Context0)),
{marshal_call_result(Result, Module), Context}.
{marshal_call_result(Result), Context}.
%%
marshal_call_result({ok, Response, {Events, Action}}, Module) ->
_ = lager:debug(
"[machine] [~p] call response = ~p with event = ~p and action = ~p",
[Module, Response, Events, Action]
),
marshal_call_result({Response, {Events, Action}}) ->
_ = lager:debug("call response = ~p with event = ~p and action = ~p", [Response, Events, Action]),
#'CallResult'{
events = wrap_events(Module, Events),
events = wrap_events(Events),
action = Action,
response = marshal_term(Response)
}.
create_context(ClientContext) ->
#{client_context => ClientContext}.
%%
-spec map_history(hg_state_processing_thrift:'History'()) ->
{event_id(), [term()]}.
-type service_handler() ::
{Path :: string(), {woody_t:service(), woody_t:handler(), [ns()]}}.
map_history(History) ->
map_history(unwrap_history(History), undefined, []).
-spec get_child_spec([MachineHandler :: module()]) ->
supervisor:child_spec().
map_history([], LastID, Evs) ->
{LastID, lists:reverse(Evs)};
map_history([{Module, Ev0 = {ID, _, _, _}} | Rest], _, Evs) ->
case Module:map_event(Ev0) of
Ev when Ev /= undefined ->
map_history(Rest, ID, [Ev | Evs]);
undefined ->
map_history(Rest, ID, Evs)
end.
get_child_spec(MachineHandlers) ->
#{
id => hg_machine_dispatch,
start => {?MODULE, start_link, [MachineHandlers]},
type => supervisor
}.
-spec get_service_handlers([MachineHandler :: module()]) ->
[service_handler()].
get_service_handlers(MachineHandlers) ->
lists:map(fun get_service_handler/1, MachineHandlers).
get_service_handler(MachineHandler) ->
Ns = MachineHandler:namespace(),
{Path, Service} = hg_proto:get_service_spec(processor, #{namespace => Ns}),
{Path, {Service, ?MODULE, [Ns]}}.
%%
-define(TABLE, hg_machine_dispatch).
-spec start_link([module()]) ->
{ok, pid()}.
start_link(MachineHandlers) ->
supervisor:start_link(?MODULE, MachineHandlers).
-spec init([module()]) ->
{ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
init(MachineHandlers) ->
_ = ets:new(?TABLE, [protected, named_table, {read_concurrency, true}]),
true = ets:insert_new(?TABLE, [{MH:namespace(), MH} || MH <- MachineHandlers]),
{ok, {#{}, []}}.
%%
-spec get_handler_module(ns()) -> module().
get_handler_module(Ns) ->
ets:lookup_element(?TABLE, Ns, 2).
%%
unwrap_history(History) ->
[unwrap_event(E) || E <- History].
untag_history(History = [{Module, _} | _]) ->
{Module, [E || {_, E} <- History]}.
wrap_events(Events) ->
[wrap_event(E) || E <- Events].
wrap_events(Module, [Ev | Rest]) ->
[wrap_event(Module, Ev) | wrap_events(Module, Rest)];
wrap_events(_, []) ->
[].
wrap_event(Event) ->
marshal_term(Event).
wrap_event(Module, EventInner) ->
marshal_term({Module, EventInner}).
unwrap_event(#'Event'{id = ID, source = Source, created_at = Dt, event_payload = Payload}) ->
{Module, EventInner} = unmarshal_term(Payload),
{Module, {ID, Source, Dt, EventInner}}.
unwrap_event(#'Event'{id = ID, created_at = Dt, event_payload = Payload}) ->
{ID, Dt, unmarshal_term(Payload)}.
wrap_args(Args) ->
marshal_term(Args).

View File

@ -17,8 +17,8 @@
-type tag() :: binary().
-type seconds() :: non_neg_integer().
-type datetime_iso8601() :: binary().
-type datetime() :: calendar:datetime() | datetime_iso8601().
-type datetime_rfc3339() :: binary().
-type datetime() :: calendar:datetime() | datetime_rfc3339().
-type timer() :: hg_base_thrift:'Timer'().
-type t() :: hg_state_processing_thrift:'ComplexAction'().

View File

@ -1,14 +1,23 @@
-module(hg_utils).
-export([unique_id/0]).
-export([shift_datetime/2]).
-export([logtag_process/2]).
-export([get_hostname_ip/1]).
%%
-type seconds() :: integer().
-type datetime_iso8601() :: binary().
-spec unique_id() -> hg_base_thrift:'ID'().
-type dt() :: calendar:datetime() | datetime_iso8601().
unique_id() ->
<<ID:64>> = snowflake:new(),
genlib_format:format_int_base(ID, 62).
%%
-type seconds() :: integer().
-type dt() :: calendar:datetime() | hg_base_thrift:'Timestamp'().
-spec shift_datetime(dt(), seconds()) -> dt().
@ -24,6 +33,14 @@ parse_dt(Dt) ->
%%
-spec logtag_process(atom(), any()) -> ok.
logtag_process(Key, Value) when is_atom(Key) ->
% TODO preformat into binary?
lager:md(orddict:store(Key, Value, lager:md())).
%%
-spec get_hostname_ip(Hostname | IP) -> IP when
Hostname :: string(),
IP :: inet:ip_address().

View File

@ -15,9 +15,9 @@
supervisor:child_spec().
get_child_spec(Host, Port) ->
{Name, Path, Service} = get_service_spec(),
{Path, Service} = get_service_spec(),
woody_server:child_spec(
Name,
?MODULE,
#{
ip => hg_utils:get_hostname_ip(Host),
port => Port,
@ -31,12 +31,11 @@ get_child_spec(Host, Port) ->
woody_t:url().
get_url(Host, Port) ->
{_Name, Path, _Service} = get_service_spec(),
{Path, _Service} = get_service_spec(),
iolist_to_binary(["http://", Host, ":", integer_to_list(Port), Path]).
get_service_spec() ->
{?MODULE, "/test/proxy/provider/dummy",
{hg_proxy_provider_thrift, 'ProviderProxy'}}.
{"/test/proxy/provider/dummy", {hg_proxy_provider_thrift, 'ProviderProxy'}}.
%%

View File

@ -231,6 +231,7 @@ make_invoice_params(Product, Due, Amount, Context) when is_integer(Amount) ->
make_invoice_params(Product, Due, {Amount, <<"RUB">>}, Context);
make_invoice_params(Product, Due, {Amount, Currency}, Context) ->
#payproc_InvoiceParams{
shop_id = <<"THRIFT-SHOP">>,
product = Product,
amount = Amount,
due = format_datetime(Due),
@ -244,9 +245,11 @@ make_payment_params() ->
make_payment_params(PaymentTool, Session) ->
#payproc_InvoicePaymentParams{
payer = #domain_Payer{},
payment_tool = PaymentTool,
session = Session
payer = #domain_Payer{
payment_tool = PaymentTool,
session = Session,
client_info = #domain_ClientInfo{}
}
}.
make_payment_tool() ->

View File

@ -271,7 +271,7 @@ strip_event(#payproc_Event{payload = Payload}) ->
Payload.
issue_service_call(ServiceName, Function, Args, Client = #cl{context = Context, root_url = RootUrl}) ->
{_Name, Path, Service} = hg_proto:get_service_spec(ServiceName),
{Path, Service} = hg_proto:get_service_spec(ServiceName),
Url = iolist_to_binary([RootUrl, Path]),
Request = {Service, Function, Args},
{Result, ContextNext} = woody_client:call_safe(Context, Request, #{url => Url}),

@ -1 +1 @@
Subproject commit d3883df63982bf9bf5c8e9ac477cd65e1238a45b
Subproject commit fb43ca0f55da97cb649c7933251ab54fbb651ffe

View File

@ -1,28 +1,31 @@
-module(hg_proto).
-export([get_service_specs/0]).
-export([get_service_spec/1]).
-export([get_service_spec/2]).
-export_type([service_spec/0]).
%%
-type service_spec() :: {Name :: atom(), Path :: string(), Service :: {module(), atom()}}.
-define(VERSION_PREFIX, "/v1").
-spec get_service_specs() -> [service_spec()].
-type service_spec() :: {Path :: string(), Service :: {module(), atom()}}.
get_service_specs() ->
VersionPrefix = "/v1",
[
{eventsink, VersionPrefix ++ "/processing/eventsink",
{hg_payment_processing_thrift, 'EventSink'}},
{invoicing, VersionPrefix ++ "/processing/invoicing",
{hg_payment_processing_thrift, 'Invoicing'}},
{processor, VersionPrefix ++ "/stateproc/processor",
{hg_state_processing_thrift, 'Processor'}}
].
-spec get_service_spec(Name :: atom()) -> service_spec() | false.
-spec get_service_spec(Name :: atom()) -> service_spec().
get_service_spec(Name) ->
lists:keyfind(Name, 1, get_service_specs()).
get_service_spec(Name, #{}).
-spec get_service_spec(Name :: atom(), Opts :: #{}) -> service_spec().
get_service_spec(eventsink, #{}) ->
Service = {hg_payment_processing_thrift, 'EventSink'},
{?VERSION_PREFIX ++ "/processing/eventsink", Service};
get_service_spec(invoicing, #{}) ->
Service = {hg_payment_processing_thrift, 'Invoicing'},
{?VERSION_PREFIX ++ "/processing/invoicing", Service};
get_service_spec(processor, #{namespace := Ns}) when is_binary(Ns) ->
Service = {hg_state_processing_thrift, 'Processor'},
{?VERSION_PREFIX ++ "/stateproc/" ++ binary_to_list(Ns), Service}.

View File

@ -11,7 +11,7 @@ services:
depends_on:
- machinegun
machinegun:
image: dr.rbkmoney.com/rbkmoney/mg_prototype:87bef0b
image: dr.rbkmoney.com/rbkmoney/mg_prototype:3455e7b
command: /opt/mgun/bin/mgun foreground
networks:
default: