diff --git a/apps/hellgate/src/hg_event_sink.erl b/apps/hellgate/src/hg_event_sink.erl deleted file mode 100644 index 8ff5947..0000000 --- a/apps/hellgate/src/hg_event_sink.erl +++ /dev/null @@ -1,67 +0,0 @@ --module(hg_event_sink). - --export([get_events/3]). --export([get_last_event_id/1]). - --include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl"). - --type sink_event() :: { - hg_machine:event_id(), - hg_machine:ns(), - hg_machine:id(), - hg_machine:event() -}. - --export_type([sink_event/0]). - -%% Internal types - --type event_sink_id() :: dmsl_base_thrift:'ID'(). --type event_id() :: dmsl_base_thrift:'EventID'(). - -%% API - --spec get_events(event_sink_id(), event_id(), integer()) -> {ok, [sink_event()]} | {error, event_not_found}. -get_events(EventSinkID, After, Limit) -> - try - {ok, get_history_range(EventSinkID, After, Limit)} - catch - {exception, #mg_stateproc_EventNotFound{}} -> - {error, event_not_found} - end. - --spec get_last_event_id(event_sink_id()) -> {ok, event_id()} | {error, no_last_event}. -get_last_event_id(EventSinkID) -> - case get_history_range(EventSinkID, undefined, 1, backward) of - [{ID, _, _, _}] -> - {ok, ID}; - [] -> - {error, no_last_event} - end. - -get_history_range(EventSinkID, After, Limit) -> - get_history_range(EventSinkID, After, Limit, forward). - -get_history_range(EventSinkID, After, Limit, Direction) -> - HistoryRange = #mg_stateproc_HistoryRange{'after' = After, limit = Limit, direction = Direction}, - {ok, History} = call_event_sink('GetHistory', {EventSinkID, HistoryRange}), - map_sink_events(History). - -call_event_sink(Function, Args) -> - hg_woody_wrapper:call(eventsink, Function, Args). - -map_sink_events(History) -> - [map_sink_event(Ev) || Ev <- History]. - -map_sink_event(#mg_stateproc_SinkEvent{id = ID, source_ns = Ns, source_id = SourceID, event = Event}) -> - #mg_stateproc_Event{ - id = EventID, - created_at = Dt, - format_version = FormatVer, - data = Payload - } = Event, - DecodedPayload = #{ - format_version => FormatVer, - data => mg_msgpack_marshalling:unmarshal(Payload) - }, - {ID, Ns, SourceID, {EventID, Dt, DecodedPayload}}. diff --git a/apps/hellgate/src/hg_recurrent_paytool.erl b/apps/hellgate/src/hg_recurrent_paytool.erl index 05decc5..c0a19ea 100644 --- a/apps/hellgate/src/hg_recurrent_paytool.erl +++ b/apps/hellgate/src/hg_recurrent_paytool.erl @@ -84,20 +84,6 @@ %% Woody handler -spec handle_function(woody:func(), woody:args(), hg_woody_service_wrapper:handler_opts()) -> term() | no_return(). -handle_function('GetEvents', {#payproc_EventRange{'after' = After, limit = Limit}}, _Opts) -> - case hg_event_sink:get_events(?NS, After, Limit) of - {ok, Events} -> - publish_rec_payment_tool_events(Events); - {error, event_not_found} -> - throw(#payproc_EventNotFound{}) - end; -handle_function('GetLastEventID', {}, _Opts) -> - case hg_event_sink:get_last_event_id(?NS) of - {ok, ID} -> - ID; - {error, no_last_event} -> - throw(#payproc_NoLastEvent{}) - end; handle_function(Func, Args, Opts) -> scoper:scope( recurrent_payment_tools, @@ -960,23 +946,3 @@ unmarshal_event_payload(#{format_version := 1, data := {bin, Bin}}) -> Type = {struct, struct, {dmsl_payproc_thrift, 'RecurrentPaymentToolEventData'}}, #payproc_RecurrentPaymentToolEventData{changes = Changes} = hg_proto_utils:deserialize(Type, Bin), Changes. - -%% -%% Event sink -%% - -publish_rec_payment_tool_events(Events) -> - [publish_rec_payment_tool_event(Event) || Event <- Events]. - -publish_rec_payment_tool_event({ID, _Ns, SourceID, {EventID, Dt, Payload}}) -> - publish_rec_payment_tool_event(ID, SourceID, {EventID, Dt, Payload}). - -publish_rec_payment_tool_event(EventID, MachineID, {ID, Dt, Ev}) -> - Payload = unmarshal_event_payload(Ev), - #payproc_RecurrentPaymentToolEvent{ - id = EventID, - source = MachineID, - created_at = Dt, - payload = Payload, - sequence = ID - }. diff --git a/apps/hellgate/test/hg_recurrent_paytools_tests_SUITE.erl b/apps/hellgate/test/hg_recurrent_paytools_tests_SUITE.erl index 7bb5fe9..2aa33ba 100644 --- a/apps/hellgate/test/hg_recurrent_paytools_tests_SUITE.erl +++ b/apps/hellgate/test/hg_recurrent_paytools_tests_SUITE.erl @@ -28,7 +28,6 @@ -export([recurrent_paytool_abandoned/1]). -export([recurrent_paytool_acquirement_failed/1]). -export([recurrent_paytool_acquired/1]). --export([recurrent_paytool_event_sink/1]). -export([recurrent_paytool_cost/1]). -export([recurrent_paytool_w_tds_acquired/1]). @@ -105,7 +104,6 @@ all() -> get_recurrent_paytool, recurrent_paytool_acquirement_failed, recurrent_paytool_acquired, - recurrent_paytool_event_sink, recurrent_paytool_cost, recurrent_paytool_w_tds_acquired, recurrent_paytool_abandoned, @@ -228,7 +226,6 @@ invalid_payment_method(C, BCardFun) -> -spec get_recurrent_paytool(config()) -> test_case_result(). -spec recurrent_paytool_acquirement_failed(config()) -> test_case_result(). -spec recurrent_paytool_acquired(config()) -> test_case_result(). --spec recurrent_paytool_event_sink(config()) -> test_case_result(). -spec recurrent_paytool_cost(config()) -> test_case_result(). -spec recurrent_paytool_w_tds_acquired(config()) -> test_case_result(). -spec recurrent_paytool_abandoned(config()) -> test_case_result(). @@ -279,33 +276,6 @@ recurrent_paytool_acquired(C) -> #payproc_RecurrentPaymentTool{id = RecurrentPaytoolID} = RecurrentPaytool, ok = await_acquirement(RecurrentPaytoolID, Client). -recurrent_paytool_event_sink(C) -> - Client = cfg(client, C), - PaytoolID = hg_utils:unique_id(), - PartyID = cfg(party_id, C), - ShopID = cfg(shop_id, C), - Params = make_recurrent_paytool_params(PaytoolID, PartyID, ShopID, ?pmt_sys(<<"visa-ref">>)), - CreateResult = hg_client_recurrent_paytool:create(Params, Client), - #payproc_RecurrentPaymentTool{id = RecurrentPaytoolID} = CreateResult, - ok = await_acquirement(RecurrentPaytoolID, Client), - AbandonResult = hg_client_recurrent_paytool:abandon(RecurrentPaytoolID, Client), - #payproc_RecurrentPaymentTool{status = {abandoned, _}} = AbandonResult, - [?recurrent_payment_tool_has_abandoned()] = next_event(RecurrentPaytoolID, Client), - Events = hg_client_recurrent_paytool:get_events(RecurrentPaytoolID, #payproc_EventRange{}, Client), - ESEvents = hg_client_recurrent_paytool:get_events(#payproc_EventRange{}, Client), - SourceESEvents = lists:filter( - fun(Event) -> - Event#payproc_RecurrentPaymentToolEvent.source =:= RecurrentPaytoolID - end, - ESEvents - ), - EventIDs = lists:map(fun(Event) -> Event#payproc_RecurrentPaymentToolEvent.id end, Events), - ESEventSequenceIDs = lists:map(fun(Event) -> Event#payproc_RecurrentPaymentToolEvent.sequence end, SourceESEvents), - ?assertEqual(EventIDs, ESEventSequenceIDs), - EventPayloads = lists:map(fun(Event) -> Event#payproc_RecurrentPaymentToolEvent.payload end, Events), - ESEventPayloads = lists:map(fun(Event) -> Event#payproc_RecurrentPaymentToolEvent.payload end, SourceESEvents), - ?assertEqual(EventPayloads, ESEventPayloads). - recurrent_paytool_cost(C) -> Client = cfg(client, C), PaytoolID = hg_utils:unique_id(), diff --git a/apps/hg_client/src/hg_client_recurrent_paytool.erl b/apps/hg_client/src/hg_client_recurrent_paytool.erl index ed96683..966dc52 100644 --- a/apps/hg_client/src/hg_client_recurrent_paytool.erl +++ b/apps/hg_client/src/hg_client_recurrent_paytool.erl @@ -13,9 +13,6 @@ -export([get_events/3]). -export([abandon/2]). --export([get_events/2]). --export([get_last_event_id/1]). - -export([pull_event/2]). -export([pull_event/3]). @@ -75,14 +72,6 @@ get_events(ID, Range, Client) -> abandon(ID, Client) -> map_result_error(gen_server:call(Client, {call, 'Abandon', [ID]})). --spec get_events(range(), pid()) -> recurrent_paytool() | woody_error:business_error(). -get_events(Range, Client) -> - map_result_error(gen_server:call(Client, {call, 'GetEvents', [Range]})). - --spec get_last_event_id(pid()) -> recurrent_paytool() | woody_error:business_error(). -get_last_event_id(Client) -> - map_result_error(gen_server:call(Client, {call, 'GetLastEventID', []})). - -define(DEFAULT_NEXT_EVENT_TIMEOUT, 5000). -spec pull_event(recurrent_paytool_id(), pid()) -> tuple() | timeout | woody_error:business_error(). @@ -121,12 +110,6 @@ init(ApiClient) -> {ok, #state{pollers = #{}, client = ApiClient}}. -spec handle_call(term(), callref(), state()) -> {reply, term(), state()} | {noreply, state()}. -handle_call({call, 'GetLastEventID' = Function, [] = Args}, _From, St = #state{client = Client}) -> - {Result, ClientNext} = hg_client_api:call(recurrent_paytool_eventsink, Function, Args, Client), - {reply, Result, St#state{client = ClientNext}}; -handle_call({call, 'GetEvents' = Function, [_Range] = Args}, _From, St = #state{client = Client}) -> - {Result, ClientNext} = hg_client_api:call(recurrent_paytool_eventsink, Function, Args, Client), - {reply, Result, St#state{client = ClientNext}}; handle_call({call, Function, Args}, _From, St = #state{client = Client}) -> {Result, ClientNext} = hg_client_api:call(?SERVICE, Function, Args, Client), {reply, Result, St#state{client = ClientNext}}; diff --git a/compose.tracing.yaml b/compose.tracing.yaml index c20b024..9894d88 100644 --- a/compose.tracing.yaml +++ b/compose.tracing.yaml @@ -16,6 +16,9 @@ services: party-management: environment: *otlp_enabled + machinegun: + environment: *otlp_enabled + testrunner: environment: <<: *otlp_enabled diff --git a/compose.yaml b/compose.yaml index 78aed57..f1020ed 100644 --- a/compose.yaml +++ b/compose.yaml @@ -39,7 +39,7 @@ services: retries: 10 machinegun: - image: ghcr.io/valitydev/machinegun:sha-5c0db56 + image: ghcr.io/valitydev/mg2:sha-436f723 command: /opt/machinegun/bin/machinegun foreground volumes: - ./test/machinegun/config.yaml:/opt/machinegun/etc/config.yaml diff --git a/rebar.lock b/rebar.lock index 47d9f3e..aa5cc9b 100644 --- a/rebar.lock +++ b/rebar.lock @@ -33,7 +33,7 @@ 1}, {<<"erl_health">>, {git,"https://github.com/valitydev/erlang-health.git", - {ref,"7ffbc855bdbe79e23efad1803b0b185c9ea8d2f1"}}, + {ref,"49716470d0e8dab5e37db55d52dea78001735a3d"}}, 0}, {<<"fault_detector_proto">>, {git,"https://github.com/valitydev/fault-detector-proto.git", @@ -56,7 +56,7 @@ {<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},2}, {<<"mg_proto">>, {git,"https://github.com/valitydev/machinegun-proto.git", - {ref,"96f7f11b184c29d8b7e83cd7646f3f2c13662bda"}}, + {ref,"3decc8f8b13c9cd1701deab47781aacddd7dbc92"}}, 0}, {<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.3.0">>},2}, {<<"msgpack_proto">>, diff --git a/test/machinegun/config.yaml b/test/machinegun/config.yaml index bbc8f69..3a55dce 100644 --- a/test/machinegun/config.yaml +++ b/test/machinegun/config.yaml @@ -3,42 +3,22 @@ erlang: secret_cookie_file: "/opt/machinegun/etc/cookie" namespaces: invoice: - event_sinks: - machine: - type: machine - machine_id: payproc processor: url: http://hellgate:8022/v1/stateproc/invoice pool_size: 300 invoice_template: - event_sinks: - machine: - type: machine - machine_id: payproc processor: url: http://hellgate:8022/v1/stateproc/invoice_template pool_size: 300 customer: - event_sinks: - machine: - type: machine - machine_id: payproc processor: url: http://hellgate:8022/v1/stateproc/customer pool_size: 300 recurrent_paytools: - event_sinks: - machine: - type: machine - machine_id: recurrent_paytools processor: url: http://hellgate:8022/v1/stateproc/recurrent_paytools pool_size: 300 party: - event_sinks: - machine: - type: machine - machine_id: payproc processor: url: http://party-management:8022/v1/stateproc/party pool_size: 300 @@ -72,9 +52,3 @@ woody_server: logging: out_type: stdout level: info - -opentelemetry: - service_name: machinegun - exporter: - protocol: http/protobuf - endpoint: http://jaeger:4318