diff --git a/.github/workflows/erlang-checks.yml b/.github/workflows/erlang-checks.yml index 53ea743..bd60cd6 100644 --- a/.github/workflows/erlang-checks.yml +++ b/.github/workflows/erlang-checks.yml @@ -29,7 +29,7 @@ jobs: run: name: Run checks needs: setup - uses: valitydev/erlang-workflows/.github/workflows/erlang-parallel-build.yml@v1.0.10 + uses: valitydev/erlang-workflows/.github/workflows/erlang-parallel-build.yml@v1.0.14 with: otp-version: ${{ needs.setup.outputs.otp-version }} rebar-version: ${{ needs.setup.outputs.rebar-version }} @@ -37,3 +37,4 @@ jobs: thrift-version: ${{ needs.setup.outputs.thrift-version }} run-ct-with-compose: true cache-version: v100 + upload-coverage: false diff --git a/apps/ff_cth/src/ct_eventsink.erl b/apps/ff_cth/src/ct_eventsink.erl deleted file mode 100644 index f3c3768..0000000 --- a/apps/ff_cth/src/ct_eventsink.erl +++ /dev/null @@ -1,94 +0,0 @@ --module(ct_eventsink). - --include_lib("fistful_proto/include/fistful_identity_thrift.hrl"). --include_lib("fistful_proto/include/fistful_wallet_thrift.hrl"). --include_lib("fistful_proto/include/fistful_wthd_thrift.hrl"). --include_lib("fistful_proto/include/fistful_wthd_session_thrift.hrl"). --include_lib("fistful_proto/include/fistful_destination_thrift.hrl"). --include_lib("fistful_proto/include/fistful_source_thrift.hrl"). --include_lib("fistful_proto/include/fistful_deposit_thrift.hrl"). --include_lib("fistful_proto/include/fistful_w2w_transfer_thrift.hrl"). --include_lib("fistful_proto/include/fistful_evsink_thrift.hrl"). - --type sink() :: - ff_services:service_name(). - --type event() :: - fistful_wallet_thrift:'SinkEvent'() - | fistful_wthd_thrift:'SinkEvent'() - | fistful_identity_thrift:'SinkEvent'() - | fistful_destination_thrift:'SinkEvent'() - | fistful_source_thrift:'SinkEvent'() - | fistful_deposit_thrift:'SinkEvent'() - | fistful_wthd_thrift:'SinkEvent'() - | fistful_w2w_transfer_thrift:'SinkEvent'(). - --type event_id() :: fistful_evsink_thrift:'EventID'(). --type limit() :: non_neg_integer(). - --export([last_id/1]). - --export([events/3]). --export([consume/2]). --export([fold/4]). - --export([get_max_event_id/1]). - -%% - --spec last_id(sink()) -> event_id() | 0. -last_id(Sink) -> - case call_handler('GetLastEventID', Sink, {}) of - {ok, EventID} -> - EventID; - {exception, #'evsink_NoLastEvent'{}} -> - 0 - end. - --spec events(_After :: event_id() | undefined, limit(), sink()) -> {[event()], _Last :: event_id()}. -events(After, Limit, Sink) -> - Range = #'evsink_EventRange'{'after' = After, limit = Limit}, - {ok, Events} = call_handler('GetEvents', Sink, {Range}), - {Events, get_max_event_id(Events)}. - --spec consume(_ChunkSize :: limit(), sink()) -> [event()]. -consume(ChunkSize, Sink) -> - fold(fun(Chunk, Acc) -> Chunk ++ Acc end, [], ChunkSize, Sink). - --spec fold(fun(([event()], State) -> State), State, _ChunkSize :: limit(), sink()) -> State. -fold(FoldFun, InitialState, ChunkSize, Sink) -> - fold(FoldFun, InitialState, ChunkSize, Sink, undefined). - -fold(FoldFun, State0, ChunkSize, Sink, Cursor) -> - {Events, LastID} = events(Cursor, ChunkSize, Sink), - State1 = FoldFun(Events, State0), - case length(Events) of - N when N >= ChunkSize -> - fold(FoldFun, State1, ChunkSize, Sink, LastID); - _ -> - State1 - end. - --spec get_max_event_id([event()]) -> event_id(). -get_max_event_id(Events) when is_list(Events) -> - lists:foldl(fun(Ev, Max) -> erlang:max(get_event_id(Ev), Max) end, 0, Events). - --spec get_event_id(event()) -> event_id(). -get_event_id(#'wallet_SinkEvent'{id = ID}) -> ID; -get_event_id(#'wthd_SinkEvent'{id = ID}) -> ID; -get_event_id(#'identity_SinkEvent'{id = ID}) -> ID; -get_event_id(#'destination_SinkEvent'{id = ID}) -> ID; -get_event_id(#'source_SinkEvent'{id = ID}) -> ID; -get_event_id(#'deposit_SinkEvent'{id = ID}) -> ID; -get_event_id(#'wthd_session_SinkEvent'{id = ID}) -> ID; -get_event_id(#'w2w_transfer_SinkEvent'{id = ID}) -> ID. - -call_handler(Function, ServiceName, Args) -> - Service = ff_services:get_service(ServiceName), - Path = erlang:list_to_binary(ff_services:get_service_path(ServiceName)), - Request = {Service, Function, Args}, - Client = ff_woody_client:new(#{ - url => <<"http://localhost:8022", Path/binary>>, - event_handler => ff_woody_event_handler - }), - ff_woody_client:call(Client, Request). diff --git a/apps/ff_cth/src/ct_helper.erl b/apps/ff_cth/src/ct_helper.erl index 23d8efd..68cc004 100644 --- a/apps/ff_cth/src/ct_helper.erl +++ b/apps/ff_cth/src/ct_helper.erl @@ -124,32 +124,6 @@ start_app(ff_server = AppName) -> {port, 8022}, {admin, #{ path => <<"/v1/admin">> - }}, - {eventsink, #{ - identity => #{ - namespace => 'ff/identity' - }, - wallet => #{ - namespace => 'ff/wallet_v2' - }, - withdrawal => #{ - namespace => 'ff/withdrawal_v2' - }, - deposit => #{ - namespace => 'ff/deposit_v1' - }, - destination => #{ - namespace => 'ff/destination_v2' - }, - source => #{ - namespace => 'ff/source_v1' - }, - withdrawal_session => #{ - namespace => 'ff/withdrawal/session_v2' - }, - w2w_transfer => #{ - namespace => 'ff/w2w_transfer_v1' - } }} ]), #{} diff --git a/apps/ff_cth/src/ct_payment_system.erl b/apps/ff_cth/src/ct_payment_system.erl index b86a1c2..34fca05 100644 --- a/apps/ff_cth/src/ct_payment_system.erl +++ b/apps/ff_cth/src/ct_payment_system.erl @@ -238,7 +238,6 @@ identity_provider_config(Options) -> services(Options) -> Default = #{ ff_withdrawal_adapter_host => "http://fistful-server:8022/v1/ff_withdrawal_adapter_host", - eventsink => "http://machinegun:8022/v1/event_sink", automaton => "http://machinegun:8022/v1/automaton", accounter => "http://shumway:8022/accounter", partymgmt => "http://party-management:8022/v1/processing/partymgmt", diff --git a/apps/ff_server/src/ff_deposit_eventsink_publisher.erl b/apps/ff_server/src/ff_deposit_eventsink_publisher.erl deleted file mode 100644 index 2947126..0000000 --- a/apps/ff_server/src/ff_deposit_eventsink_publisher.erl +++ /dev/null @@ -1,47 +0,0 @@ --module(ff_deposit_eventsink_publisher). - --behaviour(ff_eventsink_publisher). - --export([publish_events/1]). - --include_lib("fistful_proto/include/fistful_deposit_thrift.hrl"). --include_lib("fistful_proto/include/fistful_cashflow_thrift.hrl"). --include_lib("fistful_proto/include/fistful_fistful_base_thrift.hrl"). --include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl"). - --type event() :: ff_eventsink_publisher:event(ff_deposit:event()). --type sinkevent() :: ff_eventsink_publisher:sinkevent(fistful_deposit_thrift:'SinkEvent'()). - -%% -%% Internals -%% - --spec publish_events(list(event())) -> list(sinkevent()). -publish_events(Events) -> - [publish_event(Event) || Event <- Events]. - --spec publish_event(event()) -> sinkevent(). -publish_event(#{ - id := ID, - source_id := SourceID, - event := { - EventID, - Dt, - {ev, EventDt, Payload} - } -}) -> - #deposit_SinkEvent{ - id = marshal(event_id, ID), - created_at = marshal(timestamp, Dt), - source = marshal(id, SourceID), - payload = #deposit_EventSinkPayload{ - sequence = marshal(event_id, EventID), - occured_at = marshal(timestamp, EventDt), - changes = [marshal(change, Payload)] - } - }. - -%% - -marshal(Type, Value) -> - ff_deposit_codec:marshal(Type, Value). diff --git a/apps/ff_server/src/ff_destination_eventsink_publisher.erl b/apps/ff_server/src/ff_destination_eventsink_publisher.erl deleted file mode 100644 index a168250..0000000 --- a/apps/ff_server/src/ff_destination_eventsink_publisher.erl +++ /dev/null @@ -1,42 +0,0 @@ --module(ff_destination_eventsink_publisher). - --behaviour(ff_eventsink_publisher). - --export([publish_events/1]). - --include_lib("fistful_proto/include/fistful_destination_thrift.hrl"). - --type event() :: ff_eventsink_publisher:event(ff_destination:event()). --type sinkevent() :: ff_eventsink_publisher:sinkevent(fistful_destination_thrift:'SinkEvent'()). - --spec publish_events(list(event())) -> list(sinkevent()). -publish_events(Events) -> - [publish_event(Event) || Event <- Events]. - --spec publish_event(event()) -> sinkevent(). -publish_event(#{ - id := ID, - source_id := SourceID, - event := { - EventID, - Dt, - {ev, EventDt, Payload} - } -}) -> - #destination_SinkEvent{ - id = marshal(event_id, ID), - created_at = marshal(timestamp, Dt), - source = marshal(id, SourceID), - payload = #destination_EventSinkPayload{ - sequence = marshal(event_id, EventID), - occured_at = marshal(timestamp, EventDt), - changes = [marshal(change, Payload)] - } - }. - -%% -%% Internals -%% - -marshal(Type, Value) -> - ff_destination_codec:marshal(Type, Value). diff --git a/apps/ff_server/src/ff_eventsink_handler.erl b/apps/ff_server/src/ff_eventsink_handler.erl deleted file mode 100644 index 1c7d135..0000000 --- a/apps/ff_server/src/ff_eventsink_handler.erl +++ /dev/null @@ -1,55 +0,0 @@ --module(ff_eventsink_handler). - --behaviour(ff_woody_wrapper). - --export([handle_function/3]). - --include_lib("fistful_proto/include/fistful_evsink_thrift.hrl"). - --type options() :: #{ - schema := module(), - client := woody_client:options(), - ns := binary(), - publisher := module() -}. - -%% -%% ff_woody_wrapper callbacks -%% - --spec handle_function(woody:func(), woody:args(), options()) -> {ok, woody:result()} | no_return(). -handle_function(Func, Args, Opts) -> - scoper:scope( - eventsink_handler, - #{}, - fun() -> - handle_function_(Func, Args, Opts) - end - ). - -handle_function_('GetEvents', {#'evsink_EventRange'{'after' = After0, limit = Limit}}, Options) -> - #{ - schema := Schema, - client := Client, - ns := NS, - publisher := Publisher, - start_event := StartEvent - } = Options, - After = erlang:max(After0, StartEvent), - WoodyContext = ff_context:get_woody_context(ff_context:load()), - {ok, Events} = machinery_mg_eventsink:get_events( - NS, - After, - Limit, - #{client => {Client, WoodyContext}, schema => Schema} - ), - ff_eventsink_publisher:publish_events(Events, #{publisher => Publisher}); -handle_function_('GetLastEventID', _Params, #{schema := Schema, client := Client, ns := NS}) -> - WoodyContext = ff_context:get_woody_context(ff_context:load()), - Opts = #{client => {Client, WoodyContext}, schema => Schema}, - case machinery_mg_eventsink:get_last_event_id(NS, Opts) of - {ok, _} = Result -> - Result; - {error, no_last_event} -> - woody_error:raise(business, #'evsink_NoLastEvent'{}) - end. diff --git a/apps/ff_server/src/ff_eventsink_publisher.erl b/apps/ff_server/src/ff_eventsink_publisher.erl deleted file mode 100644 index 6143a38..0000000 --- a/apps/ff_server/src/ff_eventsink_publisher.erl +++ /dev/null @@ -1,40 +0,0 @@ -%%% -%%% Publisher - he comes to publish all eventsinks -%%% - --module(ff_eventsink_publisher). - -%% API - --type event(T) :: - machinery_mg_eventsink:evsink_event( - ff_machine:timestamped_event(T) - ). - --type sinkevent(T) :: T. --type options() :: #{publisher := module()}. - -%% Behaviour definition - --export_type([event/1]). --export_type([sinkevent/1]). --export_type([options/0]). - --callback publish_events(list(event(_))) -> list(sinkevent(_)). - -%% API - --export([publish_events/2]). - --spec publish_events(list(event(_)), options()) -> {ok, list(sinkevent(_))}. -publish_events(Events, Opts) -> - {ok, handler_publish_events(Events, Opts)}. - -get_publicher(#{publisher := Publisher}) -> - Publisher. - -%% Publisher calls - -handler_publish_events(Events, Opts) -> - Publisher = get_publicher(Opts), - Publisher:publish_events(Events). diff --git a/apps/ff_server/src/ff_identity_eventsink_publisher.erl b/apps/ff_server/src/ff_identity_eventsink_publisher.erl deleted file mode 100644 index 4d278fe..0000000 --- a/apps/ff_server/src/ff_identity_eventsink_publisher.erl +++ /dev/null @@ -1,42 +0,0 @@ --module(ff_identity_eventsink_publisher). - --behaviour(ff_eventsink_publisher). - --export([publish_events/1]). - --include_lib("fistful_proto/include/fistful_identity_thrift.hrl"). - --type event() :: ff_eventsink_publisher:event(ff_identity:event()). --type sinkevent() :: ff_eventsink_publisher:sinkevent(fistful_identity_thrift:'SinkEvent'()). - --spec publish_events(list(event())) -> list(sinkevent()). -publish_events(Events) -> - [publish_event(Event) || Event <- Events]. - --spec publish_event(event()) -> sinkevent(). -publish_event(#{ - id := ID, - source_id := SourceID, - event := { - EventID, - Dt, - {ev, EventDt, Payload} - } -}) -> - #identity_SinkEvent{ - id = marshal(event_id, ID), - created_at = marshal(timestamp, Dt), - source = marshal(id, SourceID), - payload = #identity_EventSinkPayload{ - sequence = marshal(event_id, EventID), - occured_at = marshal(timestamp, EventDt), - changes = [marshal(change, Payload)] - } - }. - -%% -%% Internals -%% - -marshal(Type, Value) -> - ff_identity_codec:marshal(Type, Value). diff --git a/apps/ff_server/src/ff_server.erl b/apps/ff_server/src/ff_server.erl index 7edbdba..ea7f11f 100644 --- a/apps/ff_server/src/ff_server.erl +++ b/apps/ff_server/src/ff_server.erl @@ -98,7 +98,7 @@ init([]) -> {w2w_transfer_management, ff_w2w_transfer_handler}, {w2w_transfer_repairer, ff_w2w_transfer_repair}, {ff_claim_committer, ff_claim_committer_handler} - ] ++ get_eventsink_handlers(), + ], WoodyHandlers = [get_handler(Service, Handler, WrapperOpts) || {Service, Handler} <- Services], ServicesChildSpec = woody_server:child_spec( @@ -172,40 +172,6 @@ get_service_client(ServiceID) -> error({unknown_service, ServiceID}) end. -get_eventsink_handlers() -> - Client = get_service_client(eventsink), - Cfg = #{ - client => Client - }, - Publishers = [ - {deposit, deposit_event_sink, ff_deposit_eventsink_publisher}, - {source, source_event_sink, ff_source_eventsink_publisher}, - {destination, destination_event_sink, ff_destination_eventsink_publisher}, - {identity, identity_event_sink, ff_identity_eventsink_publisher}, - {wallet, wallet_event_sink, ff_wallet_eventsink_publisher}, - {withdrawal, withdrawal_event_sink, ff_withdrawal_eventsink_publisher}, - {withdrawal_session, withdrawal_session_event_sink, ff_withdrawal_session_eventsink_publisher}, - {w2w_transfer, w2w_transfer_event_sink, ff_w2w_transfer_eventsink_publisher} - ], - [get_eventsink_handler(Name, Service, Publisher, Cfg) || {Name, Service, Publisher} <- Publishers]. - -get_eventsink_handler(Name, Service, Publisher, Config) -> - Sinks = genlib_app:env(?MODULE, eventsink, #{}), - case maps:find(Name, Sinks) of - {ok, Opts} -> - NS = maps:get(namespace, Opts), - StartEvent = maps:get(start_event, Opts, 0), - FullConfig = Config#{ - ns => erlang:atom_to_binary(NS, utf8), - publisher => Publisher, - start_event => StartEvent, - schema => get_namespace_schema(NS) - }, - {Service, {ff_eventsink_handler, FullConfig}}; - error -> - erlang:error({unknown_eventsink, Name, Sinks}) - end. - get_namespace_schema('ff/identity') -> ff_identity_machinery_schema; get_namespace_schema('ff/wallet_v2') -> diff --git a/apps/ff_server/src/ff_services.erl b/apps/ff_server/src/ff_services.erl index c0e3abf..32a4374 100644 --- a/apps/ff_server/src/ff_services.erl +++ b/apps/ff_server/src/ff_services.erl @@ -21,20 +21,6 @@ get_service(fistful_provider) -> {fistful_provider_thrift, 'Management'}; get_service(ff_withdrawal_adapter_host) -> {dmsl_wthd_provider_thrift, 'AdapterHost'}; -get_service(deposit_event_sink) -> - {fistful_deposit_thrift, 'EventSink'}; -get_service(source_event_sink) -> - {fistful_source_thrift, 'EventSink'}; -get_service(destination_event_sink) -> - {fistful_destination_thrift, 'EventSink'}; -get_service(identity_event_sink) -> - {fistful_identity_thrift, 'EventSink'}; -get_service(wallet_event_sink) -> - {fistful_wallet_thrift, 'EventSink'}; -get_service(withdrawal_event_sink) -> - {fistful_wthd_thrift, 'EventSink'}; -get_service(withdrawal_session_event_sink) -> - {fistful_wthd_session_thrift, 'EventSink'}; get_service(withdrawal_session_repairer) -> {fistful_wthd_session_thrift, 'Repairer'}; get_service(withdrawal_repairer) -> @@ -55,8 +41,6 @@ get_service(withdrawal_session_management) -> {fistful_wthd_session_thrift, 'Management'}; get_service(deposit_management) -> {fistful_deposit_thrift, 'Management'}; -get_service(w2w_transfer_event_sink) -> - {fistful_w2w_transfer_thrift, 'EventSink'}; get_service(w2w_transfer_repairer) -> {fistful_w2w_transfer_thrift, 'Repairer'}; get_service(w2w_transfer_management) -> @@ -75,20 +59,6 @@ get_service_path(fistful_provider) -> "/v1/provider"; get_service_path(ff_withdrawal_adapter_host) -> "/v1/ff_withdrawal_adapter_host"; -get_service_path(deposit_event_sink) -> - "/v1/eventsink/deposit"; -get_service_path(source_event_sink) -> - "/v1/eventsink/source"; -get_service_path(destination_event_sink) -> - "/v1/eventsink/destination"; -get_service_path(identity_event_sink) -> - "/v1/eventsink/identity"; -get_service_path(wallet_event_sink) -> - "/v1/eventsink/wallet"; -get_service_path(withdrawal_event_sink) -> - "/v1/eventsink/withdrawal"; -get_service_path(withdrawal_session_event_sink) -> - "/v1/eventsink/withdrawal/session"; get_service_path(withdrawal_session_repairer) -> "/v1/repair/withdrawal/session"; get_service_path(withdrawal_repairer) -> @@ -109,8 +79,6 @@ get_service_path(withdrawal_session_management) -> "/v1/withdrawal_session"; get_service_path(deposit_management) -> "/v1/deposit"; -get_service_path(w2w_transfer_event_sink) -> - "/v1/eventsink/w2w_transfer"; get_service_path(w2w_transfer_repairer) -> "/v1/repair/w2w_transfer"; get_service_path(w2w_transfer_management) -> diff --git a/apps/ff_server/src/ff_source_eventsink_publisher.erl b/apps/ff_server/src/ff_source_eventsink_publisher.erl deleted file mode 100644 index 028dbd0..0000000 --- a/apps/ff_server/src/ff_source_eventsink_publisher.erl +++ /dev/null @@ -1,42 +0,0 @@ --module(ff_source_eventsink_publisher). - --behaviour(ff_eventsink_publisher). - --export([publish_events/1]). - --include_lib("fistful_proto/include/fistful_source_thrift.hrl"). - --type event() :: ff_eventsink_publisher:event(ff_source:event()). --type sinkevent() :: ff_eventsink_publisher:sinkevent(fistful_source_thrift:'SinkEvent'()). - --spec publish_events(list(event())) -> list(sinkevent()). -publish_events(Events) -> - [publish_event(Event) || Event <- Events]. - --spec publish_event(event()) -> sinkevent(). -publish_event(#{ - id := ID, - source_id := SourceID, - event := { - EventID, - Dt, - {ev, EventDt, Payload} - } -}) -> - #source_SinkEvent{ - id = marshal(event_id, ID), - created_at = marshal(timestamp, Dt), - source = marshal(id, SourceID), - payload = #source_EventSinkPayload{ - sequence = marshal(event_id, EventID), - occured_at = marshal(timestamp, EventDt), - changes = [marshal(change, Payload)] - } - }. - -%% -%% Internals -%% - -marshal(Type, Value) -> - ff_source_codec:marshal(Type, Value). diff --git a/apps/ff_server/src/ff_w2w_transfer_eventsink_publisher.erl b/apps/ff_server/src/ff_w2w_transfer_eventsink_publisher.erl deleted file mode 100644 index 4ec92e3..0000000 --- a/apps/ff_server/src/ff_w2w_transfer_eventsink_publisher.erl +++ /dev/null @@ -1,47 +0,0 @@ --module(ff_w2w_transfer_eventsink_publisher). - --behaviour(ff_eventsink_publisher). - --export([publish_events/1]). - --include_lib("fistful_proto/include/fistful_w2w_transfer_thrift.hrl"). --include_lib("fistful_proto/include/fistful_cashflow_thrift.hrl"). --include_lib("fistful_proto/include/fistful_fistful_base_thrift.hrl"). --include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl"). - --type event() :: ff_eventsink_publisher:event(w2w_transfer:event()). --type sinkevent() :: ff_eventsink_publisher:sinkevent(fistful_w2w_transfer_thrift:'SinkEvent'()). - -%% -%% Internals -%% - --spec publish_events(list(event())) -> list(sinkevent()). -publish_events(Events) -> - [publish_event(Event) || Event <- Events]. - --spec publish_event(event()) -> sinkevent(). -publish_event(#{ - id := ID, - source_id := SourceID, - event := { - EventID, - Dt, - {ev, EventDt, Payload} - } -}) -> - #w2w_transfer_SinkEvent{ - id = marshal(event_id, ID), - created_at = marshal(timestamp, Dt), - source = marshal(id, SourceID), - payload = #w2w_transfer_EventSinkPayload{ - sequence = marshal(event_id, EventID), - occured_at = marshal(timestamp, EventDt), - changes = [marshal(change, Payload)] - } - }. - -%% - -marshal(Type, Value) -> - ff_w2w_transfer_codec:marshal(Type, Value). diff --git a/apps/ff_server/src/ff_wallet_eventsink_publisher.erl b/apps/ff_server/src/ff_wallet_eventsink_publisher.erl deleted file mode 100644 index a1d3e44..0000000 --- a/apps/ff_server/src/ff_wallet_eventsink_publisher.erl +++ /dev/null @@ -1,42 +0,0 @@ --module(ff_wallet_eventsink_publisher). - --behaviour(ff_eventsink_publisher). - --export([publish_events/1]). - --include_lib("fistful_proto/include/fistful_wallet_thrift.hrl"). - --type event() :: ff_eventsink_publisher:event(ff_wallet:event()). --type sinkevent() :: ff_eventsink_publisher:sinkevent(fistful_wallet_thrift:'SinkEvent'()). - --spec publish_events(list(event())) -> list(sinkevent()). -publish_events(Events) -> - [publish_event(Event) || Event <- Events]. - --spec publish_event(event()) -> sinkevent(). -publish_event(#{ - id := ID, - source_id := SourceID, - event := { - EventID, - Dt, - {ev, EventDt, Payload} - } -}) -> - #wallet_SinkEvent{ - id = marshal(event_id, ID), - created_at = marshal(timestamp, Dt), - source = marshal(id, SourceID), - payload = #wallet_Event{ - sequence = marshal(event_id, EventID), - occured_at = marshal(timestamp, EventDt), - changes = [marshal(change, Payload)] - } - }. - -%% -%% Internals -%% - -marshal(Type, Value) -> - ff_wallet_codec:marshal(Type, Value). diff --git a/apps/ff_server/src/ff_withdrawal_eventsink_publisher.erl b/apps/ff_server/src/ff_withdrawal_eventsink_publisher.erl deleted file mode 100644 index b4b90af..0000000 --- a/apps/ff_server/src/ff_withdrawal_eventsink_publisher.erl +++ /dev/null @@ -1,47 +0,0 @@ --module(ff_withdrawal_eventsink_publisher). - --behaviour(ff_eventsink_publisher). - --export([publish_events/1]). - --include_lib("fistful_proto/include/fistful_wthd_thrift.hrl"). --include_lib("fistful_proto/include/fistful_cashflow_thrift.hrl"). --include_lib("fistful_proto/include/fistful_fistful_base_thrift.hrl"). --include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl"). - --type event() :: ff_eventsink_publisher:event(ff_withdrawal:event()). --type sinkevent() :: ff_eventsink_publisher:sinkevent(fistful_wthd_thrift:'SinkEvent'()). - -%% -%% Internals -%% - --spec publish_events(list(event())) -> list(sinkevent()). -publish_events(Events) -> - [publish_event(Event) || Event <- Events]. - --spec publish_event(event()) -> sinkevent(). -publish_event(#{ - id := ID, - source_id := SourceID, - event := { - EventID, - Dt, - {ev, EventDt, Payload} - } -}) -> - #wthd_SinkEvent{ - id = marshal(event_id, ID), - created_at = marshal(timestamp, Dt), - source = marshal(id, SourceID), - payload = #wthd_EventSinkPayload{ - sequence = marshal(event_id, EventID), - occured_at = marshal(timestamp, EventDt), - changes = [marshal(change, Payload)] - } - }. - -%% Internals - -marshal(Type, Value) -> - ff_withdrawal_codec:marshal(Type, Value). diff --git a/apps/ff_server/src/ff_withdrawal_session_eventsink_publisher.erl b/apps/ff_server/src/ff_withdrawal_session_eventsink_publisher.erl deleted file mode 100644 index 12194bc..0000000 --- a/apps/ff_server/src/ff_withdrawal_session_eventsink_publisher.erl +++ /dev/null @@ -1,48 +0,0 @@ --module(ff_withdrawal_session_eventsink_publisher). - --behaviour(ff_eventsink_publisher). - --export([publish_events/1]). - --include_lib("fistful_proto/include/fistful_wthd_session_thrift.hrl"). --include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl"). - --type event() :: ff_eventsink_publisher:event(ff_withdrawal_session:event()). --type sinkevent() :: - ff_eventsink_publisher:sinkevent( - fistful_wthd_session_thrift:'SinkEvent'() - ). - -%% -%% Internals -%% - --spec publish_events(list(event())) -> list(sinkevent()). -publish_events(Events) -> - [publish_event(Event) || Event <- Events]. - --spec publish_event(event()) -> sinkevent(). -publish_event(#{ - id := ID, - source_id := SourceID, - event := { - EventID, - Dt, - {ev, EventDt, Payload} - } -}) -> - #wthd_session_SinkEvent{ - id = marshal(event_id, ID), - created_at = marshal(timestamp, Dt), - source = marshal(id, SourceID), - payload = #wthd_session_Event{ - sequence = marshal(event_id, EventID), - occured_at = marshal(timestamp, EventDt), - changes = [marshal(change, Payload)] - } - }. - -%% Internals - -marshal(Type, Value) -> - ff_withdrawal_session_codec:marshal(Type, Value). diff --git a/apps/ff_server/test/ff_eventsink_SUITE.erl b/apps/ff_server/test/ff_eventsink_SUITE.erl deleted file mode 100644 index 88d1748..0000000 --- a/apps/ff_server/test/ff_eventsink_SUITE.erl +++ /dev/null @@ -1,514 +0,0 @@ --module(ff_eventsink_SUITE). - --include_lib("fistful_proto/include/fistful_wthd_thrift.hrl"). --include_lib("fistful_proto/include/fistful_evsink_thrift.hrl"). --include_lib("fistful_proto/include/fistful_transfer_thrift.hrl"). - --export([all/0]). --export([groups/0]). --export([init_per_suite/1]). --export([end_per_suite/1]). --export([init_per_group/2]). --export([end_per_group/2]). --export([init_per_testcase/2]). --export([end_per_testcase/2]). - --export([get_identity_events_ok/1]). --export([get_create_wallet_events_ok/1]). --export([get_withdrawal_events_ok/1]). --export([get_withdrawal_session_events_ok/1]). --export([get_create_destination_events_ok/1]). --export([get_create_source_events_ok/1]). --export([get_create_deposit_events_ok/1]). --export([get_shifted_create_identity_events_ok/1]). --export([get_create_w2w_transfer_events_ok/1]). - --type config() :: ct_helper:config(). --type test_case_name() :: ct_helper:test_case_name(). --type group_name() :: ct_helper:group_name(). --type test_return() :: _ | no_return(). - --spec all() -> [test_case_name()]. -all() -> - [ - get_identity_events_ok, - get_create_wallet_events_ok, - get_withdrawal_events_ok, - get_create_destination_events_ok, - get_create_source_events_ok, - get_create_deposit_events_ok, - get_withdrawal_session_events_ok, - get_shifted_create_identity_events_ok, - get_create_w2w_transfer_events_ok - ]. - --spec groups() -> []. -groups() -> []. - --spec init_per_suite(config()) -> config(). -init_per_suite(C) -> - ct_helper:makeup_cfg( - [ - ct_helper:test_case_name(init), - ct_payment_system:setup() - ], - C - ). - --spec end_per_suite(config()) -> _. -end_per_suite(C) -> - ok = ct_payment_system:shutdown(C). - -%% - --spec init_per_group(group_name(), config()) -> config(). -init_per_group(_, C) -> - C. - --spec end_per_group(group_name(), config()) -> _. -end_per_group(_, _) -> - ok. - -%% - --spec init_per_testcase(test_case_name(), config()) -> config(). -init_per_testcase(Name, C) -> - C1 = ct_helper:makeup_cfg([ct_helper:test_case_name(Name), ct_helper:woody_ctx()], C), - ok = ct_helper:set_context(C1), - C1. - --spec end_per_testcase(test_case_name(), config()) -> _. -end_per_testcase(_Name, _C) -> - ok = ct_helper:unset_context(). - -%% - --spec get_identity_events_ok(config()) -> test_return(). -get_identity_events_ok(C) -> - ID = genlib:unique(), - Party = create_party(C), - Name = <<"Identity Name">>, - Sink = identity_event_sink, - LastEvent = ct_eventsink:last_id(Sink), - - ok = ff_identity_machine:create( - #{ - id => ID, - name => Name, - party => Party, - provider => <<"good-one">> - }, - #{<<"com.valitydev.wapi">> => #{<<"name">> => Name}} - ), - - {ok, RawEvents} = ff_identity_machine:events(ID, {undefined, 1000}), - {_Events, MaxID} = ct_eventsink:events(LastEvent, 1000, Sink), - MaxID = LastEvent + length(RawEvents). - --spec get_create_wallet_events_ok(config()) -> test_return(). -get_create_wallet_events_ok(C) -> - ID = genlib:unique(), - Party = create_party(C), - IdentityID = create_identity(Party, C), - - Sink = wallet_event_sink, - LastEvent = ct_eventsink:last_id(Sink), - - ok = ff_wallet_machine:create( - #{ - id => ID, - identity => IdentityID, - name => <<"EVENTS TEST">>, - currency => <<"RUB">> - }, - ff_entity_context:new() - ), - {ok, RawEvents} = ff_wallet_machine:events(ID, {undefined, 1000}), - {_Events, MaxID} = ct_eventsink:events(LastEvent, 1000, Sink), - MaxID = LastEvent + length(RawEvents). - --spec get_withdrawal_events_ok(config()) -> test_return(). -get_withdrawal_events_ok(C) -> - Sink = withdrawal_event_sink, - LastEvent = ct_eventsink:last_id(Sink), - Party = create_party(C), - IID = create_identity(Party, C), - WalID = create_wallet(IID, <<"HAHA NO2">>, <<"RUB">>, C), - SrcID = create_source(IID, C), - _DepID = process_deposit(SrcID, WalID), - DestID = create_destination(IID, C), - WdrID = process_withdrawal(WalID, DestID), - - {Events, MaxID} = ct_eventsink:events(LastEvent, 1000, Sink), - {ok, RawEvents} = ff_withdrawal_machine:events(WdrID, {undefined, 1000}), - - AlienEvents = lists:filter( - fun(Ev) -> - Ev#wthd_SinkEvent.source =/= WdrID - end, - Events - ), - - MaxID = LastEvent + length(RawEvents) + length(AlienEvents). - --spec get_withdrawal_session_events_ok(config()) -> test_return(). -get_withdrawal_session_events_ok(C) -> - Sink = withdrawal_session_event_sink, - LastEvent = ct_eventsink:last_id(Sink), - - Party = create_party(C), - IID = create_identity(Party, C), - WalID = create_wallet(IID, <<"HAHA NO2">>, <<"RUB">>, C), - SrcID = create_source(IID, C), - _DepID = process_deposit(SrcID, WalID), - DestID = create_destination(IID, C), - WdrID = process_withdrawal(WalID, DestID), - - {ok, St} = ff_withdrawal_machine:get(WdrID), - Withdrawal = ff_withdrawal_machine:withdrawal(St), - [#{id := SessID}] = ff_withdrawal:sessions(Withdrawal), - - {ok, RawEvents} = ff_withdrawal_session_machine:events( - SessID, - {undefined, 1000} - ), - {_Events, MaxID} = ct_eventsink:events(LastEvent, 1000, Sink), - MaxID = LastEvent + length(RawEvents). - --spec get_create_destination_events_ok(config()) -> test_return(). -get_create_destination_events_ok(C) -> - Sink = destination_event_sink, - LastEvent = ct_eventsink:last_id(Sink), - - Party = create_party(C), - IID = create_identity(Party, C), - DestID = create_destination(IID, C), - - {ok, RawEvents} = ff_destination_machine:events(DestID, {undefined, 1000}), - {_Events, MaxID} = ct_eventsink:events(LastEvent, 1000, Sink), - MaxID = LastEvent + length(RawEvents). - --spec get_create_source_events_ok(config()) -> test_return(). -get_create_source_events_ok(C) -> - Sink = source_event_sink, - LastEvent = ct_eventsink:last_id(Sink), - - Party = create_party(C), - IID = create_identity(Party, C), - SrcID = create_source(IID, C), - - {ok, RawEvents} = ff_source_machine:events(SrcID, {undefined, 1000}), - {_Events, MaxID} = ct_eventsink:events(LastEvent, 1000, Sink), - MaxID = LastEvent + length(RawEvents). - --spec get_create_deposit_events_ok(config()) -> test_return(). -get_create_deposit_events_ok(C) -> - Sink = deposit_event_sink, - LastEvent = ct_eventsink:last_id(Sink), - - Party = create_party(C), - IID = create_identity(Party, C), - WalID = create_wallet(IID, <<"HAHA NO2">>, <<"RUB">>, C), - SrcID = create_source(IID, C), - DepID = process_deposit(SrcID, WalID), - - {ok, RawEvents} = ff_deposit_machine:events(DepID, {undefined, 1000}), - {_Events, MaxID} = ct_eventsink:events(LastEvent, 1000, Sink), - MaxID = LastEvent + length(RawEvents). - --spec get_shifted_create_identity_events_ok(config()) -> test_return(). -get_shifted_create_identity_events_ok(C) -> - #{suite_sup := SuiteSup} = ct_helper:cfg(payment_system, C), - Service = identity_event_sink, - StartEventNum = 3, - IdentityRoute = create_sink_route( - Service, - {ff_eventsink_handler, #{ - ns => <<"ff/identity">>, - publisher => ff_identity_eventsink_publisher, - start_event => StartEventNum, - schema => ff_identity_machinery_schema - }} - ), - {ok, _} = supervisor:start_child( - SuiteSup, - woody_server:child_spec( - ?MODULE, - #{ - ip => {0, 0, 0, 0}, - port => 8040, - handlers => [], - event_handler => ff_woody_event_handler, - additional_routes => IdentityRoute - } - ) - ), - {ok, Events} = call_route_handler('GetEvents', Service, {#'evsink_EventRange'{'after' = 0, limit = 1}}), - MaxID = ct_eventsink:get_max_event_id(Events), - MaxID = StartEventNum + 1. - --spec get_create_w2w_transfer_events_ok(config()) -> test_return(). -get_create_w2w_transfer_events_ok(C) -> - Sink = w2w_transfer_event_sink, - LastEvent = ct_eventsink:last_id(Sink), - - Party = create_party(C), - IID = create_identity(Party, C), - WalFromID = create_wallet(IID, <<"HAHA NO1">>, <<"RUB">>, C), - WalToID = create_wallet(IID, <<"HAHA NO2">>, <<"RUB">>, C), - SrcID = create_source(IID, C), - _DepID = process_deposit(SrcID, WalFromID), - - ID = process_w2w(WalFromID, WalToID), - - {ok, RawEvents} = w2w_transfer_machine:events(ID, {undefined, 1000}), - {_Events, MaxID} = ct_eventsink:events(LastEvent, 1000, Sink), - MaxID = LastEvent + length(RawEvents). - -create_party(_C) -> - ID = genlib:bsuuid(), - _ = ff_party:create(ID), - ID. - -create_identity(Party, C) -> - create_identity(Party, <<"good-one">>, C). - -create_identity(Party, ProviderID, C) -> - create_identity(Party, <<"Identity Name">>, ProviderID, C). - -create_identity(Party, Name, ProviderID, _C) -> - ID = genlib:unique(), - ok = ff_identity_machine:create( - #{id => ID, name => Name, party => Party, provider => ProviderID}, - #{<<"com.valitydev.wapi">> => #{<<"name">> => Name}} - ), - ID. - -create_wallet(IdentityID, Name, Currency, _C) -> - ID = genlib:unique(), - ok = ff_wallet_machine:create( - #{id => ID, identity => IdentityID, name => Name, currency => Currency}, - ff_entity_context:new() - ), - ID. - -create_source(IdentityID, Name, Currency, Resource) -> - ID = genlib:unique(), - ok = ff_source_machine:create( - #{id => ID, identity => IdentityID, name => Name, currency => Currency, resource => Resource}, - ff_entity_context:new() - ), - ID. - -create_destination(IdentityID, Name, Currency, Resource) -> - ID = genlib:unique(), - ok = ff_destination_machine:create( - #{id => ID, identity => IdentityID, name => Name, currency => Currency, resource => Resource}, - ff_entity_context:new() - ), - ID. - -generate_id() -> - genlib:to_binary(genlib_time:ticks()). - -create_source(IID, _C) -> - SrcResource = #{type => internal, details => <<"Infinite source of cash">>}, - SrcID = create_source(IID, <<"XSource">>, <<"RUB">>, SrcResource), - authorized = ct_helper:await( - authorized, - fun() -> - {ok, SrcM} = ff_source_machine:get(SrcID), - Source = ff_source_machine:source(SrcM), - ff_source:status(Source) - end - ), - SrcID. - -process_deposit(SrcID, WalID) -> - DepID = generate_id(), - ok = ff_deposit_machine:create( - #{id => DepID, source_id => SrcID, wallet_id => WalID, body => {10000, <<"RUB">>}}, - ff_entity_context:new() - ), - succeeded = await_final_deposit_status(DepID), - DepID. - -create_destination(IID, C) -> - DestResource = {bank_card, #{bank_card => ct_cardstore:bank_card(<<"4150399999000900">>, {12, 2025}, C)}}, - DestID = create_destination(IID, <<"XDesination">>, <<"RUB">>, DestResource), - authorized = ct_helper:await( - authorized, - fun() -> - {ok, DestM} = ff_destination_machine:get(DestID), - Destination = ff_destination_machine:destination(DestM), - ff_destination:status(Destination) - end - ), - DestID. - -process_withdrawal(WalID, DestID) -> - WdrID = generate_id(), - - ok = ff_withdrawal_machine:create( - #{id => WdrID, wallet_id => WalID, destination_id => DestID, body => {4240, <<"RUB">>}}, - ff_entity_context:new() - ), - succeeded = await_final_withdrawal_status(WdrID), - true = ct_helper:await( - true, - fun() -> - Sink = withdrawal_event_sink, - {Events, _MaxID} = ct_eventsink:events(undefined, 1000, Sink), - search_event_commited(Events, WdrID) - end, - % genlib_retry:linear(15, 1000) - genlib_retry:linear(5, 1000) - ), - WdrID. - -process_w2w(WalletFromID, WalletToID) -> - ID = generate_id(), - ok = w2w_transfer_machine:create( - #{id => ID, wallet_from_id => WalletFromID, wallet_to_id => WalletToID, body => {10000, <<"RUB">>}}, - ff_entity_context:new() - ), - succeeded = await_final_w2w_transfer_status(ID), - ID. - -get_w2w_transfer(DepositID) -> - {ok, Machine} = w2w_transfer_machine:get(DepositID), - w2w_transfer_machine:w2w_transfer(Machine). - -get_w2w_transfer_status(DepositID) -> - w2w_transfer:status(get_w2w_transfer(DepositID)). - -await_final_w2w_transfer_status(DepositID) -> - finished = ct_helper:await( - finished, - fun() -> - {ok, Machine} = w2w_transfer_machine:get(DepositID), - Deposit = w2w_transfer_machine:w2w_transfer(Machine), - case w2w_transfer:is_finished(Deposit) of - false -> - {not_finished, Deposit}; - true -> - finished - end - end, - genlib_retry:linear(90, 1000) - ), - get_w2w_transfer_status(DepositID). - -get_deposit(DepositID) -> - {ok, Machine} = ff_deposit_machine:get(DepositID), - ff_deposit_machine:deposit(Machine). - -get_deposit_status(DepositID) -> - ff_deposit:status(get_deposit(DepositID)). - -await_final_deposit_status(DepositID) -> - finished = ct_helper:await( - finished, - fun() -> - {ok, Machine} = ff_deposit_machine:get(DepositID), - Deposit = ff_deposit_machine:deposit(Machine), - case ff_deposit:is_finished(Deposit) of - false -> - {not_finished, Deposit}; - true -> - finished - end - end, - genlib_retry:linear(90, 1000) - ), - get_deposit_status(DepositID). - -get_withdrawal(WithdrawalID) -> - {ok, Machine} = ff_withdrawal_machine:get(WithdrawalID), - ff_withdrawal_machine:withdrawal(Machine). - -get_withdrawal_status(WithdrawalID) -> - ff_withdrawal:status(get_withdrawal(WithdrawalID)). - -await_final_withdrawal_status(WithdrawalID) -> - finished = ct_helper:await( - finished, - fun() -> - {ok, Machine} = ff_withdrawal_machine:get(WithdrawalID), - Withdrawal = ff_withdrawal_machine:withdrawal(Machine), - case ff_withdrawal:is_finished(Withdrawal) of - false -> - {not_finished, Withdrawal}; - true -> - finished - end - end, - genlib_retry:linear(90, 1000) - ), - get_withdrawal_status(WithdrawalID). - -call_route_handler(Function, ServiceName, Args) -> - call_handler(Function, ServiceName, Args, <<"8040">>). - -call_handler(Function, ServiceName, Args, Port) -> - Service = ff_services:get_service(ServiceName), - Path = erlang:list_to_binary(ff_services:get_service_path(ServiceName)), - Request = {Service, Function, Args}, - Client = ff_woody_client:new(#{ - url => <<"http://localhost:", Port/binary, Path/binary>>, - event_handler => ff_woody_event_handler - }), - ff_woody_client:call(Client, Request). - -create_sink_route(ServiceName, {Handler, Cfg}) -> - Service = ff_services:get_service(ServiceName), - Path = ff_services:get_service_path(ServiceName), - NewCfg = Cfg#{ - client => #{ - event_handler => ff_woody_event_handler, - url => "http://machinegun:8022/v1/event_sink" - } - }, - PartyClient = party_client:create_client(), - WrapperOptions = #{ - handler => {Handler, NewCfg}, - party_client => PartyClient - }, - woody_server_thrift_http_handler:get_routes( - genlib_map:compact(#{ - handlers => [{Path, {Service, {ff_woody_wrapper, WrapperOptions}}}], - event_handler => ff_woody_event_handler - }) - ). - -search_event_commited(Events, WdrID) -> - ClearEv = lists:filter( - fun(Ev) -> - case Ev#wthd_SinkEvent.source of - WdrID -> true; - _ -> false - end - end, - Events - ), - - TransferCommited = lists:filter( - fun(Ev) -> - Payload = Ev#wthd_SinkEvent.payload, - Changes = Payload#wthd_EventSinkPayload.changes, - lists:any(fun is_commited_ev/1, Changes) - end, - ClearEv - ), - - length(TransferCommited) =/= 0. - -is_commited_ev({transfer, #wthd_TransferChange{payload = TransferEvent}}) -> - case TransferEvent of - {status_changed, #transfer_StatusChange{status = {committed, #transfer_Committed{}}}} -> - true; - _Other -> - false - end; -is_commited_ev(_Other) -> - false. diff --git a/apps/machinery_extra/src/machinery_mg_eventsink.erl b/apps/machinery_extra/src/machinery_mg_eventsink.erl deleted file mode 100644 index 223a2d8..0000000 --- a/apps/machinery_extra/src/machinery_mg_eventsink.erl +++ /dev/null @@ -1,135 +0,0 @@ --module(machinery_mg_eventsink). - --export([get_events/4]). --export([get_last_event_id/2]). - --include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl"). - --define(EVENTSINK_CORE_OPTS, - schema := machinery_mg_schema:schema() -). - --type eventsink_id() :: binary(). --type event_id() :: integer(). --type eventsink_opts() :: #{ - client := machinery_mg_client:client(), - ?EVENTSINK_CORE_OPTS -}. - --type evsink_event(T) :: #{ - id := event_id(), - ns := binary(), - source_id := machinery:id(), - event := machinery:event(T) -}. - --export_type([evsink_event/1]). - --spec get_events(eventsink_id(), event_id(), integer(), eventsink_opts()) -> {ok, list(evsink_event(_))}. -get_events(EventSinkID, After, Limit, Opts) -> - {ok, get_history_range(EventSinkID, After, Limit, Opts)}. - --spec get_last_event_id(eventsink_id(), eventsink_opts()) -> {ok, event_id()} | {error, no_last_event}. -get_last_event_id(EventSinkID, Opts) -> - case get_history_range(EventSinkID, undefined, 1, backward, Opts) of - [#{id := ID}] -> - {ok, ID}; - [] -> - {error, no_last_event} - end. - -get_history_range(EventSinkID, After, Limit, Opts) -> - get_history_range(EventSinkID, After, Limit, forward, Opts). - -get_history_range(EventSinkID, After, Limit, Direction, #{client := Client, schema := Schema}) -> - {ok, Events} = call_eventsink( - 'GetHistory', - marshal(id, EventSinkID), - [marshal(history_range, {After, Limit, Direction})], - Client - ), - unmarshal({list, {evsink_event, Schema}}, Events). - -call_eventsink(Function, EventSinkID, Args, {Client, Context}) -> - Service = {mg_proto_state_processing_thrift, 'EventSink'}, - ArgsTuple = list_to_tuple([EventSinkID | Args]), - woody_client:call({Service, Function, ArgsTuple}, Client, Context). - -%% - -marshal(id, V) -> - marshal(string, V); -marshal(history_range, {After, Limit, Direction}) -> - #'mg_stateproc_HistoryRange'{ - 'after' = After, - 'limit' = Limit, - 'direction' = Direction - }; -marshal(string, V) when is_binary(V) -> - V. - -%% - -% TODO refactor this copy of mg_backend unmarshal -unmarshal(id, V) -> - unmarshal(string, V); -unmarshal(namespace, V) -> - unmarshal(atom, V); -unmarshal(event_id, V) -> - unmarshal(integer, V); -unmarshal(timestamp, V) when is_binary(V) -> - try - MilliSec = genlib_rfc3339:parse(V, millisecond), - case genlib_rfc3339:is_utc(V) of - false -> - erlang:error(badarg, [timestamp, V, badoffset]); - true -> - USec = MilliSec rem 1000, - DateTime = calendar:system_time_to_universal_time(MilliSec, millisecond), - {DateTime, USec} - end - catch - error:Reason:St -> - erlang:raise(error, {timestamp, V, Reason}, St) - end; -unmarshal( - {evsink_event, Schema}, - #'mg_stateproc_SinkEvent'{ - 'id' = ID, - 'source_ns' = NS0, - 'source_id' = SourceID0, - 'event' = Event - } -) -> - #mg_stateproc_Event{id = EventID, created_at = CreatedAt0, format_version = Format, data = Data0} = Event, - SourceID1 = unmarshal(id, SourceID0), - NS1 = unmarshal(namespace, NS0), - CreatedAt1 = unmarshal(timestamp, CreatedAt0), - Context = #{ - machine_id => SourceID1, - machine_ns => NS1, - created_at => CreatedAt1 - }, - {Data1, Context} = unmarshal({schema, Schema, {event, Format}, Context}, Data0), - #{ - id => unmarshal(event_id, ID), - ns => NS1, - source_id => SourceID1, - event => { - unmarshal(event_id, EventID), - CreatedAt1, - Data1 - } - }; -unmarshal({list, T}, V) when is_list(V) -> - [unmarshal(T, E) || E <- V]; -unmarshal({schema, Schema, T, Context}, V) -> - machinery_mg_schema:unmarshal(Schema, T, V, Context); -unmarshal(string, V) when is_binary(V) -> - V; -unmarshal(atom, V) when is_binary(V) -> - binary_to_existing_atom(V, utf8); -unmarshal(integer, V) when is_integer(V) -> - V; -unmarshal(T, V) -> - error(badarg, {T, V}). diff --git a/apps/w2w/test/w2w_adjustment_SUITE.erl b/apps/w2w/test/w2w_adjustment_SUITE.erl index 972d1c2..e03febe 100644 --- a/apps/w2w/test/w2w_adjustment_SUITE.erl +++ b/apps/w2w/test/w2w_adjustment_SUITE.erl @@ -25,7 +25,6 @@ -export([no_parallel_adjustments_test/1]). -export([no_pending_w2w_transfer_adjustments_test/1]). -export([unknown_w2w_transfer_test/1]). --export([consume_eventsinks/1]). %% Internal types @@ -42,7 +41,7 @@ -spec all() -> [test_case_name() | {group, group_name()}]. all() -> - [{group, default}, {group, eventsink}]. + [{group, default}]. -spec groups() -> [{group_name(), list(), [test_case_name()]}]. groups() -> @@ -58,9 +57,6 @@ groups() -> no_parallel_adjustments_test, no_pending_w2w_transfer_adjustments_test, unknown_w2w_transfer_test - ]}, - {eventsink, [], [ - consume_eventsinks ]} ]. @@ -298,11 +294,6 @@ unknown_w2w_transfer_test(_C) -> }), ?assertMatch({error, {unknown_w2w_transfer, W2WTransferID}}, Result). --spec consume_eventsinks(config()) -> test_return(). -consume_eventsinks(_) -> - EventSinks = [w2w_transfer_event_sink], - [_Events = ct_eventsink:consume(1000, Sink) || Sink <- EventSinks]. - %% Utils prepare_standard_environment({_Amount, Currency} = Cash, C) -> diff --git a/compose.tracing.yaml b/compose.tracing.yaml index 5a22ff7..9894d88 100644 --- a/compose.tracing.yaml +++ b/compose.tracing.yaml @@ -1,6 +1,29 @@ services: + dominant: + environment: &otlp_enabled + OTEL_TRACES_EXPORTER: otlp + OTEL_TRACES_SAMPLER: parentbased_always_off + OTEL_EXPORTER_OTLP_PROTOCOL: http_protobuf + OTEL_EXPORTER_OTLP_ENDPOINT: http://jaeger:4318 + + bender: + environment: *otlp_enabled + + limiter: + environment: *otlp_enabled + + party-management: + environment: *otlp_enabled + + machinegun: + environment: *otlp_enabled + testrunner: + environment: + <<: *otlp_enabled + OTEL_SERVICE_NAME: hellgate_testrunner + OTEL_TRACES_SAMPLER: parentbased_always_on depends_on: jaeger: condition: service_healthy diff --git a/compose.yaml b/compose.yaml index f9db2d5..04d4162 100644 --- a/compose.yaml +++ b/compose.yaml @@ -28,7 +28,7 @@ services: command: /sbin/init dominant: - image: ghcr.io/valitydev/dominant:sha-2150eea + image: ghcr.io/valitydev/dominant:sha-f17373b command: /opt/dominant/bin/dominant foreground depends_on: machinegun: @@ -40,7 +40,7 @@ services: retries: 10 machinegun: - image: ghcr.io/valitydev/machinegun:sha-5c0db56 + image: ghcr.io/valitydev/mg2:sha-436f723 command: /opt/machinegun/bin/machinegun foreground volumes: - ./test/machinegun/config.yaml:/opt/machinegun/etc/config.yaml @@ -86,7 +86,7 @@ services: disable: true party-management: - image: ghcr.io/valitydev/party-management:sha-28c1b38 + image: ghcr.io/valitydev/party-management:sha-6d0040d command: /opt/party-management/bin/party-management foreground depends_on: machinegun: @@ -102,7 +102,7 @@ services: retries: 10 bender: - image: ghcr.io/valitydev/bender:sha-a3b227f + image: ghcr.io/valitydev/bender:sha-cf92a7d command: /opt/bender/bin/bender foreground depends_on: machinegun: diff --git a/config/sys.config b/config/sys.config index 5ca2890..58ec5ab 100644 --- a/config/sys.config +++ b/config/sys.config @@ -89,7 +89,6 @@ } }}, {services, #{ - 'eventsink' => "http://machinegun:8022/v1/event_sink", 'automaton' => "http://machinegun:8022/v1/automaton", 'accounter' => "http://shumway:8022/accounter", 'limiter' => "http://limiter:8022/v1/limiter" @@ -132,32 +131,6 @@ disk => {erl_health, disk, ["/", 99]}, memory => {erl_health, cg_memory, [99]}, service => {erl_health, service, [<<"fistful-server">>]} - }}, - {eventsink, #{ - identity => #{ - namespace => 'ff/identity' - }, - wallet => #{ - namespace => 'ff/wallet_v2' - }, - withdrawal => #{ - namespace => 'ff/withdrawal_v2' - }, - deposit => #{ - namespace => 'ff/deposit_v1' - }, - destination => #{ - namespace => 'ff/destination_v2' - }, - source => #{ - namespace => 'ff/source_v1' - }, - withdrawal_session => #{ - namespace => 'ff/withdrawal/session_v2' - }, - w2w_transfer => #{ - namespace => 'ff/w2w_transfer_v1' - } }} ]}, diff --git a/rebar.lock b/rebar.lock index 9e91d40..5ee2f96 100644 --- a/rebar.lock +++ b/rebar.lock @@ -37,11 +37,11 @@ 1}, {<<"erl_health">>, {git,"https://github.com/valitydev/erlang-health.git", - {ref,"7ffbc855bdbe79e23efad1803b0b185c9ea8d2f1"}}, + {ref,"49716470d0e8dab5e37db55d52dea78001735a3d"}}, 0}, {<<"fistful_proto">>, {git,"https://github.com/valitydev/fistful-proto.git", - {ref,"f2ca9b5225956a6bd9e1b3f84157f52c884fdb36"}}, + {ref,"02384257976db9dd90a734713e3ff052701fce11"}}, 0}, {<<"genlib">>, {git,"https://github.com/valitydev/genlib.git", @@ -59,14 +59,14 @@ 0}, {<<"machinery">>, {git,"https://github.com/valitydev/machinery-erlang.git", - {ref,"dc899e245be64551eebe2e42c9cf473f176fbf0e"}}, + {ref,"d62ceffbdb266bb4748bed34198e3575bba2dc73"}}, 0}, {<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},2}, {<<"mg_proto">>, {git,"https://github.com/valitydev/machinegun-proto", - {ref,"96f7f11b184c29d8b7e83cd7646f3f2c13662bda"}}, + {ref,"3decc8f8b13c9cd1701deab47781aacddd7dbc92"}}, 1}, - {<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.2.0">>},2}, + {<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.3.0">>},2}, {<<"msgpack_proto">>, {git,"https://github.com/valitydev/msgpack-proto.git", {ref,"7e447496aa5df4a5f1ace7ef2e3c31248b2a3ed0"}}, @@ -95,7 +95,7 @@ {<<"ranch">>,{pkg,<<"ranch">>,<<"1.8.0">>},2}, {<<"scoper">>, {git,"https://github.com/valitydev/scoper.git", - {ref,"41a14a558667316998af9f49149ee087ffa8bef2"}}, + {ref,"55a2a32ee25e22fa35f583a18eaf38b2b743429b"}}, 0}, {<<"snowflake">>, {git,"https://github.com/valitydev/snowflake.git", @@ -116,7 +116,7 @@ 0}, {<<"woody">>, {git,"https://github.com/valitydev/woody_erlang.git", - {ref,"5d46291a6bfcee0bae2a9346a7d927603a909249"}}, + {ref,"81219ba5408e1c67f5eaed3c7e566ede42da88d4"}}, 0}]}. [ {pkg_hash,[ @@ -135,7 +135,7 @@ {<<"idna">>, <<"8A63070E9F7D0C62EB9D9FCB360A7DE382448200FBBD1B106CC96D3D8099DF8D">>}, {<<"jsx">>, <<"D12516BAA0BB23A59BB35DCCAF02A1BD08243FCBB9EFE24F2D9D056CCFF71268">>}, {<<"metrics">>, <<"25F094DEA2CDA98213CECC3AEFF09E940299D950904393B2A29D191C346A8486">>}, - {<<"mimerl">>, <<"67E2D3F571088D5CFD3E550C383094B47159F3EEE8FFA08E64106CDF5E981BE3">>}, + {<<"mimerl">>, <<"D0CD9FC04B9061F82490F6581E0128379830E78535E017F7780F37FEA7545726">>}, {<<"opentelemetry">>, <<"988AC3C26ACAC9720A1D4FB8D9DC52E95B45ECFEC2D5B5583276A09E8936BC5E">>}, {<<"opentelemetry_api">>, <<"7B69ED4F40025C005DE0B74FCE8C0549625D59CB4DF12D15C32FE6DC5076FF42">>}, {<<"opentelemetry_exporter">>, <<"1D8809C0D4F4ACF986405F7700ED11992BCBDB6A4915DD11921E80777FFA7167">>}, @@ -165,7 +165,7 @@ {<<"idna">>, <<"92376EB7894412ED19AC475E4A86F7B413C1B9FBB5BD16DCCD57934157944CEA">>}, {<<"jsx">>, <<"0C5CC8FDC11B53CC25CF65AC6705AD39E54ECC56D1C22E4ADB8F5A53FB9427F3">>}, {<<"metrics">>, <<"69B09ADDDC4F74A40716AE54D140F93BEB0FB8978D8636EADED0C31B6F099F16">>}, - {<<"mimerl">>, <<"F278585650AA581986264638EBF698F8BB19DF297F66AD91B18910DFC6E19323">>}, + {<<"mimerl">>, <<"A1E15A50D1887217DE95F0B9B0793E32853F7C258A5CD227650889B38839FE9D">>}, {<<"opentelemetry">>, <<"8E09EDC26AAD11161509D7ECAD854A3285D88580F93B63B0B1CF0BAC332BFCC0">>}, {<<"opentelemetry_api">>, <<"6D7A27B7CAD2AD69A09CABF6670514CAFCEC717C8441BEB5C96322BAC3D05350">>}, {<<"opentelemetry_exporter">>, <<"2B40007F509D38361744882FD060A8841AF772AB83BB542AA5350908B303AD65">>}, diff --git a/test/machinegun/config.yaml b/test/machinegun/config.yaml index 1f65c95..12d7e9e 100644 --- a/test/machinegun/config.yaml +++ b/test/machinegun/config.yaml @@ -13,59 +13,27 @@ namespaces: # Fistful ff/identity: - event_sinks: - machine: - type: machine - machine_id: ff/identity processor: url: http://fistful-server:8022/v1/stateproc/ff/identity ff/wallet_v2: - event_sinks: - machine: - type: machine - machine_id: ff/wallet_v2 processor: url: http://fistful-server:8022/v1/stateproc/ff/wallet_v2 ff/source_v1: - event_sinks: - machine: - type: machine - machine_id: ff/source_v1 processor: url: http://fistful-server:8022/v1/stateproc/ff/source_v1 ff/deposit_v1: - event_sinks: - machine: - type: machine - machine_id: ff/deposit_v1 processor: url: http://fistful-server:8022/v1/stateproc/ff/deposit_v1 ff/destination_v2: - event_sinks: - machine: - type: machine - machine_id: ff/destination_v2 processor: url: http://fistful-server:8022/v1/stateproc/ff/destination_v2 ff/withdrawal_v2: - event_sinks: - machine: - type: machine - machine_id: ff/withdrawal_v2 processor: url: http://fistful-server:8022/v1/stateproc/ff/withdrawal_v2 ff/withdrawal/session_v2: - event_sinks: - machine: - type: machine - machine_id: ff/withdrawal/session_v2 processor: url: http://fistful-server:8022/v1/stateproc/ff/withdrawal/session_v2 ff/w2w_transfer_v1: - event_sinks: - machine: - type: machine - machine_id: ff/w2w_transfer_v1 processor: url: http://fistful-server:8022/v1/stateproc/ff/w2w_transfer_v1 @@ -93,8 +61,3 @@ storage: logging: out_type: stdout level: info - -opentelemetry: - exporter: - protocol: grpc - endpoint: http://jaeger:4317