mirror of
https://github.com/valitydev/hellgate.git
synced 2024-11-06 02:45:20 +00:00
HG-51: Payments over new protocol (#24)
* HG-51: Implement new proxy protocol, shitcode at its best * HG-51: Refactor a bit * HG-51: Switch to a renewed mg protocol * HG-51: Bump to damsel upstream w/ payer contact info * HG-51: Mention pointlessly complex payment ids in TODO list * HG-51: Hardcode eventsink id instead * HG-51: Update lockfile * HG-51: Fail furiosly when callback handling ends up w/ unexpected error * HG-51: Store less state in the session start event * HG-51: Fail furiously on protocol errors, e.g. proxy contract violations * HG-51: Add more TODO
This commit is contained in:
parent
f4488f9a9e
commit
6d46d566d0
2
Makefile
2
Makefile
@ -17,7 +17,7 @@ BASE_IMAGE_NAME := service_erlang
|
||||
BASE_IMAGE_TAG := 170b7dd12d62431303f8bb514abe2b43468223a1
|
||||
|
||||
# Build image tag to be used
|
||||
BUILD_IMAGE_TAG := 753126790c9ecd763840d9fe58507335af02b875
|
||||
BUILD_IMAGE_TAG := 6fb209e428feaa0ef6cec07d3909d8a3c4013537
|
||||
|
||||
CALL_ANYWHERE := all submodules rebar-update compile xref lint dialyze start devrel release clean distclean
|
||||
|
||||
|
@ -26,9 +26,12 @@
|
||||
{invoice_payment_status_changed,
|
||||
#payproc_InvoicePaymentStatusChanged{payment_id = PaymentID, status = Status}}
|
||||
).
|
||||
-define(payment_state_changed(PaymentID),
|
||||
{invoice_payment_state_changed,
|
||||
#payproc_InvoicePaymentStateChanged{payment_id = PaymentID}}
|
||||
-define(payment_interaction_requested(PaymentID, UserInteraction),
|
||||
{invoice_payment_interaction_requested,
|
||||
#payproc_InvoicePaymentInteractionRequested{
|
||||
payment_id = PaymentID,
|
||||
interaction = UserInteraction
|
||||
}}
|
||||
).
|
||||
|
||||
-define(paid(),
|
||||
@ -42,8 +45,10 @@
|
||||
|
||||
-define(pending(),
|
||||
{pending, #domain_InvoicePaymentPending{}}).
|
||||
-define(succeeded(),
|
||||
{succeeded, #domain_InvoicePaymentSucceeded{}}).
|
||||
-define(processed(),
|
||||
{processed, #domain_InvoicePaymentProcessed{}}).
|
||||
-define(captured(),
|
||||
{captured, #domain_InvoicePaymentCaptured{}}).
|
||||
-define(failed(Error),
|
||||
{failed, #domain_InvoicePaymentFailed{err = Error}}).
|
||||
|
||||
|
@ -57,13 +57,17 @@ get_api_child_spec(MachineHandlers) ->
|
||||
net_opts => [],
|
||||
event_handler => hg_woody_event_handler,
|
||||
handlers => hg_machine:get_service_handlers(MachineHandlers) ++ [
|
||||
construct_service_handler(party_management, hg_party, []),
|
||||
construct_service_handler(invoicing, hg_invoice, []),
|
||||
construct_service_handler(eventsink, hg_event_sink, [])
|
||||
construct_service_handler(party_management, hg_party),
|
||||
construct_service_handler(invoicing, hg_invoice),
|
||||
construct_service_handler(proxy_host_provider, hg_proxy_host_provider),
|
||||
construct_service_handler(eventsink, hg_event_sink)
|
||||
]
|
||||
}
|
||||
).
|
||||
|
||||
construct_service_handler(Name, Module) ->
|
||||
construct_service_handler(Name, Module, []).
|
||||
|
||||
construct_service_handler(Name, Module, Opts) ->
|
||||
{Path, Service} = hg_proto:get_service_spec(Name),
|
||||
{Path, {Service, Module, Opts}}.
|
||||
|
@ -63,10 +63,12 @@ get_history_last_id(History, _LastID) ->
|
||||
publish_event(#'SinkEvent'{id = ID, source_ns = Ns, source_id = SourceID, event = Event}) ->
|
||||
hg_event_provider:publish_event(Ns, ID, SourceID, hg_machine:unwrap_event(Event)).
|
||||
|
||||
-define(EVENTSINK_ID, <<"payproc">>).
|
||||
|
||||
call_event_sink(Function, Args, Context) ->
|
||||
Url = genlib_app:env(hellgate, eventsink_service_url),
|
||||
Service = {hg_state_processing_thrift, 'EventSink'},
|
||||
woody_client:call(Context, {Service, Function, Args}, #{url => Url}).
|
||||
woody_client:call(Context, {Service, Function, [?EVENTSINK_ID | Args]}, #{url => Url}).
|
||||
|
||||
-spec handle_error(woody_t:func(), term(), woody_client:context(), []) ->
|
||||
_.
|
||||
|
@ -1,8 +1,24 @@
|
||||
%%% Invoice machine
|
||||
%%%
|
||||
%%% TODO
|
||||
%%% - REFACTOR WITH FIRE
|
||||
%%% - proper concepts
|
||||
%%% - simple lightweight lower-level machines (middlewares (?)) for:
|
||||
%%% - handling callbacks idempotently
|
||||
%%% - state collapsing (?)
|
||||
%%% - simpler flow control (?)
|
||||
%%% - event publishing (?)
|
||||
%%% - timer preservation on calls (?)
|
||||
%%% - do not make payment ids so complex, a sequence would suffice
|
||||
%%% - alter `Invoicing.GetPayment` signature
|
||||
|
||||
-module(hg_invoice).
|
||||
-include_lib("hg_proto/include/hg_payment_processing_thrift.hrl").
|
||||
|
||||
-define(NS, <<"invoice">>).
|
||||
|
||||
-export([process_callback/3]).
|
||||
|
||||
%% Woody handler
|
||||
|
||||
-behaviour(woody_server_thrift_handler).
|
||||
@ -29,8 +45,7 @@
|
||||
|
||||
-record(st, {
|
||||
invoice :: invoice(),
|
||||
payments = [] :: [payment()],
|
||||
stage = idling :: stage(),
|
||||
payments = [] :: [{payment_id(), payment_st()}],
|
||||
sequence = 0 :: 0 | sequence()
|
||||
}).
|
||||
|
||||
@ -58,10 +73,10 @@ handle_function('StartPayment', {UserInfo, InvoiceID, PaymentParams}, Context0,
|
||||
|
||||
handle_function('GetPayment', {UserInfo, UserInfo, PaymentID}, Context0, _Opts) ->
|
||||
{St, Context} = get_state(UserInfo, deduce_invoice_id(PaymentID), Context0),
|
||||
case get_payment(PaymentID, St) of
|
||||
Payment = #domain_InvoicePayment{} ->
|
||||
case get_payment_session(PaymentID, St) of
|
||||
{Payment = #domain_InvoicePayment{}, _} ->
|
||||
{{ok, Payment}, Context};
|
||||
false ->
|
||||
undefined ->
|
||||
throw({#payproc_InvoicePaymentNotFound{}, Context})
|
||||
end;
|
||||
|
||||
@ -73,6 +88,18 @@ handle_function('Rescind', {UserInfo, InvoiceID, Reason}, Context0, _Opts) ->
|
||||
|
||||
%%
|
||||
|
||||
-type tag() :: hg_base_thrift:'Tag'().
|
||||
-type callback() :: _. %% FIXME
|
||||
-type callback_response() :: _. %% FIXME
|
||||
|
||||
-spec process_callback(tag(), callback(), woody_client:context()) ->
|
||||
{{ok, callback_response()} | {error, notfound | failed}, woody_client:context()} | no_return().
|
||||
|
||||
process_callback(Tag, Callback, Context) ->
|
||||
hg_machine:call(?NS, {tag, Tag}, {callback, Callback}, opts(Context)).
|
||||
|
||||
%%
|
||||
|
||||
get_history(_UserInfo, InvoiceID, Context) ->
|
||||
map_error(hg_machine:get_history(?NS, InvoiceID, opts(Context))).
|
||||
|
||||
@ -86,21 +113,24 @@ get_state(UserInfo, InvoiceID, Context0) ->
|
||||
get_public_history(UserInfo, InvoiceID, #payproc_EventRange{'after' = AfterID, limit = Limit}, Context) ->
|
||||
hg_history:get_public_history(
|
||||
fun (ID, Lim, Ctx) -> get_history(UserInfo, InvoiceID, ID, Lim, Ctx) end,
|
||||
fun (Event) -> publish_invoice_event({invoice, InvoiceID}, Event) end,
|
||||
fun (Event) -> publish_invoice_event(InvoiceID, Event) end,
|
||||
AfterID, Limit,
|
||||
Context
|
||||
).
|
||||
|
||||
publish_invoice_event(Source, {ID, Dt, {public, Seq, Payload}}) ->
|
||||
{true, #payproc_Event{id = ID, source = Source, created_at = Dt, sequence = Seq, payload = Payload}};
|
||||
publish_invoice_event(_Source, {_ID, _Dt, _Event}) ->
|
||||
false.
|
||||
publish_invoice_event(InvoiceID, {ID, Dt, Event}) ->
|
||||
case publish_event(InvoiceID, Event) of
|
||||
{true, {Source, Seq, Ev}} ->
|
||||
{true, #payproc_Event{id = ID, source = Source, created_at = Dt, sequence = Seq, payload = Ev}};
|
||||
false ->
|
||||
false
|
||||
end.
|
||||
|
||||
start(ID, Args, Context) ->
|
||||
map_error(hg_machine:start(?NS, ID, Args, opts(Context))).
|
||||
|
||||
call(ID, Args, Context) ->
|
||||
map_error(hg_machine:call(?NS, ID, Args, opts(Context))).
|
||||
map_error(hg_machine:call(?NS, {id, ID}, Args, opts(Context))).
|
||||
|
||||
map_error({{error, notfound}, Context}) ->
|
||||
throw({#payproc_UserInvoiceNotFound{}, Context});
|
||||
@ -118,35 +148,32 @@ opts(Context) ->
|
||||
-type invoice_id() :: hg_domain_thrift:'InvoiceID'().
|
||||
-type user_info() :: hg_payment_processing_thrift:'UserInfo'().
|
||||
-type invoice_params() :: hg_payment_processing_thrift:'InvoiceParams'().
|
||||
-type payment() :: hg_domain_thrift:'InvoicePayment'().
|
||||
-type payment_params() :: hg_payment_processing_thrift:'InvoicePaymentParams'().
|
||||
-type payment_id() :: hg_domain_thrift:'InvoicePaymentID'().
|
||||
-type payment_st() :: undefined | binary().
|
||||
-type payment_st() :: hg_invoice_payment:st().
|
||||
-type sequence() :: pos_integer().
|
||||
|
||||
-type stage() ::
|
||||
idling |
|
||||
{processing_payment, payment_id(), payment_st()}.
|
||||
|
||||
-type ev() ::
|
||||
{public, sequence(), hg_payment_processing_thrift:'EventPayload'()} |
|
||||
{private, sequence(), private_event()}.
|
||||
|
||||
-type private_event() ::
|
||||
{payment_state_changed, payment_id(), payment_st()}.
|
||||
term(). %% TODO hg_invoice_payment:private_event() ?
|
||||
|
||||
-include("invoice_events.hrl").
|
||||
|
||||
-define(invalid_invoice_status(Invoice),
|
||||
#payproc_InvalidInvoiceStatus{status = Invoice#domain_Invoice.status}).
|
||||
-define(invalid_invoice_status(Status),
|
||||
#payproc_InvalidInvoiceStatus{status = Status}).
|
||||
-define(payment_pending(PaymentID),
|
||||
#payproc_InvoicePaymentPending{id = PaymentID}).
|
||||
|
||||
-spec publish_event(invoice_id(), hg_machine:event(ev())) ->
|
||||
{true, hg_event_provider:public_event()} | false.
|
||||
|
||||
publish_event(InvoiceID, {public, Seq, Ev}) ->
|
||||
publish_event(InvoiceID, {public, Seq, Ev = ?invoice_ev(_)}) ->
|
||||
{true, {{invoice, InvoiceID}, Seq, Ev}};
|
||||
publish_event(InvoiceID, {public, Seq, {{payment, _}, Ev = ?payment_ev(_)}}) ->
|
||||
{true, {{invoice, InvoiceID}, Seq, ?invoice_ev(Ev)}};
|
||||
publish_event(_InvoiceID, _Event) ->
|
||||
false.
|
||||
|
||||
@ -164,68 +191,41 @@ namespace() ->
|
||||
init(ID, {InvoiceParams, UserInfo}, Context) ->
|
||||
Invoice = create_invoice(ID, InvoiceParams, UserInfo),
|
||||
Event = {public, ?invoice_ev(?invoice_created(Invoice))},
|
||||
{ok(Event, #st{}, set_invoice_timer(Invoice)), Context}.
|
||||
% TODO ugly, better to roll state and events simultaneously, hg_party-like
|
||||
{ok(Event, #st{}, set_invoice_timer(#st{invoice = Invoice})), Context}.
|
||||
|
||||
%%
|
||||
|
||||
-spec process_signal(hg_machine:signal(), hg_machine:history(ev()), hg_machine:context()) ->
|
||||
{hg_machine:result(ev()), woody_client:context()}.
|
||||
|
||||
process_signal(timeout, History, Context) ->
|
||||
St = #st{invoice = Invoice, stage = Stage} = collapse_history(History),
|
||||
Status = get_invoice_status(Invoice),
|
||||
case Stage of
|
||||
{processing_payment, PaymentID, PaymentState} ->
|
||||
process_signal(Signal, History, Context) ->
|
||||
handle_signal(Signal, collapse_history(History), Context).
|
||||
|
||||
handle_signal(timeout, St, Context) ->
|
||||
case get_pending_payment(St) of
|
||||
{PaymentID, PaymentSession} ->
|
||||
% there's a payment pending
|
||||
process_payment(PaymentID, PaymentState, St, Context);
|
||||
idling when Status == unpaid ->
|
||||
process_payment_signal(timeout, PaymentID, PaymentSession, St, Context);
|
||||
undefined ->
|
||||
% invoice is expired
|
||||
process_expiration(St, Context);
|
||||
_ ->
|
||||
{ok(), Context}
|
||||
handle_expiration(St, Context)
|
||||
end;
|
||||
|
||||
process_signal({repair, _}, History, Context) ->
|
||||
St = #st{invoice = Invoice} = collapse_history(History),
|
||||
{ok([], St, set_invoice_timer(Invoice)), Context}.
|
||||
handle_signal({repair, _}, St, Context) ->
|
||||
{ok([], St, restore_timer(St)), Context}.
|
||||
|
||||
process_expiration(St = #st{invoice = Invoice}, Context) ->
|
||||
{ok, Event} = cancel_invoice(overdue, Invoice),
|
||||
handle_expiration(St, Context) ->
|
||||
Event = {public, ?invoice_ev(?invoice_status_changed(?cancelled(format_reason(overdue))))},
|
||||
{ok(Event, St), Context}.
|
||||
|
||||
process_payment(PaymentID, PaymentState0, St = #st{invoice = Invoice}, Context0) ->
|
||||
% FIXME: code looks shitty, destined to be in payment submachine
|
||||
Payment = get_payment(PaymentID, St),
|
||||
case hg_invoice_payment:process(Payment, Invoice, PaymentState0, Context0) of
|
||||
% TODO: check proxy contracts
|
||||
% binding different trx ids is not allowed
|
||||
% empty action is questionable to allow
|
||||
{{ok, Trx}, Context} ->
|
||||
% payment finished successfully
|
||||
Events = [
|
||||
{public, ?invoice_ev(?payment_ev(?payment_status_changed(PaymentID, ?succeeded())))},
|
||||
{public, ?invoice_ev(?invoice_status_changed(?paid()))}
|
||||
],
|
||||
{ok(construct_payment_events(Payment, Trx, Events), St), Context};
|
||||
{{{error, Error = #domain_OperationError{}}, Trx}, Context} ->
|
||||
% payment finished with error
|
||||
Event = {public, ?invoice_ev(?payment_ev(?payment_status_changed(PaymentID, ?failed(Error))))},
|
||||
{ok(construct_payment_events(Payment, Trx, [Event]), St), Context};
|
||||
{{{next, Action, PaymentState}, Trx}, Context} ->
|
||||
% payment progressing yet
|
||||
Event = {private, {payment_state_changed, PaymentID, PaymentState}},
|
||||
{ok(construct_payment_events(Payment, Trx, [Event]), St, Action), Context}
|
||||
end.
|
||||
|
||||
construct_payment_events(#domain_InvoicePayment{trx = Trx}, #domain_TransactionInfo{} = Trx, Events) ->
|
||||
Events;
|
||||
construct_payment_events(#domain_InvoicePayment{} = Payment, #domain_TransactionInfo{} = Trx, Events) ->
|
||||
[{public, ?invoice_ev(?payment_ev(?payment_bound(get_payment_id(Payment), Trx)))} | Events];
|
||||
construct_payment_events(#domain_InvoicePayment{trx = Trx}, Trx = undefined, Events) ->
|
||||
Events.
|
||||
%%
|
||||
|
||||
-type call() ::
|
||||
{start_payment, payment_params(), user_info()} |
|
||||
{fulfill, binary(), user_info()} |
|
||||
{rescind, binary(), user_info()}.
|
||||
{rescind, binary(), user_info()} |
|
||||
{callback, callback()}.
|
||||
|
||||
-type response() ::
|
||||
ok | {ok, term()} | {exception, term()}.
|
||||
@ -233,64 +233,151 @@ construct_payment_events(#domain_InvoicePayment{trx = Trx}, Trx = undefined, Eve
|
||||
-spec process_call(call(), hg_machine:history(ev()), woody_client:context()) ->
|
||||
{{response(), hg_machine:result(ev())}, woody_client:context()}.
|
||||
|
||||
process_call({start_payment, PaymentParams, _UserInfo}, History, Context) ->
|
||||
St = #st{invoice = Invoice, stage = Stage} = collapse_history(History),
|
||||
Status = get_invoice_status(Invoice),
|
||||
case Stage of
|
||||
idling when Status == unpaid ->
|
||||
Payment = create_payment(PaymentParams, Invoice),
|
||||
PaymentID = get_payment_id(Payment),
|
||||
Events = [
|
||||
{public, ?invoice_ev(?payment_ev(?payment_started(Payment)))},
|
||||
{private, {payment_state_changed, PaymentID, undefined}}
|
||||
],
|
||||
{respond(PaymentID, Events, St, hg_machine_action:instant()), Context};
|
||||
{processing_payment, PaymentID, _} ->
|
||||
{raise(?payment_pending(PaymentID)), Context};
|
||||
_ ->
|
||||
{raise(?invalid_invoice_status(Invoice)), Context}
|
||||
end;
|
||||
|
||||
process_call({fulfill, Reason, _UserInfo}, History, Context) ->
|
||||
St = #st{invoice = Invoice} = collapse_history(History),
|
||||
case fulfill_invoice(Reason, Invoice) of
|
||||
{ok, Event} ->
|
||||
{respond(ok, Event, St, set_invoice_timer(Invoice)), Context};
|
||||
{error, Exception} ->
|
||||
{raise(Exception, set_invoice_timer(Invoice)), Context}
|
||||
end;
|
||||
|
||||
process_call({rescind, Reason, _UserInfo}, History, Context) ->
|
||||
St = #st{invoice = Invoice} = collapse_history(History),
|
||||
case cancel_invoice({rescinded, Reason}, Invoice) of
|
||||
{ok, Event} ->
|
||||
{respond(ok, Event, St, set_invoice_timer(Invoice)), Context};
|
||||
{error, Exception} ->
|
||||
{raise(Exception, set_invoice_timer(Invoice)), Context}
|
||||
process_call(Call, History, Context) ->
|
||||
St = collapse_history(History),
|
||||
try handle_call(Call, St, Context) catch
|
||||
{exception, Exception} ->
|
||||
{{{exception, Exception}, {[], restore_timer(St)}}, Context}
|
||||
end.
|
||||
|
||||
set_invoice_timer(#domain_Invoice{status = ?unpaid(), due = Due}) when Due /= undefined ->
|
||||
hg_machine_action:set_deadline(Due);
|
||||
set_invoice_timer(_Invoice) ->
|
||||
hg_machine_action:new().
|
||||
-spec raise(term()) -> no_return().
|
||||
|
||||
raise(What) ->
|
||||
throw({exception, What}).
|
||||
|
||||
handle_call({start_payment, PaymentParams, _UserInfo}, St, Context) ->
|
||||
_ = assert_invoice_status(unpaid, St),
|
||||
_ = assert_no_pending_payment(St),
|
||||
start_payment(PaymentParams, St, Context);
|
||||
|
||||
handle_call({fulfill, Reason, _UserInfo}, St, Context) ->
|
||||
_ = assert_invoice_status(paid, St),
|
||||
Event = {public, ?invoice_ev(?invoice_status_changed(?fulfilled(format_reason(Reason))))},
|
||||
{respond(ok, Event, St), Context};
|
||||
|
||||
handle_call({rescind, Reason, _UserInfo}, St, Context) ->
|
||||
_ = assert_invoice_status(unpaid, St),
|
||||
_ = assert_no_pending_payment(St),
|
||||
Event = {public, ?invoice_ev(?invoice_status_changed(?cancelled(format_reason(Reason))))},
|
||||
{respond(ok, Event, St), Context};
|
||||
|
||||
handle_call({callback, Callback}, St, Context) ->
|
||||
dispatch_callback(Callback, St, Context).
|
||||
|
||||
dispatch_callback({provider, Payload}, St, Context) ->
|
||||
case get_pending_payment(St) of
|
||||
{PaymentID, PaymentSession} ->
|
||||
process_payment_call({callback, Payload}, PaymentID, PaymentSession, St, Context);
|
||||
undefined ->
|
||||
raise(no_pending_payment) % FIXME
|
||||
end.
|
||||
|
||||
assert_invoice_status(Status, #st{invoice = Invoice}) ->
|
||||
assert_invoice_status(Status, Invoice);
|
||||
assert_invoice_status(Status, #domain_Invoice{status = {Status, _}}) ->
|
||||
ok;
|
||||
assert_invoice_status(_Status, #domain_Invoice{status = Invalid}) ->
|
||||
raise(?invalid_invoice_status(Invalid)).
|
||||
|
||||
assert_no_pending_payment(St) ->
|
||||
case get_pending_payment(St) of
|
||||
undefined ->
|
||||
ok;
|
||||
{PaymentID, _} ->
|
||||
raise(?payment_pending(PaymentID))
|
||||
end.
|
||||
|
||||
restore_timer(St) ->
|
||||
set_invoice_timer(St).
|
||||
|
||||
set_invoice_timer(St = #st{invoice = #domain_Invoice{status = Status, due = Due}}) ->
|
||||
case get_pending_payment(St) of
|
||||
undefined when Status == ?unpaid() ->
|
||||
hg_machine_action:set_deadline(Due);
|
||||
undefined ->
|
||||
hg_machine_action:new();
|
||||
{_, _} ->
|
||||
% TODO how to restore timer properly then, magic number for now
|
||||
hg_machine_action:set_timeout(10)
|
||||
end.
|
||||
|
||||
%%
|
||||
|
||||
start_payment(PaymentParams, St, Context) ->
|
||||
PaymentID = create_payment_id(St),
|
||||
Opts = get_payment_opts(St),
|
||||
{{Events1, _}, Context1} = hg_invoice_payment:init(PaymentID, PaymentParams, Opts, Context),
|
||||
{{Events2, Action}, Context2} = hg_invoice_payment:start_session(?processed(), Context1),
|
||||
Events = wrap_payment_events(PaymentID, Events1 ++ Events2),
|
||||
{respond(PaymentID, Events, St, Action), Context2}.
|
||||
|
||||
process_payment_signal(Signal, PaymentID, PaymentSession, St = #st{invoice = Invoice}, Context) ->
|
||||
Opts = get_payment_opts(St),
|
||||
case hg_invoice_payment:process_signal(Signal, PaymentSession, Opts, Context) of
|
||||
{{next, {Events, Action}}, Context1} ->
|
||||
{ok(wrap_payment_events(PaymentID, Events), St, Action), Context1};
|
||||
{{done, {Events1, _}}, Context1} ->
|
||||
{Payment1, _} = lists:foldl(fun hg_invoice_payment:merge_event/2, PaymentSession, Events1),
|
||||
case get_payment_status(Payment1) of
|
||||
?processed() ->
|
||||
{{Events2, Action}, Context2} = hg_invoice_payment:start_session(?captured(), Context1),
|
||||
{ok(wrap_payment_events(PaymentID, Events1 ++ Events2), St, Action), Context2};
|
||||
?captured() ->
|
||||
Events2 = [{public, ?invoice_ev(?invoice_status_changed(?paid()))}],
|
||||
{ok(wrap_payment_events(PaymentID, Events1) ++ Events2, St), Context1};
|
||||
?failed(_) ->
|
||||
{ok(wrap_payment_events(PaymentID, Events1), St, restore_timer(St)), Context1}
|
||||
end
|
||||
end.
|
||||
|
||||
process_payment_call(Call, PaymentID, PaymentSession, St = #st{invoice = Invoice}, Context) ->
|
||||
Opts = get_payment_opts(St),
|
||||
case hg_invoice_payment:process_call(Call, PaymentSession, Opts, Context) of
|
||||
{{Response, {next, {Events, Action}}}, Context1} ->
|
||||
{respond(Response, wrap_payment_events(PaymentID, Events), St, Action), Context1};
|
||||
{{Response, {done, {Events1, _}}}, Context1} ->
|
||||
{Payment1, _} = lists:foldl(fun hg_invoice_payment:merge_event/2, PaymentSession, Events1),
|
||||
case get_payment_status(Payment1) of
|
||||
?processed() ->
|
||||
{{Events2, Action}, Context2} = hg_invoice_payment:start_session(?captured(), Context1),
|
||||
Events = wrap_payment_events(PaymentID, Events1 ++ Events2),
|
||||
{respond(Response, Events, St, Action), Context2};
|
||||
?captured() ->
|
||||
Events2 = [{public, ?invoice_ev(?invoice_status_changed(?paid()))}],
|
||||
{respond(Response, wrap_payment_events(PaymentID, Events1) ++ Events2, St), Context1};
|
||||
?failed(_) ->
|
||||
{respond(Response, wrap_payment_events(PaymentID, Events1), St, restore_timer(St)), Context1}
|
||||
end
|
||||
end.
|
||||
|
||||
wrap_payment_events(PaymentID, Events) ->
|
||||
lists:map(fun
|
||||
(E = ?payment_ev(_)) ->
|
||||
{public, {{payment, PaymentID}, E}};
|
||||
(E) ->
|
||||
{private, {{payment, PaymentID}, E}}
|
||||
end, Events).
|
||||
|
||||
get_payment_opts(#st{invoice = Invoice = #domain_Invoice{domain_revision = Revision}}) ->
|
||||
#{
|
||||
invoice => Invoice,
|
||||
proxy => hg_domain:get(Revision, #domain_ProxyRef{id = 1})
|
||||
}.
|
||||
|
||||
%%
|
||||
|
||||
ok() ->
|
||||
{[], hg_machine_action:new()}.
|
||||
ok(Event, St) ->
|
||||
ok(Event, St, hg_machine_action:new()).
|
||||
ok(Event, St, Action) ->
|
||||
{sequence_events(wrap_event_list(Event), St), Action}.
|
||||
|
||||
respond(Response, Event, St) ->
|
||||
respond(Response, Event, St, hg_machine_action:new()).
|
||||
respond(Response, Event, St, Action) ->
|
||||
{{ok, Response}, {sequence_events(wrap_event_list(Event), St), Action}}.
|
||||
|
||||
raise(Exception) ->
|
||||
raise(Exception, hg_machine_action:new()).
|
||||
raise(Exception, Action) ->
|
||||
{{exception, Exception}, {[], Action}}.
|
||||
|
||||
wrap_event_list(Event) when is_tuple(Event) ->
|
||||
wrap_event_list([Event]);
|
||||
[Event];
|
||||
wrap_event_list(Events) when is_list(Events) ->
|
||||
Events.
|
||||
|
||||
@ -324,19 +411,10 @@ create_invoice(ID, V = #payproc_InvoiceParams{}, #payproc_UserInfo{id = UserID})
|
||||
}
|
||||
}.
|
||||
|
||||
create_payment(V = #payproc_InvoicePaymentParams{}, Invoice = #domain_Invoice{cost = Cost}) ->
|
||||
#domain_InvoicePayment{
|
||||
id = create_payment_id(Invoice),
|
||||
created_at = hg_datetime:format_now(),
|
||||
status = ?pending(),
|
||||
cost = Cost,
|
||||
payer = V#payproc_InvoicePaymentParams.payer
|
||||
}.
|
||||
|
||||
create_payment_id(Invoice = #domain_Invoice{}) ->
|
||||
create_payment_id(get_invoice_id(Invoice));
|
||||
create_payment_id(InvoiceID) ->
|
||||
<<InvoiceID/binary, ":", "0">>.
|
||||
create_payment_id(#st{invoice = Invoice, payments = Payments}) ->
|
||||
InvoiceID = get_invoice_id(Invoice),
|
||||
PaymentID = integer_to_binary(length(Payments) + 1),
|
||||
<<InvoiceID/binary, ":", PaymentID/binary>>.
|
||||
|
||||
deduce_invoice_id(PaymentID) ->
|
||||
case binary:split(PaymentID, <<":">>) of
|
||||
@ -349,61 +427,53 @@ deduce_invoice_id(PaymentID) ->
|
||||
get_invoice_id(#domain_Invoice{id = ID}) ->
|
||||
ID.
|
||||
|
||||
get_invoice_status(#domain_Invoice{status = {Status, _}}) ->
|
||||
get_payment_status(#domain_InvoicePayment{status = Status}) ->
|
||||
Status.
|
||||
|
||||
get_payment_id(#domain_InvoicePayment{id = ID}) ->
|
||||
ID.
|
||||
|
||||
cancel_invoice(Reason, #domain_Invoice{status = ?unpaid()}) ->
|
||||
{ok, {public, ?invoice_ev(?invoice_status_changed(?cancelled(format_reason(Reason))))}};
|
||||
cancel_invoice(_Reason, Invoice) ->
|
||||
{error, ?invalid_invoice_status(Invoice)}.
|
||||
|
||||
fulfill_invoice(Reason, #domain_Invoice{status = ?paid()}) ->
|
||||
{ok, {public, ?invoice_ev(?invoice_status_changed(?fulfilled(format_reason(Reason))))}};
|
||||
fulfill_invoice(_Reason, Invoice) ->
|
||||
{error, ?invalid_invoice_status(Invoice)}.
|
||||
|
||||
%%
|
||||
|
||||
-spec collapse_history([ev()]) -> st().
|
||||
|
||||
collapse_history(History) ->
|
||||
lists:foldl(fun ({_ID, _, Ev}, St) -> merge_history(Ev, St) end, #st{}, History).
|
||||
lists:foldl(
|
||||
fun ({_ID, _, {_, Seq, Ev}}, St) -> merge_event(Ev, St#st{sequence = Seq}) end,
|
||||
#st{},
|
||||
History
|
||||
).
|
||||
|
||||
merge_history({public, Seq, ?invoice_ev(Event)}, St) ->
|
||||
merge_invoice_event(Event, St#st{sequence = Seq});
|
||||
merge_history({private, Seq, Event}, St) ->
|
||||
merge_private_event(Event, St#st{sequence = Seq}).
|
||||
merge_event(?invoice_ev(Event), St) ->
|
||||
merge_invoice_event(Event, St);
|
||||
merge_event({{payment, PaymentID}, Event}, St) ->
|
||||
PaymentSession = get_payment_session(PaymentID, St),
|
||||
set_payment_session(PaymentID, hg_invoice_payment:merge_event(Event, PaymentSession), St).
|
||||
|
||||
merge_invoice_event(?invoice_created(Invoice), St) ->
|
||||
St#st{invoice = Invoice};
|
||||
merge_invoice_event(?invoice_status_changed(Status), St = #st{invoice = I}) ->
|
||||
St#st{invoice = I#domain_Invoice{status = Status}};
|
||||
merge_invoice_event(?payment_ev(Event), St) ->
|
||||
merge_payment_event(Event, St).
|
||||
St#st{invoice = I#domain_Invoice{status = Status}}.
|
||||
|
||||
merge_payment_event(?payment_started(Payment), St) ->
|
||||
set_payment(Payment, St);
|
||||
merge_payment_event(?payment_bound(PaymentID, Trx), St) ->
|
||||
Payment = get_payment(PaymentID, St),
|
||||
set_payment(Payment#domain_InvoicePayment{trx = Trx}, St);
|
||||
merge_payment_event(?payment_status_changed(PaymentID, Status), St) ->
|
||||
Payment = get_payment(PaymentID, St),
|
||||
set_payment(Payment#domain_InvoicePayment{status = Status}, set_stage(idling, St)).
|
||||
get_payment_session(PaymentID, #st{payments = Payments}) ->
|
||||
case lists:keyfind(PaymentID, 1, Payments) of
|
||||
{PaymentID, PaymentSession} ->
|
||||
PaymentSession;
|
||||
false ->
|
||||
undefined
|
||||
end.
|
||||
|
||||
merge_private_event({payment_state_changed, PaymentID, State}, St) ->
|
||||
set_stage({processing_payment, PaymentID, State}, St).
|
||||
set_payment_session(PaymentID, PaymentSession, St = #st{payments = Payments}) ->
|
||||
St#st{payments = lists:keystore(PaymentID, 1, Payments, {PaymentID, PaymentSession})}.
|
||||
|
||||
set_stage(Stage, St) ->
|
||||
St#st{stage = Stage}.
|
||||
|
||||
get_payment(PaymentID, St) ->
|
||||
lists:keyfind(PaymentID, #domain_InvoicePayment.id, St#st.payments).
|
||||
set_payment(Payment, St) ->
|
||||
PaymentID = get_payment_id(Payment),
|
||||
St#st{payments = lists:keystore(PaymentID, #domain_InvoicePayment.id, St#st.payments, Payment)}.
|
||||
get_pending_payment(#st{payments = [V = {_PaymentID, {Payment, _}} | _]}) ->
|
||||
case get_payment_status(Payment) of
|
||||
?pending() ->
|
||||
V;
|
||||
?processed() ->
|
||||
V;
|
||||
_ ->
|
||||
undefined
|
||||
end;
|
||||
get_pending_payment(#st{}) ->
|
||||
undefined.
|
||||
|
||||
get_invoice_state(#st{invoice = Invoice, payments = Payments}) ->
|
||||
#payproc_InvoiceState{invoice = Invoice, payments = Payments}.
|
||||
|
@ -1,142 +1,333 @@
|
||||
%%% Invoice payment submachine
|
||||
%%%
|
||||
%%% TODO
|
||||
%%% - make proper submachine interface
|
||||
%%% - `init` / `start_session` should provide `next` or `done` to the caller
|
||||
%%% - distinguish between different error classes:
|
||||
%%% - regular operation error
|
||||
%%% - callback timeout
|
||||
%%% - internal error ?
|
||||
%%% - handle idempotent callbacks uniformly
|
||||
%%% - get rid of matches against session status
|
||||
%%% - tag machine with the provider trx
|
||||
%%% - distinguish between trx tags and callback tags
|
||||
%%% - tag namespaces
|
||||
%%% - clean the mess with error handling
|
||||
%%% - abuse transient error passthrough
|
||||
%%% - remove ability to throw `TryLater` from `HandlePaymentCallback`
|
||||
%%% - drop `TryLater` completely (?)
|
||||
%%% - think about safe clamping of timers returned by some proxy
|
||||
|
||||
-module(hg_invoice_payment).
|
||||
-include_lib("hg_proto/include/hg_domain_thrift.hrl").
|
||||
-include_lib("hg_proto/include/hg_proxy_provider_thrift.hrl").
|
||||
-include_lib("hg_proto/include/hg_payment_processing_thrift.hrl").
|
||||
|
||||
%% API
|
||||
|
||||
-export([process/4]).
|
||||
%% Machine like
|
||||
|
||||
%% Machine callbacks
|
||||
-export([init/4]).
|
||||
-export([start_session/2]).
|
||||
|
||||
% -behaviour(hg_machine).
|
||||
-export([process_signal/4]).
|
||||
-export([process_call/4]).
|
||||
|
||||
% -export([init/2]).
|
||||
% -export([process_signal/2]).
|
||||
% -export([process_call/2]).
|
||||
-export([merge_event/2]).
|
||||
|
||||
%%
|
||||
|
||||
-record(st, {
|
||||
stage :: process_payment | capture_payment,
|
||||
proxy_ref :: hg_domain_thrift:'ProxyRef'(),
|
||||
proxy_state :: binary() | undefined
|
||||
}).
|
||||
|
||||
-type st() :: #st{}.
|
||||
-type st() :: {payment(), session()}.
|
||||
-export_type([st/0]).
|
||||
|
||||
-type invoice() :: hg_domain_thrift:'Invoice'().
|
||||
-type payment() :: hg_domain_thrift:'InvoicePayment'().
|
||||
-type payment_trx() :: hg_domain_thrift:'TransactionInfo'() | undefined.
|
||||
-type error() :: hg_domain_thrift:'OperationError'().
|
||||
-type payment_id() :: hg_domain_thrift:'InvoicePaymentID'().
|
||||
-type target() :: hg_proxy_provider_thrift:'Target'().
|
||||
-type proxy_state() :: hg_proxy_thrift:'ProxyState'().
|
||||
|
||||
-spec process(payment(), invoice(), binary() | undefined, hg_machine:context()) ->
|
||||
{{ok, payment_trx()}, hg_machine:context()} |
|
||||
{{{error, error()}, payment_trx()}, hg_machine:context()} |
|
||||
{{{next, hg_machine_action:t(), binary()}, payment_trx()}, hg_machine:context()}.
|
||||
-type session() :: #{
|
||||
target => target(),
|
||||
status => active | suspended,
|
||||
proxy_state => proxy_state() | undefined,
|
||||
retry => genlib_retry:strategy()
|
||||
}.
|
||||
|
||||
process(Payment, Invoice, undefined, Context) ->
|
||||
process_(Payment, Invoice, construct_state(Payment, Invoice), Context);
|
||||
process(Payment, Invoice, St, Context) when is_binary(St) ->
|
||||
process_(Payment, Invoice, unmarshal_st(St), Context).
|
||||
%%
|
||||
|
||||
process_(Payment, Invoice, St = #st{}, Context) ->
|
||||
Proxy = get_proxy(Invoice, St),
|
||||
PaymentInfo = construct_payment_info(Payment, Invoice, Proxy, St),
|
||||
handle_state(Proxy, PaymentInfo, St, Context).
|
||||
-include("invoice_events.hrl").
|
||||
|
||||
handle_state(Proxy, PaymentInfo, St = #st{stage = process_payment}, Context0) ->
|
||||
% FIXME: dirty simulation of one-phase payment through the two-phase interaction
|
||||
case handle_process_result(process_payment(Proxy, PaymentInfo, Context0), St) of
|
||||
{{ok, Trx}, Context} ->
|
||||
NextSt = St#st{stage = capture_payment, proxy_state = undefined},
|
||||
{{{next, hg_machine_action:instant(), marshal_st(NextSt)}, Trx}, Context};
|
||||
Result ->
|
||||
Result
|
||||
end;
|
||||
handle_state(Proxy, PaymentInfo, St = #st{stage = capture_payment}, Context) ->
|
||||
handle_process_result(capture_payment(Proxy, PaymentInfo, Context), St).
|
||||
-type ev() ::
|
||||
{invoice_payment_event, hg_payment_processing_thrift:'InvoicePaymentEvent'()} |
|
||||
{session_event, session_ev()}.
|
||||
|
||||
handle_process_result({#'ProcessResult'{intent = {_, Intent}, trx = Trx, next_state = ProxyStateNext}, Context}, St) ->
|
||||
handle_process_result(Intent, Trx, St#st{proxy_state = ProxyStateNext}, Context).
|
||||
-type session_ev() ::
|
||||
{started, session()} |
|
||||
{proxy_state_changed, proxy_state()} |
|
||||
{proxy_retry_changed, genlib_retry:strategy()} |
|
||||
suspended |
|
||||
activated.
|
||||
|
||||
handle_process_result(#'FinishIntent'{status = {ok, _}}, Trx, _St, Context) ->
|
||||
{{ok, Trx}, Context};
|
||||
handle_process_result(#'FinishIntent'{status = {failure, Error}}, Trx, _St, Context) ->
|
||||
{{{error, map_error(Error)}, Trx}, Context};
|
||||
handle_process_result(#'SleepIntent'{timer = Timer}, Trx, StNext, Context) ->
|
||||
{{{next, hg_machine_action:set_timer(Timer), marshal_st(StNext)}, Trx}, Context}.
|
||||
-define(session_ev(E), {session_event, E}).
|
||||
|
||||
get_proxy(#domain_Invoice{domain_revision = Revision}, #st{proxy_ref = Ref}) ->
|
||||
hg_domain:get(Revision, Ref).
|
||||
%%
|
||||
|
||||
construct_payment_info(Payment, Invoice, Proxy, #st{proxy_state = ProxyState}) ->
|
||||
#'PaymentInfo'{
|
||||
invoice = Invoice,
|
||||
payment = Payment,
|
||||
options = Proxy#domain_Proxy.options,
|
||||
state = ProxyState
|
||||
-type opts() :: #{
|
||||
invoice => invoice(),
|
||||
proxy => _
|
||||
%% TODO
|
||||
}.
|
||||
|
||||
-spec init(payment_id(), _, opts(), hg_machine:context()) ->
|
||||
{hg_machine:result(), hg_machine:context()}.
|
||||
|
||||
init(PaymentID, PaymentParams, #{invoice := Invoice}, Context) ->
|
||||
Payment = #domain_InvoicePayment{
|
||||
id = PaymentID,
|
||||
created_at = hg_datetime:format_now(),
|
||||
status = ?pending(),
|
||||
cost = Invoice#domain_Invoice.cost,
|
||||
payer = PaymentParams#payproc_InvoicePaymentParams.payer
|
||||
},
|
||||
Events = [?payment_ev(?payment_started(Payment))],
|
||||
Action = hg_machine_action:new(),
|
||||
{{Events, Action}, Context}.
|
||||
|
||||
-spec start_session(target(), hg_machine:context()) ->
|
||||
{hg_machine:result(), hg_machine:context()}.
|
||||
|
||||
start_session(Target, Context) ->
|
||||
Events = [?session_ev({started, Target})],
|
||||
Action = hg_machine_action:instant(),
|
||||
{{Events, Action}, Context}.
|
||||
|
||||
%%
|
||||
|
||||
-spec process_signal(timeout, st(), opts(), hg_machine:context()) ->
|
||||
{{next | done, hg_machine:result()}, hg_machine:context()}.
|
||||
|
||||
process_signal(timeout, St, Options, Context) ->
|
||||
case get_status(St) of
|
||||
active ->
|
||||
process(St, Options, Context);
|
||||
suspended ->
|
||||
{fail(construct_error(<<"provider_timeout">>), St), Context}
|
||||
end.
|
||||
|
||||
-spec process_call({callback, _}, st(), opts(), hg_machine:context()) ->
|
||||
{{_, {next | done, hg_machine:result()}}, hg_machine:context()}. % FIXME
|
||||
|
||||
process_call({callback, Payload}, St, Options, Context) ->
|
||||
case get_status(St) of
|
||||
suspended ->
|
||||
handle_callback(Payload, St, Options, Context);
|
||||
active ->
|
||||
% there's ultimately no way how we could end up here
|
||||
error(invalid_session_status)
|
||||
end.
|
||||
|
||||
process(St, Options, Context) ->
|
||||
ProxyContext = construct_proxy_context(St, Options),
|
||||
handle_process_result(issue_process_call(ProxyContext, Options, Context), St).
|
||||
|
||||
handle_process_result({Result, Context}, St) ->
|
||||
case Result of
|
||||
{ok, ProxyResult} ->
|
||||
{handle_proxy_result(ProxyResult, St), Context};
|
||||
{exception, Exception} ->
|
||||
{handle_exception(Exception, St), Context};
|
||||
{error, Error} ->
|
||||
error(Error)
|
||||
end.
|
||||
|
||||
handle_callback(Payload, St, Options, Context) ->
|
||||
ProxyContext = construct_proxy_context(St, Options),
|
||||
handle_callback_result(issue_callback_call(Payload, ProxyContext, Options, Context), St).
|
||||
|
||||
handle_callback_result({Result, Context}, St) ->
|
||||
case Result of
|
||||
{ok, #'CallbackResult'{result = ProxyResult, response = Response}} ->
|
||||
{What, {Events, Action}} = handle_proxy_result(ProxyResult, St),
|
||||
{{Response, {What, {[?session_ev(activated) | Events], Action}}}, Context};
|
||||
{error, _} = Error ->
|
||||
error({Error, Context})
|
||||
end.
|
||||
|
||||
handle_proxy_result(#'ProxyResult'{intent = {_, Intent}, trx = Trx, next_state = ProxyState}, St) ->
|
||||
Events1 = bind_transaction(Trx, St),
|
||||
{What, {Events2, Action}} = handle_proxy_intent(Intent, ProxyState, St),
|
||||
{What, {Events1 ++ Events2, Action}}.
|
||||
|
||||
bind_transaction(undefined, _St) ->
|
||||
% no transaction yet
|
||||
[];
|
||||
bind_transaction(Trx, {#domain_InvoicePayment{id = PaymentID, trx = undefined}, _}) ->
|
||||
% got transaction, nothing bound so far
|
||||
[?payment_ev(?payment_bound(PaymentID, Trx))];
|
||||
bind_transaction(Trx, {#domain_InvoicePayment{trx = Trx}, _}) ->
|
||||
% got the same transaction as one which has been bound previously
|
||||
[];
|
||||
bind_transaction(Trx, {#domain_InvoicePayment{id = PaymentID, trx = TrxWas}, _}) ->
|
||||
% got transaction which differs from the bound one
|
||||
% verify against proxy contracts
|
||||
case Trx#domain_TransactionInfo.id of
|
||||
ID when ID =:= TrxWas#domain_TransactionInfo.id ->
|
||||
[?payment_ev(?payment_bound(PaymentID, Trx))];
|
||||
_ ->
|
||||
error(proxy_contract_violated)
|
||||
end.
|
||||
|
||||
handle_proxy_intent(#'FinishIntent'{status = {ok, _}}, _ProxyState, St) ->
|
||||
PaymentID = get_payment_id(St),
|
||||
Target = get_target(St),
|
||||
Events = [?payment_ev(?payment_status_changed(PaymentID, Target))],
|
||||
Action = hg_machine_action:new(),
|
||||
{done, {Events, Action}};
|
||||
|
||||
handle_proxy_intent(#'FinishIntent'{status = {failure, Error}}, _ProxyState, St) ->
|
||||
fail(construct_error(Error), St);
|
||||
|
||||
handle_proxy_intent(#'SleepIntent'{timer = Timer}, ProxyState, _St) ->
|
||||
Action = hg_machine_action:set_timer(Timer),
|
||||
Events = [?session_ev({proxy_state_changed, ProxyState})],
|
||||
{next, {Events, Action}};
|
||||
|
||||
handle_proxy_intent(
|
||||
#'SuspendIntent'{tag = Tag, timeout = Timer, user_interaction = UserInteraction},
|
||||
ProxyState, St
|
||||
) ->
|
||||
Action = try_set_timer(Timer, hg_machine_action:set_tag(Tag)),
|
||||
Events = [
|
||||
?session_ev({proxy_state_changed, ProxyState}),
|
||||
?session_ev(suspended)
|
||||
| try_emit_interaction_event(UserInteraction, St)
|
||||
],
|
||||
{next, {Events, Action}}.
|
||||
|
||||
try_set_timer(undefined, Action) ->
|
||||
Action;
|
||||
try_set_timer(Timer, Action) ->
|
||||
hg_machine_action:set_timer(Timer, Action).
|
||||
|
||||
try_emit_interaction_event(undefined, _St) ->
|
||||
[];
|
||||
try_emit_interaction_event(UserInteraction, St) ->
|
||||
[?payment_ev(?payment_interaction_requested(get_payment_id(St), UserInteraction))].
|
||||
|
||||
handle_exception(#'TryLater'{e = Error}, St) ->
|
||||
case retry(St) of
|
||||
{wait, Timeout, Events} ->
|
||||
Action = hg_machine_action:set_timeout(Timeout),
|
||||
{next, {Events, Action}};
|
||||
finish ->
|
||||
fail(construct_error(Error), St)
|
||||
end.
|
||||
|
||||
retry({_Payment, #{retry := Retry}}) ->
|
||||
case genlib_retry:next_step(Retry) of
|
||||
{wait, Timeout, RetryNext} ->
|
||||
{wait, Timeout div 1000, [?session_ev({proxy_retry_changed, RetryNext})]};
|
||||
finish ->
|
||||
finish
|
||||
end.
|
||||
|
||||
fail(Error, St) ->
|
||||
Events = [?payment_ev(?payment_status_changed(get_payment_id(St), ?failed(Error)))],
|
||||
Action = hg_machine_action:new(),
|
||||
{done, {Events, Action}}.
|
||||
|
||||
construct_retry_strategy(_Target) ->
|
||||
Timecap = 30000,
|
||||
Timeout = 10000,
|
||||
genlib_retry:timecap(Timecap, genlib_retry:linear(infinity, Timeout)).
|
||||
|
||||
construct_proxy_context({Payment, Session}, Options = #{proxy := Proxy}) ->
|
||||
#'Context'{
|
||||
session = construct_session(Session),
|
||||
payment = construct_payment_info(Payment, Options),
|
||||
options = Proxy#domain_Proxy.options
|
||||
}.
|
||||
|
||||
map_error(#'Error'{code = Code, description = Description}) ->
|
||||
construct_session(#{target := Target, proxy_state := ProxyState}) ->
|
||||
#'Session'{
|
||||
target = Target,
|
||||
state = ProxyState
|
||||
}.
|
||||
|
||||
construct_payment_info(Payment, #{invoice := Invoice}) ->
|
||||
#'PaymentInfo'{
|
||||
invoice = Invoice,
|
||||
payment = Payment
|
||||
}.
|
||||
|
||||
construct_error(#'Error'{code = Code, description = Description}) ->
|
||||
construct_error(Code, Description);
|
||||
construct_error(Code) when is_binary(Code) ->
|
||||
construct_error(Code, undefined).
|
||||
|
||||
construct_error(Code, Description) ->
|
||||
#domain_OperationError{code = Code, description = Description}.
|
||||
|
||||
%%
|
||||
|
||||
construct_state(Payment, Invoice) ->
|
||||
#st{
|
||||
stage = process_payment,
|
||||
proxy_ref = select_proxy(Payment, Invoice)
|
||||
get_payment_id({Payment, _State}) ->
|
||||
Payment#domain_InvoicePayment.id.
|
||||
|
||||
get_status({_Payment, #{status := Status}}) ->
|
||||
Status.
|
||||
|
||||
get_target({_Payment, #{target := Target}}) ->
|
||||
Target.
|
||||
|
||||
%%
|
||||
|
||||
-spec merge_event(ev(), st()) -> st().
|
||||
|
||||
merge_event(?payment_ev(Event), St) ->
|
||||
merge_public_event(Event, St);
|
||||
merge_event(?session_ev(Event), St) ->
|
||||
merge_session_event(Event, St).
|
||||
|
||||
merge_public_event(?payment_started(Payment), undefined) ->
|
||||
{Payment, undefined};
|
||||
merge_public_event(?payment_bound(_, Trx), {Payment, State}) ->
|
||||
{Payment#domain_InvoicePayment{trx = Trx}, State};
|
||||
merge_public_event(?payment_status_changed(_, Status), {Payment, State}) ->
|
||||
{Payment#domain_InvoicePayment{status = Status}, State}.
|
||||
|
||||
%% TODO session_finished?
|
||||
merge_session_event({started, Target}, {Payment, _}) ->
|
||||
{Payment, create_session(Target)};
|
||||
merge_session_event({proxy_state_changed, ProxyState}, {Payment, Session}) ->
|
||||
{Payment, Session#{proxy_state => ProxyState}};
|
||||
merge_session_event({proxy_retry_changed, Retry}, {Payment, Session}) ->
|
||||
{Payment, Session#{retry => Retry}};
|
||||
merge_session_event(activated, {Payment, Session}) ->
|
||||
{Payment, Session#{status => active}};
|
||||
merge_session_event(suspended, {Payment, Session}) ->
|
||||
{Payment, Session#{status => suspended}}.
|
||||
|
||||
create_session(Target) ->
|
||||
#{
|
||||
target => Target,
|
||||
status => active,
|
||||
proxy_state => undefined,
|
||||
retry => construct_retry_strategy(Target)
|
||||
}.
|
||||
|
||||
select_proxy(_, _) ->
|
||||
% FIXME: turbo routing
|
||||
#domain_ProxyRef{id = 1}.
|
||||
|
||||
-spec marshal_st(st()) -> binary().
|
||||
|
||||
marshal_st(St) ->
|
||||
term_to_binary(St).
|
||||
|
||||
-spec unmarshal_st(binary()) -> st().
|
||||
|
||||
unmarshal_st(St) ->
|
||||
binary_to_term(St).
|
||||
|
||||
%% Proxy provider client
|
||||
%%
|
||||
|
||||
-define(SERVICE, {hg_proxy_provider_thrift, 'ProviderProxy'}).
|
||||
|
||||
-type process_payment_result() :: hg_proxy_provider_thrift:'ProcessResult'().
|
||||
issue_process_call(ProxyContext, Opts, Context) ->
|
||||
issue_call({?SERVICE, 'ProcessPayment', [ProxyContext]}, Opts, Context).
|
||||
|
||||
-spec process_payment(
|
||||
hg_domain_thrift:'Proxy'(),
|
||||
hg_proxy_provider_thrift:'PaymentInfo'(),
|
||||
woody_client:context()
|
||||
) ->
|
||||
{process_payment_result(), woody_client:context()}.
|
||||
process_payment(Proxy, PaymentInfo, Context) ->
|
||||
call(Context, Proxy, {?SERVICE, 'ProcessPayment', [PaymentInfo]}).
|
||||
issue_callback_call(Payload, ProxyContext, Opts, Context) ->
|
||||
issue_call({?SERVICE, 'HandlePaymentCallback', [Payload, ProxyContext]}, Opts, Context).
|
||||
|
||||
-spec capture_payment(
|
||||
hg_domain_thrift:'Proxy'(),
|
||||
hg_proxy_provider_thrift:'PaymentInfo'(),
|
||||
woody_client:context()
|
||||
) ->
|
||||
{process_payment_result(), woody_client:context()}.
|
||||
capture_payment(Proxy, PaymentInfo, Context) ->
|
||||
call(Context, Proxy, {?SERVICE, 'CapturePayment', [PaymentInfo]}).
|
||||
|
||||
call(Context = #{client_context := ClientContext0}, Proxy, Call) ->
|
||||
Endpoint = get_call_options(Proxy),
|
||||
try woody_client:call(ClientContext0, Call, Endpoint) of
|
||||
{{ok, Result = #'ProcessResult'{}}, ClientContext} ->
|
||||
{Result, Context#{client_context => ClientContext}}
|
||||
catch
|
||||
% TODO: support retry strategies
|
||||
{#'TryLater'{e = _Error}, ClientContext} ->
|
||||
Result = #'ProcessResult'{intent = {sleep, #'SleepIntent'{timer = {timeout, 10}}}},
|
||||
{Result, Context#{client_context => ClientContext}}
|
||||
end.
|
||||
issue_call(Call, #{proxy := Proxy}, Context = #{client_context := ClientContext}) ->
|
||||
{Result, ClientContext1} = woody_client:call_safe(ClientContext, Call, get_call_options(Proxy)),
|
||||
{Result, Context#{client_context := ClientContext1}}.
|
||||
|
||||
get_call_options(#domain_Proxy{url = Url}) ->
|
||||
#{url => Url}.
|
||||
|
@ -1,6 +1,7 @@
|
||||
-module(hg_machine).
|
||||
|
||||
-type id() :: hg_base_thrift:'ID'().
|
||||
-type ref() :: hg_state_processing_thrift:'Reference'().
|
||||
-type ns() :: hg_base_thrift:'Namespace'().
|
||||
-type args() :: _.
|
||||
|
||||
@ -88,12 +89,12 @@
|
||||
start(Ns, ID, Args, #{client_context := Context0}) ->
|
||||
call_automaton('Start', [Ns, ID, wrap_args(Args)], Context0).
|
||||
|
||||
-spec call(ns(), id(), term(), opts()) ->
|
||||
-spec call(ns(), ref(), term(), opts()) ->
|
||||
{ok | {ok, term()} | {error, notfound | failed}, woody_client:context()} |
|
||||
no_return().
|
||||
|
||||
call(Ns, ID, Args, #{client_context := Context0}) ->
|
||||
case call_automaton('Call', [Ns, {id, ID}, wrap_args(Args)], Context0) of
|
||||
call(Ns, Ref, Args, #{client_context := Context0}) ->
|
||||
case call_automaton('Call', [Ns, Ref, wrap_args(Args)], Context0) of
|
||||
{{ok, Response}, Context} ->
|
||||
% should be specific to a processing interface already
|
||||
case unmarshal_term(Response) of
|
||||
|
@ -134,7 +134,7 @@ start(ID, Args, Context) ->
|
||||
map_start_error(hg_machine:start(?NS, ID, Args, opts(Context))).
|
||||
|
||||
call(ID, Args, Context) ->
|
||||
map_error(hg_machine:call(?NS, ID, Args, opts(Context))).
|
||||
map_error(hg_machine:call(?NS, {id, ID}, Args, opts(Context))).
|
||||
|
||||
map_start_error({{error, exists}, Context}) ->
|
||||
throw({#payproc_PartyExists{}, Context});
|
||||
|
31
apps/hellgate/src/hg_proxy_host_provider.erl
Normal file
31
apps/hellgate/src/hg_proxy_host_provider.erl
Normal file
@ -0,0 +1,31 @@
|
||||
%%% Host handler for provider proxies
|
||||
%%%
|
||||
%%% TODO
|
||||
%%% - designate an exception when specified tag is missing
|
||||
|
||||
-module(hg_proxy_host_provider).
|
||||
-include_lib("hg_proto/include/hg_proxy_provider_thrift.hrl").
|
||||
|
||||
%% Woody handler
|
||||
|
||||
-behaviour(woody_server_thrift_handler).
|
||||
|
||||
-export([handle_function/4]).
|
||||
|
||||
%%
|
||||
|
||||
-type tag() :: hg_base_thrift:'Tag'().
|
||||
-type callback() :: hg_proxy_provider_thrift:'Callback'().
|
||||
|
||||
-spec handle_function('ProcessCallback', {tag(), callback()}, woody_client:context(), _) ->
|
||||
{{ok, term()}, woody_client:context()} | no_return().
|
||||
|
||||
handle_function('ProcessCallback', {Tag, Callback}, Context, _) ->
|
||||
map_error(hg_invoice:process_callback(Tag, {provider, Callback}, Context)).
|
||||
|
||||
map_error({{error, notfound}, Context}) ->
|
||||
throw({#'InvalidRequest'{errors = [<<"notfound">>]}, Context});
|
||||
map_error({{error, Reason}, _Context}) ->
|
||||
error(Reason);
|
||||
map_error({Ok, Context}) ->
|
||||
{Ok, Context}.
|
@ -2,37 +2,15 @@
|
||||
-behaviour(woody_server_thrift_handler).
|
||||
|
||||
-export([handle_function/4]).
|
||||
-export([handle_error/4]).
|
||||
|
||||
-behaviour(hg_test_proxy).
|
||||
|
||||
-export([get_child_spec/2]).
|
||||
-export([get_url/2]).
|
||||
-export([get_service_spec/0]).
|
||||
|
||||
%%
|
||||
|
||||
-spec get_child_spec(inet:hostname() | inet:ip_address(), inet:port_number()) ->
|
||||
supervisor:child_spec().
|
||||
|
||||
get_child_spec(Host, Port) ->
|
||||
{Path, Service} = get_service_spec(),
|
||||
woody_server:child_spec(
|
||||
?MODULE,
|
||||
#{
|
||||
ip => hg_utils:get_hostname_ip(Host),
|
||||
port => Port,
|
||||
net_opts => [],
|
||||
event_handler => hg_woody_event_handler,
|
||||
handlers => [{Path, {Service, ?MODULE, []}}]
|
||||
}
|
||||
).
|
||||
|
||||
-spec get_url(inet:hostname() | inet:ip_address(), inet:port_number()) ->
|
||||
woody_t:url().
|
||||
|
||||
get_url(Host, Port) ->
|
||||
{Path, _Service} = get_service_spec(),
|
||||
iolist_to_binary(["http://", Host, ":", integer_to_list(Port), Path]).
|
||||
-spec get_service_spec() ->
|
||||
hg_proto:service_spec().
|
||||
|
||||
get_service_spec() ->
|
||||
{"/test/proxy/provider/dummy", {hg_proxy_provider_thrift, 'ProviderProxy'}}.
|
||||
@ -40,35 +18,80 @@ get_service_spec() ->
|
||||
%%
|
||||
|
||||
-include_lib("hg_proto/include/hg_proxy_provider_thrift.hrl").
|
||||
-include_lib("hellgate/include/invoice_events.hrl").
|
||||
|
||||
-spec handle_function(woody_t:func(), woody_server_thrift_handler:args(), woody_client:context(), []) ->
|
||||
-spec handle_function(woody_t:func(), woody_server_thrift_handler:args(), woody_client:context(), #{}) ->
|
||||
{{ok, term()}, woody_client:context()} | no_return().
|
||||
|
||||
handle_function('ProcessPayment', {#'PaymentInfo'{state = undefined}}, Context, _Opts) ->
|
||||
handle_function(
|
||||
'ProcessPayment',
|
||||
{#'Context'{
|
||||
session = #'Session'{target = Target, state = State},
|
||||
payment = PaymentInfo,
|
||||
options = _
|
||||
}},
|
||||
Context,
|
||||
Opts
|
||||
) ->
|
||||
process_payment(Target, State, PaymentInfo, Opts, Context);
|
||||
|
||||
handle_function(
|
||||
'HandlePaymentCallback',
|
||||
{Payload, #'Context'{
|
||||
session = #'Session'{target = Target, state = State},
|
||||
payment = PaymentInfo,
|
||||
options = _
|
||||
}},
|
||||
Context,
|
||||
Opts
|
||||
) ->
|
||||
handle_callback(Payload, Target, State, PaymentInfo, Opts, Context).
|
||||
|
||||
process_payment(?processed(), undefined, _, _, Context) ->
|
||||
{{ok, sleep(1, <<"sleeping">>)}, Context};
|
||||
handle_function('ProcessPayment', {#'PaymentInfo'{state = <<"sleeping">>} = PaymentInfo}, Context, _Opts) ->
|
||||
process_payment(?processed(), <<"sleeping">>, PaymentInfo, _, Context) ->
|
||||
{{ok, finish(PaymentInfo)}, Context};
|
||||
|
||||
handle_function('CapturePayment', {PaymentInfo}, Context, _Opts) ->
|
||||
{{ok, finish(PaymentInfo)}, Context};
|
||||
|
||||
handle_function('CancelPayment', {PaymentInfo}, Context, _Opts) ->
|
||||
process_payment(?captured(), undefined, _, Opts, Context) ->
|
||||
Tag = hg_utils:unique_id(),
|
||||
_Pid = spawn(fun () -> callback(Tag, <<"payload">>, <<"sure">>, Opts, 1000) end),
|
||||
{{ok, suspend(Tag, 3, <<"suspended">>)}, Context};
|
||||
process_payment(?captured(), <<"sleeping">>, PaymentInfo, _, Context) ->
|
||||
{{ok, finish(PaymentInfo)}, Context}.
|
||||
|
||||
handle_callback(<<"payload">>, ?captured(), <<"suspended">>, _, _, Context) ->
|
||||
{{ok, respond(<<"sure">>, sleep(1, <<"sleeping">>))}, Context}.
|
||||
|
||||
finish(#'PaymentInfo'{payment = Payment}) ->
|
||||
#'ProcessResult'{
|
||||
#'ProxyResult'{
|
||||
intent = {finish, #'FinishIntent'{status = {ok, #'Ok'{}}}},
|
||||
trx = #domain_TransactionInfo{id = Payment#domain_InvoicePayment.id}
|
||||
}.
|
||||
|
||||
sleep(Timeout, State) ->
|
||||
#'ProcessResult'{
|
||||
#'ProxyResult'{
|
||||
intent = {sleep, #'SleepIntent'{timer = {timeout, Timeout}}},
|
||||
next_state = State
|
||||
}.
|
||||
|
||||
-spec handle_error(woody_t:func(), term(), woody_client:context(), []) ->
|
||||
_.
|
||||
suspend(Tag, Timeout, State) ->
|
||||
#'ProxyResult'{
|
||||
intent = {suspend, #'SuspendIntent'{
|
||||
tag = Tag,
|
||||
timeout = {timeout, Timeout}
|
||||
}},
|
||||
next_state = State
|
||||
}.
|
||||
|
||||
handle_error(_Function, _Reason, _Context, _Opts) ->
|
||||
ok.
|
||||
respond(Response, Result) ->
|
||||
#'CallbackResult'{
|
||||
response = Response,
|
||||
result = Result
|
||||
}.
|
||||
|
||||
callback(Tag, Payload, Expect, #{hellgate_root_url := RootUrl}, Timeout) ->
|
||||
_ = timer:sleep(Timeout),
|
||||
{ok, Expect} = hg_client_api:call(
|
||||
proxy_host_provider, 'ProcessCallback', [Tag, Payload],
|
||||
hg_client_api:new(RootUrl)
|
||||
).
|
||||
|
@ -110,7 +110,8 @@ payment_success(C) ->
|
||||
{ok, PaymentID} = hg_client_invoicing:start_payment(InvoiceID, PaymentParams, Client),
|
||||
?payment_started(?payment_w_status(?pending())) = next_event(InvoiceID, Client),
|
||||
?payment_bound(PaymentID, ?trx_info(PaymentID)) = next_event(InvoiceID, Client),
|
||||
?payment_status_changed(PaymentID, ?succeeded()) = next_event(InvoiceID, Client),
|
||||
?payment_status_changed(PaymentID, ?processed()) = next_event(InvoiceID, Client),
|
||||
?payment_status_changed(PaymentID, ?captured()) = next_event(InvoiceID, Client),
|
||||
?invoice_status_changed(?paid()) = next_event(InvoiceID, Client),
|
||||
timeout = next_event(InvoiceID, 1000, Client).
|
||||
|
||||
@ -149,7 +150,8 @@ unwrap_event(E) ->
|
||||
start_service_handler(Module, C) ->
|
||||
Host = "localhost",
|
||||
Port = get_random_port(),
|
||||
ChildSpec = hg_test_proxy:get_child_spec(Module, Host, Port),
|
||||
Opts = #{hellgate_root_url => ?c(root_url, C)},
|
||||
ChildSpec = hg_test_proxy:get_child_spec(Module, Host, Port, Opts),
|
||||
{ok, _} = supervisor:start_child(?c(test_sup, C), ChildSpec),
|
||||
hg_test_proxy:get_url(Module, Host, Port).
|
||||
|
||||
|
@ -2,10 +2,10 @@
|
||||
|
||||
-type host() :: inet:hostname() | inet:ip_address().
|
||||
|
||||
-callback get_child_spec(host(), inet:port_number()) -> supervisor:child_spec().
|
||||
-callback get_url(host(), inet:port_number()) -> woody_t:url().
|
||||
-callback get_service_spec() -> hg_proto:service_spec().
|
||||
|
||||
-export([get_child_spec/3]).
|
||||
-export([get_child_spec/4]).
|
||||
-export([get_url/3]).
|
||||
|
||||
%%
|
||||
@ -14,10 +14,27 @@
|
||||
supervisor:child_spec().
|
||||
|
||||
get_child_spec(Module, Host, Port) ->
|
||||
Module:get_child_spec(Host, Port).
|
||||
get_child_spec(Module, Host, Port, []).
|
||||
|
||||
-spec get_child_spec(module(), host(), inet:port_number(), #{}) ->
|
||||
supervisor:child_spec().
|
||||
|
||||
get_child_spec(Module, Host, Port, Args) ->
|
||||
{Path, Service} = Module:get_service_spec(),
|
||||
woody_server:child_spec(
|
||||
?MODULE,
|
||||
#{
|
||||
ip => hg_utils:get_hostname_ip(Host),
|
||||
port => Port,
|
||||
net_opts => [],
|
||||
event_handler => hg_woody_event_handler,
|
||||
handlers => [{Path, {Service, Module, Args}}]
|
||||
}
|
||||
).
|
||||
|
||||
-spec get_url(module(), host(), inet:port_number()) ->
|
||||
supervisor:child_spec().
|
||||
|
||||
get_url(Module, Host, Port) ->
|
||||
Module:get_url(Host, Port).
|
||||
{Path, _Service} = Module:get_service_spec(),
|
||||
iolist_to_binary(["http://", Host, ":", integer_to_list(Port), Path]).
|
||||
|
@ -1 +1 @@
|
||||
Subproject commit 2e89b6930c2aa865ca6a4f962a6f4b985e550481
|
||||
Subproject commit 406fd17f79fdfe8823ccfeddaa571e6cf61c7d28
|
@ -32,4 +32,8 @@ get_service_spec(invoicing, #{}) ->
|
||||
|
||||
get_service_spec(processor, #{namespace := Ns}) when is_binary(Ns) ->
|
||||
Service = {hg_state_processing_thrift, 'Processor'},
|
||||
{?VERSION_PREFIX ++ "/stateproc/" ++ binary_to_list(Ns), Service}.
|
||||
{?VERSION_PREFIX ++ "/stateproc/" ++ binary_to_list(Ns), Service};
|
||||
|
||||
get_service_spec(proxy_host_provider, #{}) ->
|
||||
Service = {hg_proxy_provider_thrift, 'ProviderProxyHost'},
|
||||
{?VERSION_PREFIX ++ "/proxyhost/provider", Service}.
|
||||
|
@ -15,8 +15,8 @@
|
||||
{hellgate, [
|
||||
{ip, "::"},
|
||||
{port, 8022},
|
||||
{automaton_service_url, <<"http://machinegun:8022/v1/automaton">>},
|
||||
{eventsink_service_url, <<"http://machinegun:8022/v1/event_sink">>},
|
||||
{provider_proxy_url, <<"http://proxy_vtb:8022/proxy">>}
|
||||
{automaton_service_url , <<"http://machinegun:8022/v1/automaton">>},
|
||||
{eventsink_service_url , <<"http://machinegun:8022/v1/event_sink">>},
|
||||
{provider_proxy_url , <<"http://proxy_vtb:8022/proxy">>}
|
||||
]}
|
||||
].
|
||||
|
@ -2,6 +2,7 @@
|
||||
cat <<EOF
|
||||
version: '2'
|
||||
services:
|
||||
|
||||
${SERVICE_NAME}:
|
||||
image: ${BUILD_IMAGE}
|
||||
volumes:
|
||||
@ -11,10 +12,12 @@ services:
|
||||
command: /sbin/init
|
||||
depends_on:
|
||||
- machinegun
|
||||
|
||||
machinegun:
|
||||
image: dr.rbkmoney.com/rbkmoney/machinegun:cc5985c4b1ea385eba141995c37ebc67093a1fe7
|
||||
image: dr.rbkmoney.com/rbkmoney/machinegun:4c29acdcdce065dbba1f3c8ee1683caea837869c
|
||||
volumes:
|
||||
- ./test/machinegun/sys.config:/opt/machinegun/releases/0.1.0/sys.config
|
||||
|
||||
networks:
|
||||
default:
|
||||
driver: bridge
|
||||
|
@ -8,7 +8,12 @@
|
||||
{<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.7">>},1},
|
||||
{<<"hackney">>,{pkg,<<"hackney">>,<<"1.5.7">>},1},
|
||||
{<<"idna">>,{pkg,<<"idna">>,<<"1.2.0">>},2},
|
||||
{<<"jsx">>,{pkg,<<"jsx">>,<<"2.8.0">>},1},
|
||||
{<<"lager">>,{pkg,<<"lager">>,<<"3.0.2">>},0},
|
||||
{<<"lager_logstash_formatter">>,
|
||||
{git,"git@github.com:rbkmoney/lager_logstash_formatter.git",
|
||||
{ref,"ea2967f656d344429a71e59b5acaff4c234ad15b"}},
|
||||
0},
|
||||
{<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},2},
|
||||
{<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.0.2">>},2},
|
||||
{<<"ranch">>,{pkg,<<"ranch">>,<<"1.2.1">>},2},
|
||||
|
@ -1,8 +1,15 @@
|
||||
[
|
||||
{mg_woody_api, [
|
||||
{nss, [
|
||||
{<<"invoice">> , <<"http://hellgate:8022/v1/stateproc/invoice">>},
|
||||
{<<"party">> , <<"http://hellgate:8022/v1/stateproc/party">> }
|
||||
]}
|
||||
{storage, mg_storage_test},
|
||||
{namespaces, #{
|
||||
<<"invoice">> => #{
|
||||
url => <<"http://hellgate:8022/v1/stateproc/invoice">>,
|
||||
event_sink => <<"payproc">>
|
||||
},
|
||||
<<"party">> => #{
|
||||
url => <<"http://hellgate:8022/v1/stateproc/party">>,
|
||||
event_sink => <<"payproc">>
|
||||
}
|
||||
}}
|
||||
]}
|
||||
].
|
||||
|
Loading…
Reference in New Issue
Block a user