From d5662dd9dc62aac835779283b5fb79f9f2b1d8a8 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 11 Aug 2016 15:52:58 +0000 Subject: [PATCH] HG-48: Switch to the new stateproc protocol (#14) * HG-48: Linting should not require compilation * HG-48: Switch to the new stateproc protocol + introduce dynamic dispatch * HG-47: Update TODOs * HG-48: Fix process_signal contract * HG-48: Update Payer construction in tests * HG-48: Fix context handling * HG-48: Bump damsel dep and update invoice machine accordingly * HG-48: Bump mg_prototype service dep * HG-48: Consolidate service specs in one place --- .gitmodules | 3 +- Makefile | 2 +- TODO.md | 4 +- apps/hellgate/src/hellgate.erl | 17 +- apps/hellgate/src/hg_event_sink.erl | 36 ++- apps/hellgate/src/hg_history.erl | 50 +++++ apps/hellgate/src/hg_invoice.erl | 123 +++++------ apps/hellgate/src/hg_machine.erl | 270 +++++++++++++---------- apps/hellgate/src/hg_machine_action.erl | 4 +- apps/hellgate/src/hg_utils.erl | 23 +- apps/hellgate/test/hg_dummy_provider.erl | 9 +- apps/hellgate/test/hg_tests_SUITE.erl | 9 +- apps/hg_client/src/hg_client.erl | 2 +- apps/hg_proto/damsel | 2 +- apps/hg_proto/src/hg_proto.erl | 35 +-- docker-compose.sh | 2 +- 16 files changed, 343 insertions(+), 248 deletions(-) create mode 100644 apps/hellgate/src/hg_history.erl diff --git a/.gitmodules b/.gitmodules index 90f452d..3676b2f 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,7 +1,6 @@ [submodule "apps/hg_proto/damsel"] path = apps/hg_proto/damsel - url = git@github.com:keynslug/damsel.git - branch = HG-40/fix/events + url = git@github.com:rbkmoney/damsel.git [submodule "build_utils"] path = build_utils url = git@github.com:rbkmoney/build_utils.git diff --git a/Makefile b/Makefile index 7d780d3..b5dff78 100644 --- a/Makefile +++ b/Makefile @@ -46,7 +46,7 @@ compile: submodules rebar-update xref: submodules $(REBAR) xref -lint: compile +lint: elvis rock dialyze: diff --git a/TODO.md b/TODO.md index 882dac8..0d3ffcc 100644 --- a/TODO.md +++ b/TODO.md @@ -5,11 +5,9 @@ * More familiar flow control handling of machines, e.g. catching and wrapping thrown exceptions. * Explicit stage denotion in the invoice machine? * __Submachine abstraction and payment submachine implementation__. -* __Properly pass woody contexts around__. * __Invoice access control__. +* __Proper behaviours around machines w/ internal datastructures marshalling, event sources and dispatching.__ # Tests -* Fix excess `localhost` definitions (as soon as service discovery strategy will be finalized, hopefully). * __Add generic albeit more complex test suite which covers as many state transitions with expected effects as possible__. -* Employ macros to minimize pattern matching boilerplate. diff --git a/apps/hellgate/src/hellgate.erl b/apps/hellgate/src/hellgate.erl index 02761d1..a7442f9 100644 --- a/apps/hellgate/src/hellgate.erl +++ b/apps/hellgate/src/hellgate.erl @@ -35,12 +35,18 @@ stop() -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}. init([]) -> + MachineHandlers = [ + hg_invoice + ], {ok, { #{strategy => one_for_all, intensity => 6, period => 30}, - [get_api_child_spec()] + [ + hg_machine:get_child_spec(MachineHandlers), + get_api_child_spec(MachineHandlers) + ] }}. -get_api_child_spec() -> +get_api_child_spec(MachineHandlers) -> woody_server:child_spec( ?MODULE, #{ @@ -48,16 +54,15 @@ get_api_child_spec() -> port => genlib_app:env(?MODULE, port, 8022), net_opts => [], event_handler => hg_woody_event_handler, - handlers => [ - construct_service_handler(eventsink, hg_event_sink, []), + handlers => hg_machine:get_service_handlers(MachineHandlers) ++ [ construct_service_handler(invoicing, hg_invoice, []), - construct_service_handler(processor, hg_machine, []) + construct_service_handler(eventsink, hg_event_sink, []) ] } ). construct_service_handler(Name, Module, Opts) -> - {Name, Path, Service} = hg_proto:get_service_spec(Name), + {Path, Service} = hg_proto:get_service_spec(Name), {Path, {Service, Module, Opts}}. %% Application callbacks diff --git a/apps/hellgate/src/hg_event_sink.erl b/apps/hellgate/src/hg_event_sink.erl index 840f12a..5a9c18e 100644 --- a/apps/hellgate/src/hg_event_sink.erl +++ b/apps/hellgate/src/hg_event_sink.erl @@ -18,9 +18,8 @@ {{ok, term()}, woody_client:context()} | no_return(). handle_function('GetEvents', {#payproc_EventRange{'after' = After, limit = Limit}}, Context0, _Opts) -> - HistoryRange = #'HistoryRange'{'after' = After, limit = Limit}, try - {History, Context} = get_public_history(HistoryRange, Context0), + {History, Context} = get_public_history(After, Limit, Context0), {{ok, History}, Context} catch {{exception, #'EventNotFound'{}}, Context1} -> @@ -35,31 +34,22 @@ handle_function('GetLastEventID', {}, Context0, _Opts) -> throw({#payproc_NoLastEvent{}, Context}) end. -get_public_history(HistoryRange = #'HistoryRange'{limit = undefined}, Context0) -> - {History0, Context} = get_history_range(HistoryRange, Context0), - {_LastID, History} = hg_machine:map_history(History0), - {History, Context}; +get_public_history(After, Limit, Context) -> + hg_history:get_public_history( + fun get_history_range/3, + fun publish_event/1, + After, Limit, + Context + ). -get_public_history(#'HistoryRange'{limit = 0}, Context) -> - {[], Context}; -get_public_history(HistoryRange = #'HistoryRange'{limit = N}, Context0) -> - {History0, Context1} = get_history_range(HistoryRange, Context0), - {LastID, History} = hg_machine:map_history(History0), - case length(History0) of - N when length(History) =:= N -> - {History, Context1}; - N -> - NextRange = #'HistoryRange'{'after' = LastID, limit = N - length(History)}, - {HistoryRest, Context2} = get_public_history(NextRange, Context1), - {History ++ HistoryRest, Context2}; - M when M < N -> - {History, Context1} - end. - -get_history_range(HistoryRange, Context0) -> +get_history_range(After, Limit, Context0) -> + HistoryRange = #'HistoryRange'{'after' = After, limit = Limit}, {{ok, History}, Context} = call_event_sink('GetHistory', [HistoryRange], Context0), {History, Context}. +publish_event(#'SinkEvent'{source_ns = Ns, source_id = SourceID, event = Event}) -> + hg_machine:publish_event(Ns, SourceID, Event). + call_event_sink(Function, Args, Context) -> Url = genlib_app:env(hellgate, eventsink_service_url), Service = {hg_state_processing_thrift, 'EventSink'}, diff --git a/apps/hellgate/src/hg_history.erl b/apps/hellgate/src/hg_history.erl new file mode 100644 index 0000000..257219c --- /dev/null +++ b/apps/hellgate/src/hg_history.erl @@ -0,0 +1,50 @@ +-module(hg_history). + +-export([get_public_history/5]). + +%% + +-spec get_public_history(GetFun, PublishFun, EventID, N, Context) -> {PublicHistory, Context} when + GetFun :: fun((EventID, N, Context) -> {History, Context}), + PublishFun :: fun((Event) -> {true, EventID, PublicEvent} | {false, EventID}), + EventID :: integer(), + N :: non_neg_integer(), + History :: [Event], + PublicHistory :: [PublicEvent], + Event :: term(), + PublicEvent :: term(), + Context :: woody_client:context(). + +get_public_history(GetFun, PublishFun, AfterID, undefined, Context0) -> + {History0, Context} = GetFun(AfterID, undefined, Context0), + {_LastID, History} = publish_history(PublishFun, AfterID, History0), + {History, Context}; + +get_public_history(_GetFun, _PublishFun, _AfterID, 0, Context) -> + {[], Context}; +get_public_history(GetFun, PublishFun, AfterID, N, Context0) -> + {History0, Context1} = GetFun(AfterID, N, Context0), + {LastID, History} = publish_history(PublishFun, AfterID, History0), + case length(History0) of + N when length(History) =:= N -> + {History, Context1}; + N -> + Left = N - length(History), + {HistoryRest, Context2} = get_public_history(GetFun, PublishFun, LastID, Left, Context1), + {History ++ HistoryRest, Context2}; + M when M < N -> + {History, Context1} + end. + +publish_history(PublishFun, LastID, History) -> + publish_history(PublishFun, History, LastID, []). + +publish_history(_PublishFun, [], LastID, Evs) -> + {LastID, lists:reverse(Evs)}; +publish_history(PublishFun, [Ev0 | Rest], _, Evs) -> + case PublishFun(Ev0) of + {true, ID, Ev} -> + publish_history(PublishFun, Rest, ID, [Ev | Evs]); + {false, ID} -> + publish_history(PublishFun, Rest, ID, Evs) + end. diff --git a/apps/hellgate/src/hg_invoice.erl b/apps/hellgate/src/hg_invoice.erl index 2749197..1daf4b1 100644 --- a/apps/hellgate/src/hg_invoice.erl +++ b/apps/hellgate/src/hg_invoice.erl @@ -1,6 +1,8 @@ -module(hg_invoice). -include_lib("hg_proto/include/hg_payment_processing_thrift.hrl"). +-define(NS, <<"invoice">>). + %% Woody handler -behaviour(woody_server_thrift_handler). @@ -11,13 +13,16 @@ -behaviour(hg_machine). +-export([namespace/0]). + -export([init/3]). -export([process_signal/3]). -export([process_call/3]). --export([map_event/1]). +-export([publish_event/2]). %% + -record(st, { invoice :: invoice(), payments = [] :: [payment()], @@ -26,13 +31,14 @@ }). -type st() :: #st{}. + %% -spec handle_function(woody_t:func(), woody_server_thrift_handler:args(), woody_client:context(), []) -> {{ok, term()}, woody_client:context()} | no_return(). handle_function('Create', {UserInfo, InvoiceParams}, Context0, _Opts) -> - {InvoiceID, Context} = hg_machine:start(?MODULE, {InvoiceParams, UserInfo}, opts(Context0)), + {InvoiceID, Context} = start(hg_utils:unique_id(), {InvoiceParams, UserInfo}, Context0), {{ok, InvoiceID}, Context}; handle_function('Get', {UserInfo, InvoiceID}, Context0, _Opts) -> @@ -45,7 +51,7 @@ handle_function('GetEvents', {UserInfo, InvoiceID, Range}, Context0, _Opts) -> {{ok, History}, Context}; handle_function('StartPayment', {UserInfo, InvoiceID, PaymentParams}, Context0, _Opts) -> - {PaymentID, Context} = hg_machine:call(InvoiceID, {start_payment, PaymentParams, UserInfo}, opts(Context0)), + {PaymentID, Context} = call(InvoiceID, {start_payment, PaymentParams, UserInfo}, Context0), {{ok, PaymentID}, Context}; handle_function('GetPayment', {UserInfo, UserInfo, PaymentID}, Context0, _Opts) -> @@ -58,20 +64,20 @@ handle_function('GetPayment', {UserInfo, UserInfo, PaymentID}, Context0, _Opts) end; handle_function('Fulfill', {UserInfo, InvoiceID, Reason}, Context0, _Opts) -> - {Result, Context} = hg_machine:call(InvoiceID, {fulfill, Reason, UserInfo}, opts(Context0)), + {Result, Context} = call(InvoiceID, {fulfill, Reason, UserInfo}, Context0), {{ok, Result}, Context}; handle_function('Rescind', {UserInfo, InvoiceID, Reason}, Context0, _Opts) -> - {Result, Context} = hg_machine:call(InvoiceID, {rescind, Reason, UserInfo}, opts(Context0)), + {Result, Context} = call(InvoiceID, {rescind, Reason, UserInfo}, Context0), {{ok, Result}, Context}. %% get_history(_UserInfo, InvoiceID, Context) -> - hg_machine:get_history(InvoiceID, opts(Context)). + hg_machine:get_history(?NS, InvoiceID, opts(Context)). get_history(_UserInfo, InvoiceID, AfterID, Limit, Context) -> - hg_machine:get_history(InvoiceID, AfterID, Limit, opts(Context)). + hg_machine:get_history(?NS, InvoiceID, AfterID, Limit, opts(Context)). get_state(UserInfo, InvoiceID, Context0) -> {History, Context} = get_history(UserInfo, InvoiceID, Context0), @@ -79,41 +85,18 @@ get_state(UserInfo, InvoiceID, Context0) -> {St, Context}. get_public_history(UserInfo, InvoiceID, #payproc_EventRange{'after' = AfterID, limit = Limit}, Context) -> - get_public_history(UserInfo, InvoiceID, AfterID, Limit, Context). + hg_history:get_public_history( + fun (ID, Lim, Ctx) -> get_history(UserInfo, InvoiceID, ID, Lim, Ctx) end, + fun (Event) -> publish_event(InvoiceID, Event) end, + AfterID, Limit, + Context + ). -get_public_history(UserInfo, InvoiceID, AfterID, undefined, Context0) -> - {History0, Context} = get_history(UserInfo, InvoiceID, AfterID, undefined, Context0), - {_LastID, History} = map_history(History0), - {History, Context}; +start(ID, Args, Context) -> + hg_machine:start(?NS, ID, Args, opts(Context)). -get_public_history(_UserInfo, _InvoiceID, _AfterID, 0, Context) -> - {[], Context}; -get_public_history(UserInfo, InvoiceID, AfterID, N, Context0) -> - {History0, Context1} = get_history(UserInfo, InvoiceID, AfterID, N, Context0), - {LastID, History} = map_history(History0), - case length(History0) of - N when length(History) =:= N -> - {History, Context1}; - N -> - NextRange = #payproc_EventRange{'after' = LastID, limit = N - length(History)}, - {HistoryRest, Context2} = get_public_history(UserInfo, InvoiceID, NextRange, Context1), - {History ++ HistoryRest, Context2}; - M when M < N -> - {History, Context1} - end. - -map_history(History) -> - map_history(History, undefined, []). - -map_history([], LastID, Evs) -> - {LastID, lists:reverse(Evs)}; -map_history([Ev0 = {ID, _, _, _} | Rest], _, Evs) -> - case map_event(Ev0) of - Ev when Ev /= undefined -> - map_history(Rest, ID, [Ev | Evs]); - undefined -> - map_history(Rest, ID, Evs) - end. +call(ID, Args, Context) -> + hg_machine:call(?NS, ID, Args, opts(Context)). opts(Context) -> #{client_context => Context}. @@ -148,32 +131,39 @@ opts(Context) -> -define(payment_pending(PaymentID), #payproc_InvoicePaymentPending{id = PaymentID}). --spec map_event(hg_machine:event(ev())) -> - hg_payment_processing_thrift:'Event'() | undefined. +-spec publish_event(invoice_id(), hg_machine:event(ev())) -> + {true, hg_machine:event_id(), hg_payment_processing_thrift:'Event'()} | + {false, hg_machine:event_id()}. -map_event({ID, Source, Dt, {public, Seq, Ev}}) -> - #payproc_Event{ - id = ID, +publish_event(InvoiceID, {EventID, Dt, {public, Seq, Ev}}) -> + {true, EventID, #payproc_Event{ + id = EventID, created_at = Dt, - source = {invoice, Source}, + source = {invoice, InvoiceID}, sequence = Seq, payload = Ev - }; -map_event(_) -> - undefined. + }}; +publish_event(_InvoiceID, {EventID, _, _}) -> + {false, EventID}. %% --spec init(invoice_id(), {invoice_params(), user_info()}, hg_machine:context()) -> - {{ok, hg_machine:result(ev())}, woody_client:context()}. +-spec namespace() -> + hg_machine:ns(). -init(ID, {InvoiceParams, _UserInfo}, Context) -> - Invoice = create_invoice(ID, InvoiceParams), +namespace() -> + ?NS. + +-spec init(invoice_id(), {invoice_params(), user_info()}, hg_machine:context()) -> + {hg_machine:result(ev()), woody_client:context()}. + +init(ID, {InvoiceParams, UserInfo}, Context) -> + Invoice = create_invoice(ID, InvoiceParams, UserInfo), Event = {public, ?invoice_ev(?invoice_created(Invoice))}, {ok(Event, #st{}, set_invoice_timer(Invoice)), Context}. -spec process_signal(hg_machine:signal(), hg_machine:history(ev()), hg_machine:context()) -> - {{ok, hg_machine:result(ev())}, woody_client:context()}. + {hg_machine:result(ev()), woody_client:context()}. process_signal(timeout, History, Context) -> St = #st{invoice = Invoice, stage = Stage} = collapse_history(History), @@ -186,7 +176,7 @@ process_signal(timeout, History, Context) -> % invoice is expired process_expiration(St, Context); _ -> - ok() + {ok(), Context} end; process_signal({repair, _}, History, Context) -> @@ -237,7 +227,7 @@ construct_payment_events(#domain_InvoicePayment{trx = Trx}, Trx = undefined, Eve ok | {ok, term()} | {exception, term()}. -spec process_call(call(), hg_machine:history(ev()), woody_client:context()) -> - {{ok, response(), hg_machine:result(ev())}, woody_client:context()}. + {{response(), hg_machine:result(ev())}, woody_client:context()}. process_call({start_payment, PaymentParams, _UserInfo}, History, Context) -> St = #st{invoice = Invoice, stage = Stage} = collapse_history(History), @@ -281,19 +271,19 @@ set_invoice_timer(_Invoice) -> hg_machine_action:new(). ok() -> - {ok, {[], hg_machine_action:new()}}. + {[], hg_machine_action:new()}. ok(Event, St) -> ok(Event, St, hg_machine_action:new()). ok(Event, St, Action) -> - {ok, {sequence_events(wrap_event_list(Event), St), Action}}. + {sequence_events(wrap_event_list(Event), St), Action}. respond(Response, Event, St, Action) -> - {ok, Response, {sequence_events(wrap_event_list(Event), St), Action}}. + {Response, {sequence_events(wrap_event_list(Event), St), Action}}. raise(Exception) -> raise(Exception, hg_machine_action:new()). raise(Exception, Action) -> - {ok, {exception, Exception}, {[], Action}}. + {{exception, Exception}, {[], Action}}. wrap_event_list(Event) when is_tuple(Event) -> wrap_event_list([Event]); @@ -311,10 +301,12 @@ sequence_event_({private, Ev}, Seq) -> %% -create_invoice(ID, V = #payproc_InvoiceParams{}) -> +create_invoice(ID, V = #payproc_InvoiceParams{}, #payproc_UserInfo{id = UserID}) -> Revision = hg_domain:head(), #domain_Invoice{ id = ID, + shop_id = V#payproc_InvoiceParams.shop_id, + owner = #domain_PartyRef{id = UserID, revision = 1}, created_at = get_datetime_utc(), status = ?unpaid(), domain_revision = Revision, @@ -328,14 +320,13 @@ create_invoice(ID, V = #payproc_InvoiceParams{}) -> } }. -create_payment(V = #payproc_InvoicePaymentParams{}, Invoice) -> +create_payment(V = #payproc_InvoicePaymentParams{}, Invoice = #domain_Invoice{cost = Cost}) -> #domain_InvoicePayment{ id = create_payment_id(Invoice), created_at = get_datetime_utc(), status = ?pending(), - payer = V#payproc_InvoicePaymentParams.payer, - payment_tool = V#payproc_InvoicePaymentParams.payment_tool, - session = V#payproc_InvoicePaymentParams.session + cost = Cost, + payer = V#payproc_InvoicePaymentParams.payer }. create_payment_id(Invoice = #domain_Invoice{}) -> @@ -375,7 +366,7 @@ fulfill_invoice(_Reason, Invoice) -> -spec collapse_history([ev()]) -> st(). collapse_history(History) -> - lists:foldl(fun ({_ID, _, _, Ev}, St) -> merge_history(Ev, St) end, #st{}, History). + lists:foldl(fun ({_ID, _, Ev}, St) -> merge_history(Ev, St) end, #st{}, History). merge_history({public, Seq, ?invoice_ev(Event)}, St) -> merge_invoice_event(Event, St#st{sequence = Seq}); diff --git a/apps/hellgate/src/hg_machine.erl b/apps/hellgate/src/hg_machine.erl index 1c6f401..0a4e2cb 100644 --- a/apps/hellgate/src/hg_machine.erl +++ b/apps/hellgate/src/hg_machine.erl @@ -1,10 +1,11 @@ -module(hg_machine). --type id() :: binary(). +-type id() :: hg_base_thrift:'ID'(). +-type ns() :: hg_base_thrift:'Namespace'(). -type args() :: _. -type event() :: event(_). --type event(T) :: {event_id(), id(), timestamp(), T}. +-type event(T) :: {event_id(), timestamp(), T}. -type event_id() :: hg_base_thrift:'EventID'(). -type timestamp() :: hg_base_thrift:'Timestamp'(). @@ -14,30 +15,36 @@ -type result(T) :: {[T], hg_machine_action:t()}. -type result() :: result(_). +-callback namespace() -> + ns(). + -callback init(id(), args(), context()) -> - {{ok, result()}, context()}. + {result(), context()}. -type signal() :: timeout | {repair, args()}. -callback process_signal(signal(), history(), context()) -> - {{ok, result()}, context()}. + {result(), context()}. -type call() :: _. -type response() :: ok | {ok, term()} | {exception, term()}. -callback process_call(call(), history(), context()) -> - {{ok, response(), result()}, context()}. + {{response(), result()}, context()}. + +%% TODO feels like not the right place +-callback publish_event(id(), event()) -> + {true , event_id(), PublicEvent :: term()} | + {false , event_id()}. -type context() :: #{ client_context => woody_client:context() }. -%% TODO not the right place for sure --callback map_event(event()) -> - term(). - -export_type([id/0]). +-export_type([ns/0]). +-export_type([event_id/0]). -export_type([event/0]). -export_type([event/1]). -export_type([history/0]). @@ -47,16 +54,20 @@ -export_type([result/1]). -export_type([context/0]). --export([start/3]). --export([call/3]). --export([get_history/2]). --export([get_history/4]). +-export([start/4]). +-export([call/4]). +-export([get_history/3]). +-export([get_history/5]). -%% TODO not the right place either --export([map_history/1]). +-export([publish_event/3]). --export([dispatch_signal/3]). --export([dispatch_call/3]). +%% Dispatch + +-export([get_child_spec/1]). +-export([get_service_handlers/1]). + +-export([start_link/1]). +-export([init/1]). %% Woody handler @@ -74,17 +85,16 @@ %% --spec start(module(), term(), opts()) -> {id(), woody_client:context()}. +-spec start(ns(), id(), term(), opts()) -> {id(), woody_client:context()}. -start(Module, Args, #{client_context := Context0}) -> - {{ok, Response}, Context} = call_automaton('start', [#'Args'{arg = wrap_args({Module, Args})}], Context0), - #'StartResult'{id = ID} = Response, +start(Ns, ID, Args, #{client_context := Context0}) -> + {ok, Context} = call_automaton('Start', [Ns, ID, wrap_args(Args)], Context0), {ID, Context}. --spec call(id(), term(), opts()) -> {term(), woody_client:context()} | no_return(). +-spec call(ns(), id(), term(), opts()) -> {term(), woody_client:context()} | no_return(). -call(ID, Args, #{client_context := Context0}) -> - case call_automaton('call', [{id, ID}, wrap_args(Args)], Context0) of +call(Ns, ID, Args, #{client_context := Context0}) -> + case call_automaton('Call', [Ns, {id, ID}, wrap_args(Args)], Context0) of {{ok, Response}, Context} -> % should be specific to a processing interface already case unmarshal_term(Response) of @@ -101,31 +111,36 @@ call(ID, Args, #{client_context := Context0}) -> error(Reason) end. --spec get_history(id(), opts()) -> +-spec get_history(ns(), id(), opts()) -> {history(), woody_client:context()}. -get_history(ID, Opts) -> - get_history(ID, #'HistoryRange'{}, Opts). +get_history(Ns, ID, Opts) -> + get_history(Ns, ID, #'HistoryRange'{}, Opts). --spec get_history(id(), event_id(), undefined | non_neg_integer(), opts()) -> +-spec get_history(ns(), id(), event_id(), undefined | non_neg_integer(), opts()) -> {history(), woody_client:context()}. -get_history(ID, AfterID, Limit, Opts) -> - get_history(ID, #'HistoryRange'{'after' = AfterID, limit = Limit}, Opts). +get_history(Ns, ID, AfterID, Limit, Opts) -> + get_history(Ns, ID, #'HistoryRange'{'after' = AfterID, limit = Limit}, Opts). -get_history(ID, Range, #{client_context := Context0}) -> - case call_automaton('getHistory', [{id, ID}, Range], Context0) of - {{ok, []}, Context} -> - {[], Context}; - {{ok, History0}, Context} -> - {_Module, History} = untag_history(unwrap_history(History0)), - {History, Context}; +get_history(Ns, ID, Range, #{client_context := Context0}) -> + case call_automaton('GetHistory', [Ns, {id, ID}, Range], Context0) of + {{ok, History}, Context} -> + {unwrap_history(History), Context}; {{exception, Exception}, Context} -> throw({Exception, Context}); {{error, Reason}, _} -> error(Reason) end. +-spec publish_event(ns(), id(), hg_state_processing_thrift:'Event'()) -> + {true , event_id(), PublicEvent :: term()} | + {false , event_id()}. + +publish_event(Ns, ID, Event) -> + Module = get_handler_module(Ns), + Module:publish_event(ID, unwrap_event(Event)). + %% call_automaton(Function, Args, Context) -> @@ -138,124 +153,149 @@ call_automaton(Function, Args, Context) -> -type func() :: 'processSignal' | 'processCall'. --spec handle_function(func(), woody_server_thrift_handler:args(), woody_client:context(), []) -> +-spec handle_function(func(), woody_server_thrift_handler:args(), woody_client:context(), [ns()]) -> {{ok, term()}, woody_client:context()} | no_return(). -handle_function('processSignal', {Args}, Context0, _Opts) -> +handle_function('processSignal', {Args}, Context0, [Ns]) -> + _ = hg_utils:logtag_process(namespace, Ns), #'SignalArgs'{signal = {_Type, Signal}, history = History} = Args, - {Result, Context} = dispatch_signal(Signal, History, opts(Context0)), + {Result, Context} = dispatch_signal(Ns, Signal, History, Context0), {{ok, Result}, Context}; -handle_function('processCall', {Args}, Context0, _Opts) -> +handle_function('processCall', {Args}, Context0, [Ns]) -> + _ = hg_utils:logtag_process(namespace, Ns), #'CallArgs'{call = Payload, history = History} = Args, - {Result, Context} = dispatch_call(Payload, History, opts(Context0)), + {Result, Context} = dispatch_call(Ns, Payload, History, Context0), {{ok, Result}, Context}. -opts(Context) -> - #{client_context => Context}. - - -create_context(ClientContext) -> - #{client_context => ClientContext}. %% --spec dispatch_signal(Signal, hg_machine:history(), opts()) -> {Result, woody_client:context()} when - Signal :: - hg_state_processing_thrift:'InitSignal'() | - hg_state_processing_thrift:'TimeoutSignal'() | - hg_state_processing_thrift:'RepairSignal'(), - Result :: - hg_state_processing_thrift:'SignalResult'(). +-spec dispatch_signal(ns(), Signal, hg_machine:history(), woody_client:context()) -> + {Result, woody_client:context()} when + Signal :: + hg_state_processing_thrift:'InitSignal'() | + hg_state_processing_thrift:'TimeoutSignal'() | + hg_state_processing_thrift:'RepairSignal'(), + Result :: + hg_state_processing_thrift:'SignalResult'(). -dispatch_signal(#'InitSignal'{id = ID, arg = Payload}, [], _Opts = #{client_context := Context0}) -> - {Module, Args} = unwrap_args(Payload), - _ = lager:debug("[machine] [~p] dispatch init (~p: ~p) with history: ~p", [Module, ID, Args, []]), +dispatch_signal(Ns, #'InitSignal'{id = ID, arg = Payload}, [], Context0) -> + Args = unwrap_args(Payload), + _ = lager:debug("dispatch init with id = ~s and args = ~p", [ID, Args]), + Module = get_handler_module(Ns), {Result, #{client_context := Context}} = Module:init(ID, Args, create_context(Context0)), - {marshal_signal_result(Result, Module), Context}; + {marshal_signal_result(Result), Context}; -dispatch_signal(#'TimeoutSignal'{}, History0, _Opts = #{client_context := Context0}) -> - % TODO: deducing module from signal payload looks more natural - % opaque payload in every event? - {Module, History} = untag_history(unwrap_history(History0)), - _ = lager:debug("[machine] [~p] dispatch timeout with history: ~p", [Module, History]), +dispatch_signal(Ns, #'TimeoutSignal'{}, History0, Context0) -> + History = unwrap_history(History0), + _ = lager:debug("dispatch timeout with history = ~p", [History]), + Module = get_handler_module(Ns), {Result, #{client_context := Context}} = Module:process_signal(timeout, History, create_context(Context0)), - {marshal_signal_result(Result, Module), Context}; + {marshal_signal_result(Result), Context}; -dispatch_signal(#'RepairSignal'{arg = Payload}, History0, _Opts = #{client_context := Context0}) -> - Args = unmarshal_term(Payload), - {Module, History} = untag_history(unwrap_history(History0)), - _ = lager:debug("[machine] [~p] dispatch repair (~p) with history: ~p", [Module, Args, History]), +dispatch_signal(Ns, #'RepairSignal'{arg = Payload}, History0, Context0) -> + Args = unwrap_args(Payload), + History = unwrap_history(History0), + _ = lager:debug("dispatch repair with args = ~p and history: ~p", [Args, History]), + Module = get_handler_module(Ns), {Result, #{client_context := Context}} = Module:process_signal({repair, Args}, History, create_context(Context0)), - {marshal_signal_result(Result, Module), Context}. + {marshal_signal_result(Result), Context}. -marshal_signal_result({ok, {Events, Action}}, Module) -> - _ = lager:debug("[machine] [~p] result with events = ~p and action = ~p", [Module, Events, Action]), +marshal_signal_result({Events, Action}) -> + _ = lager:debug("signal result with events = ~p and action = ~p", [Events, Action]), #'SignalResult'{ - events = wrap_events(Module, Events), + events = wrap_events(Events), action = Action }. +-spec dispatch_call(ns(), Call, hg_machine:history(), woody_client:context()) -> + {Result, woody_client:context()} when + Call :: hg_state_processing_thrift:'Call'(), + Result :: hg_state_processing_thrift:'CallResult'(). --spec dispatch_call(Call, hg_machine:history(), opts()) -> {Result, woody_client:context()} when - Call :: hg_state_processing_thrift:'Call'(), - Result :: hg_state_processing_thrift:'CallResult'(). - -dispatch_call(Payload, History0, _Opts = #{client_context := Context0}) -> - % TODO: looks suspicious +dispatch_call(Ns, Payload, History0, Context0) -> Args = unwrap_args(Payload), - {Module, History} = untag_history(unwrap_history(History0)), - _ = lager:debug("[machine] [~p] dispatch call (~p) with history: ~p", [Module, Args, History]), + History = unwrap_history(History0), + _ = lager:debug("dispatch call with args = ~p and history: ~p", [Args, History]), + Module = get_handler_module(Ns), {Result, #{client_context := Context}} = Module:process_call(Args, History, create_context(Context0)), - {marshal_call_result(Result, Module), Context}. + {marshal_call_result(Result), Context}. -%% - -marshal_call_result({ok, Response, {Events, Action}}, Module) -> - _ = lager:debug( - "[machine] [~p] call response = ~p with event = ~p and action = ~p", - [Module, Response, Events, Action] - ), +marshal_call_result({Response, {Events, Action}}) -> + _ = lager:debug("call response = ~p with event = ~p and action = ~p", [Response, Events, Action]), #'CallResult'{ - events = wrap_events(Module, Events), + events = wrap_events(Events), action = Action, response = marshal_term(Response) }. +create_context(ClientContext) -> + #{client_context => ClientContext}. + %% --spec map_history(hg_state_processing_thrift:'History'()) -> - {event_id(), [term()]}. +-type service_handler() :: + {Path :: string(), {woody_t:service(), woody_t:handler(), [ns()]}}. -map_history(History) -> - map_history(unwrap_history(History), undefined, []). +-spec get_child_spec([MachineHandler :: module()]) -> + supervisor:child_spec(). -map_history([], LastID, Evs) -> - {LastID, lists:reverse(Evs)}; -map_history([{Module, Ev0 = {ID, _, _, _}} | Rest], _, Evs) -> - case Module:map_event(Ev0) of - Ev when Ev /= undefined -> - map_history(Rest, ID, [Ev | Evs]); - undefined -> - map_history(Rest, ID, Evs) - end. +get_child_spec(MachineHandlers) -> + #{ + id => hg_machine_dispatch, + start => {?MODULE, start_link, [MachineHandlers]}, + type => supervisor + }. + +-spec get_service_handlers([MachineHandler :: module()]) -> + [service_handler()]. + +get_service_handlers(MachineHandlers) -> + lists:map(fun get_service_handler/1, MachineHandlers). + +get_service_handler(MachineHandler) -> + Ns = MachineHandler:namespace(), + {Path, Service} = hg_proto:get_service_spec(processor, #{namespace => Ns}), + {Path, {Service, ?MODULE, [Ns]}}. + +%% + +-define(TABLE, hg_machine_dispatch). + +-spec start_link([module()]) -> + {ok, pid()}. + +start_link(MachineHandlers) -> + supervisor:start_link(?MODULE, MachineHandlers). + +-spec init([module()]) -> + {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}. + +init(MachineHandlers) -> + _ = ets:new(?TABLE, [protected, named_table, {read_concurrency, true}]), + true = ets:insert_new(?TABLE, [{MH:namespace(), MH} || MH <- MachineHandlers]), + {ok, {#{}, []}}. + +%% + +-spec get_handler_module(ns()) -> module(). + +get_handler_module(Ns) -> + ets:lookup_element(?TABLE, Ns, 2). + +%% unwrap_history(History) -> [unwrap_event(E) || E <- History]. -untag_history(History = [{Module, _} | _]) -> - {Module, [E || {_, E} <- History]}. +wrap_events(Events) -> + [wrap_event(E) || E <- Events]. -wrap_events(Module, [Ev | Rest]) -> - [wrap_event(Module, Ev) | wrap_events(Module, Rest)]; -wrap_events(_, []) -> - []. +wrap_event(Event) -> + marshal_term(Event). -wrap_event(Module, EventInner) -> - marshal_term({Module, EventInner}). - -unwrap_event(#'Event'{id = ID, source = Source, created_at = Dt, event_payload = Payload}) -> - {Module, EventInner} = unmarshal_term(Payload), - {Module, {ID, Source, Dt, EventInner}}. +unwrap_event(#'Event'{id = ID, created_at = Dt, event_payload = Payload}) -> + {ID, Dt, unmarshal_term(Payload)}. wrap_args(Args) -> marshal_term(Args). diff --git a/apps/hellgate/src/hg_machine_action.erl b/apps/hellgate/src/hg_machine_action.erl index 4e73070..de0da81 100644 --- a/apps/hellgate/src/hg_machine_action.erl +++ b/apps/hellgate/src/hg_machine_action.erl @@ -17,8 +17,8 @@ -type tag() :: binary(). -type seconds() :: non_neg_integer(). --type datetime_iso8601() :: binary(). --type datetime() :: calendar:datetime() | datetime_iso8601(). +-type datetime_rfc3339() :: binary(). +-type datetime() :: calendar:datetime() | datetime_rfc3339(). -type timer() :: hg_base_thrift:'Timer'(). -type t() :: hg_state_processing_thrift:'ComplexAction'(). diff --git a/apps/hellgate/src/hg_utils.erl b/apps/hellgate/src/hg_utils.erl index a108950..a336a8a 100644 --- a/apps/hellgate/src/hg_utils.erl +++ b/apps/hellgate/src/hg_utils.erl @@ -1,14 +1,23 @@ -module(hg_utils). +-export([unique_id/0]). -export([shift_datetime/2]). +-export([logtag_process/2]). -export([get_hostname_ip/1]). %% --type seconds() :: integer(). --type datetime_iso8601() :: binary(). +-spec unique_id() -> hg_base_thrift:'ID'(). --type dt() :: calendar:datetime() | datetime_iso8601(). +unique_id() -> + <> = snowflake:new(), + genlib_format:format_int_base(ID, 62). + +%% + +-type seconds() :: integer(). + +-type dt() :: calendar:datetime() | hg_base_thrift:'Timestamp'(). -spec shift_datetime(dt(), seconds()) -> dt(). @@ -24,6 +33,14 @@ parse_dt(Dt) -> %% +-spec logtag_process(atom(), any()) -> ok. + +logtag_process(Key, Value) when is_atom(Key) -> + % TODO preformat into binary? + lager:md(orddict:store(Key, Value, lager:md())). + +%% + -spec get_hostname_ip(Hostname | IP) -> IP when Hostname :: string(), IP :: inet:ip_address(). diff --git a/apps/hellgate/test/hg_dummy_provider.erl b/apps/hellgate/test/hg_dummy_provider.erl index 19cf041..517ad7d 100644 --- a/apps/hellgate/test/hg_dummy_provider.erl +++ b/apps/hellgate/test/hg_dummy_provider.erl @@ -15,9 +15,9 @@ supervisor:child_spec(). get_child_spec(Host, Port) -> - {Name, Path, Service} = get_service_spec(), + {Path, Service} = get_service_spec(), woody_server:child_spec( - Name, + ?MODULE, #{ ip => hg_utils:get_hostname_ip(Host), port => Port, @@ -31,12 +31,11 @@ get_child_spec(Host, Port) -> woody_t:url(). get_url(Host, Port) -> - {_Name, Path, _Service} = get_service_spec(), + {Path, _Service} = get_service_spec(), iolist_to_binary(["http://", Host, ":", integer_to_list(Port), Path]). get_service_spec() -> - {?MODULE, "/test/proxy/provider/dummy", - {hg_proxy_provider_thrift, 'ProviderProxy'}}. + {"/test/proxy/provider/dummy", {hg_proxy_provider_thrift, 'ProviderProxy'}}. %% diff --git a/apps/hellgate/test/hg_tests_SUITE.erl b/apps/hellgate/test/hg_tests_SUITE.erl index 0ce56d9..e7980ef 100644 --- a/apps/hellgate/test/hg_tests_SUITE.erl +++ b/apps/hellgate/test/hg_tests_SUITE.erl @@ -231,6 +231,7 @@ make_invoice_params(Product, Due, Amount, Context) when is_integer(Amount) -> make_invoice_params(Product, Due, {Amount, <<"RUB">>}, Context); make_invoice_params(Product, Due, {Amount, Currency}, Context) -> #payproc_InvoiceParams{ + shop_id = <<"THRIFT-SHOP">>, product = Product, amount = Amount, due = format_datetime(Due), @@ -244,9 +245,11 @@ make_payment_params() -> make_payment_params(PaymentTool, Session) -> #payproc_InvoicePaymentParams{ - payer = #domain_Payer{}, - payment_tool = PaymentTool, - session = Session + payer = #domain_Payer{ + payment_tool = PaymentTool, + session = Session, + client_info = #domain_ClientInfo{} + } }. make_payment_tool() -> diff --git a/apps/hg_client/src/hg_client.erl b/apps/hg_client/src/hg_client.erl index b9c416a..8f2a691 100644 --- a/apps/hg_client/src/hg_client.erl +++ b/apps/hg_client/src/hg_client.erl @@ -271,7 +271,7 @@ strip_event(#payproc_Event{payload = Payload}) -> Payload. issue_service_call(ServiceName, Function, Args, Client = #cl{context = Context, root_url = RootUrl}) -> - {_Name, Path, Service} = hg_proto:get_service_spec(ServiceName), + {Path, Service} = hg_proto:get_service_spec(ServiceName), Url = iolist_to_binary([RootUrl, Path]), Request = {Service, Function, Args}, {Result, ContextNext} = woody_client:call_safe(Context, Request, #{url => Url}), diff --git a/apps/hg_proto/damsel b/apps/hg_proto/damsel index d3883df..fb43ca0 160000 --- a/apps/hg_proto/damsel +++ b/apps/hg_proto/damsel @@ -1 +1 @@ -Subproject commit d3883df63982bf9bf5c8e9ac477cd65e1238a45b +Subproject commit fb43ca0f55da97cb649c7933251ab54fbb651ffe diff --git a/apps/hg_proto/src/hg_proto.erl b/apps/hg_proto/src/hg_proto.erl index ad49e77..2bca799 100644 --- a/apps/hg_proto/src/hg_proto.erl +++ b/apps/hg_proto/src/hg_proto.erl @@ -1,28 +1,31 @@ -module(hg_proto). --export([get_service_specs/0]). -export([get_service_spec/1]). +-export([get_service_spec/2]). -export_type([service_spec/0]). %% --type service_spec() :: {Name :: atom(), Path :: string(), Service :: {module(), atom()}}. +-define(VERSION_PREFIX, "/v1"). --spec get_service_specs() -> [service_spec()]. +-type service_spec() :: {Path :: string(), Service :: {module(), atom()}}. -get_service_specs() -> - VersionPrefix = "/v1", - [ - {eventsink, VersionPrefix ++ "/processing/eventsink", - {hg_payment_processing_thrift, 'EventSink'}}, - {invoicing, VersionPrefix ++ "/processing/invoicing", - {hg_payment_processing_thrift, 'Invoicing'}}, - {processor, VersionPrefix ++ "/stateproc/processor", - {hg_state_processing_thrift, 'Processor'}} - ]. - --spec get_service_spec(Name :: atom()) -> service_spec() | false. +-spec get_service_spec(Name :: atom()) -> service_spec(). get_service_spec(Name) -> - lists:keyfind(Name, 1, get_service_specs()). + get_service_spec(Name, #{}). + +-spec get_service_spec(Name :: atom(), Opts :: #{}) -> service_spec(). + +get_service_spec(eventsink, #{}) -> + Service = {hg_payment_processing_thrift, 'EventSink'}, + {?VERSION_PREFIX ++ "/processing/eventsink", Service}; + +get_service_spec(invoicing, #{}) -> + Service = {hg_payment_processing_thrift, 'Invoicing'}, + {?VERSION_PREFIX ++ "/processing/invoicing", Service}; + +get_service_spec(processor, #{namespace := Ns}) when is_binary(Ns) -> + Service = {hg_state_processing_thrift, 'Processor'}, + {?VERSION_PREFIX ++ "/stateproc/" ++ binary_to_list(Ns), Service}. diff --git a/docker-compose.sh b/docker-compose.sh index f05a5ea..8eb95fd 100755 --- a/docker-compose.sh +++ b/docker-compose.sh @@ -11,7 +11,7 @@ services: depends_on: - machinegun machinegun: - image: dr.rbkmoney.com/rbkmoney/mg_prototype:87bef0b + image: dr.rbkmoney.com/rbkmoney/mg_prototype:3455e7b command: /opt/mgun/bin/mgun foreground networks: default: