From c1b6e028473a675ec8c72c10da3ba21d854267e7 Mon Sep 17 00:00:00 2001 From: Artem Ocheredko Date: Thu, 8 Dec 2016 19:18:32 +0300 Subject: [PATCH] HG-74 Integrate with new damsel protocols (#50) * HG-74 Integrate with new damsel protocols --- apps/hellgate/src/hg_accounting.erl | 55 +++++++++++++------- apps/hellgate/src/hg_invoice_payment.erl | 7 ++- apps/hellgate/src/hg_machine.erl | 54 +++++++++++++------ apps/hellgate/src/hg_party.erl | 4 +- apps/hellgate/src/hg_proxy_host_provider.erl | 2 +- apps/hellgate/test/hg_ct_helper.erl | 18 ++++++- apps/hellgate/test/hg_eventsink_history.erl | 6 +++ build_utils | 2 +- docker-compose.sh | 39 +++++++------- rebar.lock | 6 +-- 10 files changed, 127 insertions(+), 66 deletions(-) diff --git a/apps/hellgate/src/hg_accounting.erl b/apps/hellgate/src/hg_accounting.erl index 2080e72..498f3fb 100644 --- a/apps/hellgate/src/hg_accounting.erl +++ b/apps/hellgate/src/hg_accounting.erl @@ -22,11 +22,15 @@ -type currency_code() :: dmsl_domain_thrift:'CurrencySymbolicCode'(). -type account_id() :: dmsl_accounter_thrift:'AccountID'(). -type plan_id() :: dmsl_accounter_thrift:'PlanID'(). +-type batch_id() :: dmsl_accounter_thrift:'BatchID'(). +-type batch() :: {batch_id(), hg_cashflow:t()}. + +-export_type([batch/0]). -type account() :: #{ account_id => account_id(), own_amount => amount(), - available_amount => amount(), + min_available_amount => amount(), currency_code => currency_code() }. @@ -61,27 +65,26 @@ construct_prototype(CurrencyCode, Description) -> }. %% - -type accounts_map() :: #{hg_cashflow:account() => account_id()}. -type accounts_state() :: #{hg_cashflow:account() => account()}. --spec plan(plan_id(), hg_cashflow:t(), accounts_map()) -> +-spec plan(plan_id(), batch(), accounts_map()) -> accounts_state(). -plan(PlanID, Cashflow, AccountMap) -> - do('Hold', construct_plan(PlanID, Cashflow, AccountMap), AccountMap). +plan(PlanID, Batch, AccountMap) -> + do('Hold', construct_plan_change(PlanID, Batch, AccountMap), AccountMap). --spec commit(plan_id(), hg_cashflow:t(), accounts_map()) -> +-spec commit(plan_id(), [batch()], accounts_map()) -> accounts_state(). -commit(PlanID, Cashflow, AccountMap) -> - do('CommitPlan', construct_plan(PlanID, Cashflow, AccountMap), AccountMap). +commit(PlanID, Batches, AccountMap) -> + do('CommitPlan', construct_plan(PlanID, Batches, AccountMap), AccountMap). --spec rollback(plan_id(), hg_cashflow:t(), accounts_map()) -> +-spec rollback(plan_id(), [batch()], accounts_map()) -> accounts_state(). -rollback(PlanID, Cashflow, AccountMap) -> - do('RollbackPlan', construct_plan(PlanID, Cashflow, AccountMap), AccountMap). +rollback(PlanID, Batches, AccountMap) -> + do('RollbackPlan', construct_plan(PlanID, Batches, AccountMap), AccountMap). do(Op, Plan, AccountMap) -> try @@ -92,24 +95,36 @@ do(Op, Plan, AccountMap) -> error(Exception) % FIXME end. -construct_plan(PlanID, Cashflow, AccountMap) -> +construct_plan_change(PlanID, {BatchID, Cashflow}, AccountMap) -> + #accounter_PostingPlanChange{ + id = PlanID, + batch = #accounter_PostingBatch{ + id = BatchID, + postings = collect_postings(Cashflow, AccountMap) + } + }. + +construct_plan(PlanID, Batches, AccountMap) -> #accounter_PostingPlan{ id = PlanID, - batch = collect_postings(Cashflow, AccountMap) + batch_list = [ + #accounter_PostingBatch{ + id = BatchID, + postings = collect_postings(Cashflow, AccountMap) + } + || {BatchID, Cashflow} <- Batches] }. collect_postings(Cashflow, AccountMap) -> [ #accounter_Posting{ - id = ID, from_id = resolve_account(Source, AccountMap), to_id = resolve_account(Destination, AccountMap), amount = Amount, currency_sym_code = CurrencyCode, description = <<>> - } || - {ID, {Source, Destination, Amount, CurrencyCode}} <- - lists:zip(lists:seq(1, length(Cashflow)), Cashflow) + } + || {Source, Destination, Amount, CurrencyCode} <- Cashflow ]. resolve_account(Account, Accounts) -> @@ -137,14 +152,14 @@ construct_account( AccountID, #accounter_Account{ own_amount = OwnAmount, - available_amount = AvailableAmount, - currency_sym_code = CurrencyCode + currency_sym_code = CurrencyCode, + min_available_amount = MinAvailableAmount } ) -> #{ account_id => AccountID, own_amount => OwnAmount, - available_amount => AvailableAmount, + min_available_amount => MinAvailableAmount, currency_code => CurrencyCode }. diff --git a/apps/hellgate/src/hg_invoice_payment.erl b/apps/hellgate/src/hg_invoice_payment.erl index de96b50..5bc869c 100644 --- a/apps/hellgate/src/hg_invoice_payment.erl +++ b/apps/hellgate/src/hg_invoice_payment.erl @@ -84,6 +84,9 @@ activated. -define(session_ev(E), {session_event, E}). +%% + +-define(BATCH_ID, 1). %% @@ -120,7 +123,7 @@ init(PaymentID, PaymentParams, Opts) -> AccountMap = collect_account_map(Computed, Shop, Route, VS2, Revision), _AccountsState = hg_accounting:plan( construct_plan_id(Invoice, Payment), - Computed, + {?BATCH_ID, Computed}, AccountMap ), Cashflow = construct_payment_cash_flow(Computed, AccountMap), @@ -432,7 +435,7 @@ finalize_plan(Finalizer, St, Options) -> PlanID = construct_plan_id(get_invoice(Options), get_payment(St)), Computed = get_computed_cashflow(Options, St), AccountMap = get_account_map(St), - Finalizer(PlanID, Computed, AccountMap). + Finalizer(PlanID, [{?BATCH_ID, Computed}], AccountMap). get_account_map(#st{cashflow = #domain_InvoicePaymentCashFlow{account_map = V}}) -> V. diff --git a/apps/hellgate/src/hg_machine.erl b/apps/hellgate/src/hg_machine.erl index aa13814..73bd921 100644 --- a/apps/hellgate/src/hg_machine.erl +++ b/apps/hellgate/src/hg_machine.erl @@ -5,6 +5,8 @@ -type ns() :: dmsl_base_thrift:'Namespace'(). -type args() :: _. +-type machine() :: dmsl_state_processing_thrift:'Machine'(). + -type event() :: event(_). -type event(T) :: {event_id(), timestamp(), T}. -type event_id() :: dmsl_base_thrift:'EventID'(). @@ -13,6 +15,9 @@ -type history() :: history(_). -type history(T) :: [event(T)]. +-type history_range() :: dmsl_state_processing_thrift:'HistoryRange'(). +-type descriptor() :: dmsl_state_processing_thrift:'MachineDescriptor'(). + -type result(T) :: {[T], hg_machine_action:t()}. -type result() :: result(_). @@ -90,7 +95,8 @@ start(Ns, ID, Args) -> {ok, term()} | {error, notfound | failed} | no_return(). call(Ns, Ref, Args) -> - case call_automaton('Call', [Ns, Ref, wrap_args(Args)]) of + Descriptor = prepare_descriptor(Ns, Ref, #'HistoryRange'{}), + case call_automaton('Call', [Descriptor, wrap_args(Args)]) of {ok, Response} when is_binary(Response) -> % should be specific to a processing interface already {ok, unmarshal_term(Response)}; @@ -112,8 +118,9 @@ get_history(Ns, ID, AfterID, Limit) -> get_history(Ns, ID, Range) -> LastID = #'HistoryRange'.'after', - case call_automaton('GetHistory', [Ns, {id, ID}, Range]) of - {ok, History} when is_list(History) -> + Descriptor = prepare_descriptor(Ns, {id, ID}, Range), + case call_automaton('GetMachine', [Descriptor]) of + {ok, #'Machine'{history = History}} when is_list(History) -> {ok, unwrap_history(History, LastID)}; Error -> Error @@ -143,17 +150,17 @@ call_automaton(Function, Args) -> handle_function('ProcessSignal', {Args}, #{ns := Ns} = _Opts) -> _ = hg_utils:logtag_process(namespace, Ns), - #'SignalArgs'{signal = {_Type, Signal}, history = History} = Args, - dispatch_signal(Ns, Signal, History); + #'SignalArgs'{signal = {_Type, Signal}, machine = Machine} = Args, + dispatch_signal(Ns, Signal, Machine); handle_function('ProcessCall', {Args}, #{ns := Ns} = _Opts) -> _ = hg_utils:logtag_process(namespace, Ns), - #'CallArgs'{arg = Payload, history = History} = Args, - dispatch_call(Ns, Payload, History). + #'CallArgs'{arg = Payload, machine = Machine} = Args, + dispatch_call(Ns, Payload, Machine). %% --spec dispatch_signal(ns(), Signal, hg_machine:history()) -> +-spec dispatch_signal(ns(), Signal, machine()) -> Result when Signal :: dmsl_state_processing_thrift:'InitSignal'() | @@ -162,21 +169,21 @@ handle_function('ProcessCall', {Args}, #{ns := Ns} = _Opts) -> Result :: dmsl_state_processing_thrift:'SignalResult'(). -dispatch_signal(Ns, #'InitSignal'{id = ID, arg = Payload}, []) -> +dispatch_signal(Ns, #'InitSignal'{arg = Payload}, #'Machine'{id = ID}) -> Args = unwrap_args(Payload), _ = lager:debug("dispatch init with id = ~s and args = ~p", [ID, Args]), Module = get_handler_module(Ns), Result = Module:init(ID, Args), marshal_signal_result(Result); -dispatch_signal(Ns, #'TimeoutSignal'{}, History0) -> +dispatch_signal(Ns, #'TimeoutSignal'{}, #'Machine'{history = History0}) -> History = unwrap_events(History0), _ = lager:debug("dispatch timeout with history = ~p", [History]), Module = get_handler_module(Ns), Result = Module:process_signal(timeout, History), marshal_signal_result(Result); -dispatch_signal(Ns, #'RepairSignal'{arg = Payload}, History0) -> +dispatch_signal(Ns, #'RepairSignal'{arg = Payload}, #'Machine'{history = History0}) -> Args = unwrap_args(Payload), History = unwrap_events(History0), _ = lager:debug("dispatch repair with args = ~p and history: ~p", [Args, History]), @@ -186,17 +193,21 @@ dispatch_signal(Ns, #'RepairSignal'{arg = Payload}, History0) -> marshal_signal_result({Events, Action}) -> _ = lager:debug("signal result with events = ~p and action = ~p", [Events, Action]), - #'SignalResult'{ + Change = #'MachineStateChange'{ events = wrap_events(Events), + aux_state = <<"">> %%% @TODO get state from process signal? + }, + #'SignalResult'{ + change = Change, action = Action }. --spec dispatch_call(ns(), Call, hg_machine:history()) -> +-spec dispatch_call(ns(), Call, machine()) -> Result when Call :: dmsl_state_processing_thrift:'Args'(), Result :: dmsl_state_processing_thrift:'CallResult'(). -dispatch_call(Ns, Payload, History0) -> +dispatch_call(Ns, Payload, #'Machine'{history = History0}) -> Args = unwrap_args(Payload), History = unwrap_events(History0), _ = lager:debug("dispatch call with args = ~p and history: ~p", [Args, History]), @@ -206,8 +217,13 @@ dispatch_call(Ns, Payload, History0) -> marshal_call_result({Response, {Events, Action}}) -> _ = lager:debug("call response = ~p with event = ~p and action = ~p", [Response, Events, Action]), - #'CallResult'{ + Change = #'MachineStateChange'{ events = wrap_events(Events), + aux_state = <<"">> %%% @TODO get state from process signal? + }, + + #'CallResult'{ + change = Change, action = Action, response = marshal_term(Response) }. @@ -300,3 +316,11 @@ marshal_term(V) -> unmarshal_term(B) -> binary_to_term(B). + +-spec prepare_descriptor(ns(), ref(), history_range()) -> descriptor(). +prepare_descriptor(NS, Ref, Range) -> + #'MachineDescriptor'{ + ns = NS, + ref = Ref, + range = Range + }. diff --git a/apps/hellgate/src/hg_party.erl b/apps/hellgate/src/hg_party.erl index c53377b..5e6a6aa 100644 --- a/apps/hellgate/src/hg_party.erl +++ b/apps/hellgate/src/hg_party.erl @@ -688,7 +688,7 @@ get_account_state(AccountID, St = #st{}) -> Account = hg_accounting:get_account(AccountID), #{ own_amount := OwnAmount, - available_amount := AvailableAmount, + min_available_amount := MinAvailableAmount, currency_code := CurrencyCode } = Account, CurrencyRef = #domain_CurrencyRef{ @@ -698,7 +698,7 @@ get_account_state(AccountID, St = #st{}) -> #payproc_ShopAccountState{ account_id = AccountID, own_amount = OwnAmount, - available_amount = AvailableAmount, + available_amount = MinAvailableAmount, currency = Currency }. diff --git a/apps/hellgate/src/hg_proxy_host_provider.erl b/apps/hellgate/src/hg_proxy_host_provider.erl index e29ddf7..7e20657 100644 --- a/apps/hellgate/src/hg_proxy_host_provider.erl +++ b/apps/hellgate/src/hg_proxy_host_provider.erl @@ -15,7 +15,7 @@ %% -type tag() :: dmsl_base_thrift:'Tag'(). --type callback() :: dmsl_proxy_provider_thrift:'Callback'(). +-type callback() :: dmsl_proxy_thrift:'Callback'(). -spec handle_function('ProcessCallback', {tag(), callback()}, hg_woody_wrapper:handler_opts()) -> term() | no_return(). diff --git a/apps/hellgate/test/hg_ct_helper.erl b/apps/hellgate/test/hg_ct_helper.erl index e3cd8ed..481977d 100644 --- a/apps/hellgate/test/hg_ct_helper.erl +++ b/apps/hellgate/test/hg_ct_helper.erl @@ -378,7 +378,7 @@ construct_domain_fixture() -> data = #domain_Terminal{ name = <<"Brominal 1">>, description = <<"Brominal 1">>, - payment_method = #domain_PaymentMethodRef{id = {bank_card, visa}}, + payment_method = ?pmt(bank_card, visa), category = ?cat(1), cash_flow = [ ?cfpost(provider, receipt, merchant, general, ?share(1, 1, payment_amount)), @@ -399,7 +399,7 @@ construct_domain_fixture() -> data = #domain_Terminal{ name = <<"Brominal 2">>, description = <<"Brominal 2">>, - payment_method = #domain_PaymentMethodRef{id = {bank_card, mastercard}}, + payment_method = ?pmt(bank_card, mastercard), category = ?cat(1), cash_flow = [ ?cfpost(provider, receipt, merchant, general, ?share(1, 1, payment_amount)), @@ -450,6 +450,20 @@ construct_domain_fixture() -> } } }}, + {payment_method, #domain_PaymentMethodObject{ + ref = ?pmt(bank_card, visa), + data = #domain_PaymentMethodDefinition{ + name = <<"Visa bank card">>, + description = <<"Visa is a major brand of cards issued by Visa">> + } + }}, + {payment_method, #domain_PaymentMethodObject{ + ref = ?pmt(bank_card, mastercard), + data = #domain_PaymentMethodDefinition{ + name = <<"Mastercard bank card">>, + description = <<"For everything else, there's MasterCard.">> + } + }}, {proxy, #domain_ProxyObject{ ref = ?prx(1), data = #domain_ProxyDefinition{ diff --git a/apps/hellgate/test/hg_eventsink_history.erl b/apps/hellgate/test/hg_eventsink_history.erl index a2aaa70..ce99926 100644 --- a/apps/hellgate/test/hg_eventsink_history.erl +++ b/apps/hellgate/test/hg_eventsink_history.erl @@ -20,6 +20,9 @@ -spec assert_total_order(history()) -> ok | no_return(). +assert_total_order([]) -> + ok; + assert_total_order([?event(ID, _, _, _) | Rest]) -> _ = lists:foldl( fun (?event(ID1, _, _, _), ID0) -> @@ -33,6 +36,9 @@ assert_total_order([?event(ID, _, _, _) | Rest]) -> -spec assert_contiguous_sequences(history()) -> ok | no_return(). +assert_contiguous_sequences([]) -> + ok; + assert_contiguous_sequences(Events) -> InvoiceSeqs = orddict:to_list(lists:foldl( fun (?event(_ID, InvoiceID, Seq, _), Acc) -> diff --git a/build_utils b/build_utils index b9a3a1d..0a57c5f 160000 --- a/build_utils +++ b/build_utils @@ -1 +1 @@ -Subproject commit b9a3a1d845b76b07bd964bbcb363a48249e2a0e7 +Subproject commit 0a57c5f10795d77ecf121d509fde7c654175c3c1 diff --git a/docker-compose.sh b/docker-compose.sh index 752c2e8..2167da7 100755 --- a/docker-compose.sh +++ b/docker-compose.sh @@ -15,44 +15,43 @@ services: - shumway dominant: - image: dr.rbkmoney.com/rbkmoney/dominant:f3c72168d9dfeb4da241d4eb5d6a29787c81faef + image: dr.rbkmoney.com/rbkmoney/dominant:be25663099fc549b14ec6d4b72bc72a76d4e2a66 command: /opt/dominant/bin/dominant foreground depends_on: - machinegun machinegun: - image: dr.rbkmoney.com/rbkmoney/machinegun:a48f9e93dd5a709d5f14db0c9785d43039282e86 + image: dr.rbkmoney.com/rbkmoney/machinegun:faa1156dd07a5cc72413616e3c73d48767654d3c command: /opt/machinegun/bin/machinegun foreground volumes: - ./test/machinegun/sys.config:/opt/machinegun/releases/0.1.0/sys.config shumway: - image: dr.rbkmoney.com/rbkmoney/shumway:cd00af9d70b28a7851295fca39bdeded5a3606b0 - entrypoint: | - java - -Xmx512m - -jar - /opt/shumway/shumway.jar - command: | - --spring.datasource.url=jdbc:postgresql://shumway_psql:5432/shumway - --spring.datasource.username=shumway - --spring.datasource.password=shumway - depends_on: - - shumway_psql + image: dr.rbkmoney.com/rbkmoney/shumway:ef494632710c3248a7d6a33fcbeb7944ce8fdd31 restart: always - - shumway_psql: + command: | + -Xmx512m + -jar /opt/shumway/shumway.jar + --spring.datasource.url=jdbc:postgresql://shumway-db:5432/shumway + --spring.datasource.username=postgres + --spring.datasource.password=postgres + depends_on: + - shumway-db + environment: + - SERVICE_NAME=shumway + shumway-db: image: dr.rbkmoney.com/rbkmoney/postgres:9.6 environment: - - POSTGRES_DATABASE=shumway - - POSTGRES_USER=shumway - - POSTGRES_PASSWORD=shumway + - POSTGRES_DB=shumway + - POSTGRES_USER=postgres + - POSTGRES_PASSWORD=postgres + - SERVICE_NAME=shumway-db networks: default: driver: bridge driver_opts: com.docker.network.enable_ipv6: "true" - com.docker.network.bridge.enable_ip_masquerade: "false" + com.docker.network.bridge.enable_ip_masquerade: "true" EOF diff --git a/rebar.lock b/rebar.lock index 0fa59e6..8a3049f 100644 --- a/rebar.lock +++ b/rebar.lock @@ -3,15 +3,15 @@ {<<"cowlib">>,{pkg,<<"cowlib">>,<<"1.0.2">>},2}, {<<"dmsl">>, {git,"git@github.com:rbkmoney/damsel_erlang.git", - {ref,"44709a08aa9dfbf2a84f10016868b133b35b6a7f"}}, + {ref,"92ac7b77a256a865f5c87fedcfa260e904ed01f6"}}, 0}, {<<"dmt">>, {git,"git@github.com:rbkmoney/dmt_core.git", - {ref,"2d4fc6b003808df131b32dfb253492c9b495930e"}}, + {ref,"8ebf5ea9bbe75e2bf10af445fdd92124a5eae1ad"}}, 1}, {<<"dmt_client">>, {git,"git@github.com:rbkmoney/dmt_client.git", - {ref,"9a1df31957451d5a12d26241a853ec5c945b1b7f"}}, + {ref,"ab6b7ffd174232576a646dafbbbb20257d20c22f"}}, 0}, {<<"genlib">>, {git,"https://github.com/rbkmoney/genlib.git",