TD-906: Retires eventsink publishers (#81)

* TD-906: Retires eventsink publishers

* Removes obsolete config options and disables codecov upload

* Bumps valitydev/fistful-proto@0238425

* Bumps deps
This commit is contained in:
Aleksey Kashapov 2024-05-28 16:10:46 +03:00 committed by GitHub
parent f599a4e394
commit a057039a7e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 40 additions and 1377 deletions

View File

@ -29,7 +29,7 @@ jobs:
run: run:
name: Run checks name: Run checks
needs: setup needs: setup
uses: valitydev/erlang-workflows/.github/workflows/erlang-parallel-build.yml@v1.0.10 uses: valitydev/erlang-workflows/.github/workflows/erlang-parallel-build.yml@v1.0.14
with: with:
otp-version: ${{ needs.setup.outputs.otp-version }} otp-version: ${{ needs.setup.outputs.otp-version }}
rebar-version: ${{ needs.setup.outputs.rebar-version }} rebar-version: ${{ needs.setup.outputs.rebar-version }}
@ -37,3 +37,4 @@ jobs:
thrift-version: ${{ needs.setup.outputs.thrift-version }} thrift-version: ${{ needs.setup.outputs.thrift-version }}
run-ct-with-compose: true run-ct-with-compose: true
cache-version: v100 cache-version: v100
upload-coverage: false

View File

@ -1,94 +0,0 @@
-module(ct_eventsink).
-include_lib("fistful_proto/include/fistful_identity_thrift.hrl").
-include_lib("fistful_proto/include/fistful_wallet_thrift.hrl").
-include_lib("fistful_proto/include/fistful_wthd_thrift.hrl").
-include_lib("fistful_proto/include/fistful_wthd_session_thrift.hrl").
-include_lib("fistful_proto/include/fistful_destination_thrift.hrl").
-include_lib("fistful_proto/include/fistful_source_thrift.hrl").
-include_lib("fistful_proto/include/fistful_deposit_thrift.hrl").
-include_lib("fistful_proto/include/fistful_w2w_transfer_thrift.hrl").
-include_lib("fistful_proto/include/fistful_evsink_thrift.hrl").
-type sink() ::
ff_services:service_name().
-type event() ::
fistful_wallet_thrift:'SinkEvent'()
| fistful_wthd_thrift:'SinkEvent'()
| fistful_identity_thrift:'SinkEvent'()
| fistful_destination_thrift:'SinkEvent'()
| fistful_source_thrift:'SinkEvent'()
| fistful_deposit_thrift:'SinkEvent'()
| fistful_wthd_thrift:'SinkEvent'()
| fistful_w2w_transfer_thrift:'SinkEvent'().
-type event_id() :: fistful_evsink_thrift:'EventID'().
-type limit() :: non_neg_integer().
-export([last_id/1]).
-export([events/3]).
-export([consume/2]).
-export([fold/4]).
-export([get_max_event_id/1]).
%%
-spec last_id(sink()) -> event_id() | 0.
last_id(Sink) ->
case call_handler('GetLastEventID', Sink, {}) of
{ok, EventID} ->
EventID;
{exception, #'evsink_NoLastEvent'{}} ->
0
end.
-spec events(_After :: event_id() | undefined, limit(), sink()) -> {[event()], _Last :: event_id()}.
events(After, Limit, Sink) ->
Range = #'evsink_EventRange'{'after' = After, limit = Limit},
{ok, Events} = call_handler('GetEvents', Sink, {Range}),
{Events, get_max_event_id(Events)}.
-spec consume(_ChunkSize :: limit(), sink()) -> [event()].
consume(ChunkSize, Sink) ->
fold(fun(Chunk, Acc) -> Chunk ++ Acc end, [], ChunkSize, Sink).
-spec fold(fun(([event()], State) -> State), State, _ChunkSize :: limit(), sink()) -> State.
fold(FoldFun, InitialState, ChunkSize, Sink) ->
fold(FoldFun, InitialState, ChunkSize, Sink, undefined).
fold(FoldFun, State0, ChunkSize, Sink, Cursor) ->
{Events, LastID} = events(Cursor, ChunkSize, Sink),
State1 = FoldFun(Events, State0),
case length(Events) of
N when N >= ChunkSize ->
fold(FoldFun, State1, ChunkSize, Sink, LastID);
_ ->
State1
end.
-spec get_max_event_id([event()]) -> event_id().
get_max_event_id(Events) when is_list(Events) ->
lists:foldl(fun(Ev, Max) -> erlang:max(get_event_id(Ev), Max) end, 0, Events).
-spec get_event_id(event()) -> event_id().
get_event_id(#'wallet_SinkEvent'{id = ID}) -> ID;
get_event_id(#'wthd_SinkEvent'{id = ID}) -> ID;
get_event_id(#'identity_SinkEvent'{id = ID}) -> ID;
get_event_id(#'destination_SinkEvent'{id = ID}) -> ID;
get_event_id(#'source_SinkEvent'{id = ID}) -> ID;
get_event_id(#'deposit_SinkEvent'{id = ID}) -> ID;
get_event_id(#'wthd_session_SinkEvent'{id = ID}) -> ID;
get_event_id(#'w2w_transfer_SinkEvent'{id = ID}) -> ID.
call_handler(Function, ServiceName, Args) ->
Service = ff_services:get_service(ServiceName),
Path = erlang:list_to_binary(ff_services:get_service_path(ServiceName)),
Request = {Service, Function, Args},
Client = ff_woody_client:new(#{
url => <<"http://localhost:8022", Path/binary>>,
event_handler => ff_woody_event_handler
}),
ff_woody_client:call(Client, Request).

View File

@ -124,32 +124,6 @@ start_app(ff_server = AppName) ->
{port, 8022}, {port, 8022},
{admin, #{ {admin, #{
path => <<"/v1/admin">> path => <<"/v1/admin">>
}},
{eventsink, #{
identity => #{
namespace => 'ff/identity'
},
wallet => #{
namespace => 'ff/wallet_v2'
},
withdrawal => #{
namespace => 'ff/withdrawal_v2'
},
deposit => #{
namespace => 'ff/deposit_v1'
},
destination => #{
namespace => 'ff/destination_v2'
},
source => #{
namespace => 'ff/source_v1'
},
withdrawal_session => #{
namespace => 'ff/withdrawal/session_v2'
},
w2w_transfer => #{
namespace => 'ff/w2w_transfer_v1'
}
}} }}
]), ]),
#{} #{}

View File

@ -238,7 +238,6 @@ identity_provider_config(Options) ->
services(Options) -> services(Options) ->
Default = #{ Default = #{
ff_withdrawal_adapter_host => "http://fistful-server:8022/v1/ff_withdrawal_adapter_host", ff_withdrawal_adapter_host => "http://fistful-server:8022/v1/ff_withdrawal_adapter_host",
eventsink => "http://machinegun:8022/v1/event_sink",
automaton => "http://machinegun:8022/v1/automaton", automaton => "http://machinegun:8022/v1/automaton",
accounter => "http://shumway:8022/accounter", accounter => "http://shumway:8022/accounter",
partymgmt => "http://party-management:8022/v1/processing/partymgmt", partymgmt => "http://party-management:8022/v1/processing/partymgmt",

View File

@ -1,47 +0,0 @@
-module(ff_deposit_eventsink_publisher).
-behaviour(ff_eventsink_publisher).
-export([publish_events/1]).
-include_lib("fistful_proto/include/fistful_deposit_thrift.hrl").
-include_lib("fistful_proto/include/fistful_cashflow_thrift.hrl").
-include_lib("fistful_proto/include/fistful_fistful_base_thrift.hrl").
-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl").
-type event() :: ff_eventsink_publisher:event(ff_deposit:event()).
-type sinkevent() :: ff_eventsink_publisher:sinkevent(fistful_deposit_thrift:'SinkEvent'()).
%%
%% Internals
%%
-spec publish_events(list(event())) -> list(sinkevent()).
publish_events(Events) ->
[publish_event(Event) || Event <- Events].
-spec publish_event(event()) -> sinkevent().
publish_event(#{
id := ID,
source_id := SourceID,
event := {
EventID,
Dt,
{ev, EventDt, Payload}
}
}) ->
#deposit_SinkEvent{
id = marshal(event_id, ID),
created_at = marshal(timestamp, Dt),
source = marshal(id, SourceID),
payload = #deposit_EventSinkPayload{
sequence = marshal(event_id, EventID),
occured_at = marshal(timestamp, EventDt),
changes = [marshal(change, Payload)]
}
}.
%%
marshal(Type, Value) ->
ff_deposit_codec:marshal(Type, Value).

View File

@ -1,42 +0,0 @@
-module(ff_destination_eventsink_publisher).
-behaviour(ff_eventsink_publisher).
-export([publish_events/1]).
-include_lib("fistful_proto/include/fistful_destination_thrift.hrl").
-type event() :: ff_eventsink_publisher:event(ff_destination:event()).
-type sinkevent() :: ff_eventsink_publisher:sinkevent(fistful_destination_thrift:'SinkEvent'()).
-spec publish_events(list(event())) -> list(sinkevent()).
publish_events(Events) ->
[publish_event(Event) || Event <- Events].
-spec publish_event(event()) -> sinkevent().
publish_event(#{
id := ID,
source_id := SourceID,
event := {
EventID,
Dt,
{ev, EventDt, Payload}
}
}) ->
#destination_SinkEvent{
id = marshal(event_id, ID),
created_at = marshal(timestamp, Dt),
source = marshal(id, SourceID),
payload = #destination_EventSinkPayload{
sequence = marshal(event_id, EventID),
occured_at = marshal(timestamp, EventDt),
changes = [marshal(change, Payload)]
}
}.
%%
%% Internals
%%
marshal(Type, Value) ->
ff_destination_codec:marshal(Type, Value).

View File

@ -1,55 +0,0 @@
-module(ff_eventsink_handler).
-behaviour(ff_woody_wrapper).
-export([handle_function/3]).
-include_lib("fistful_proto/include/fistful_evsink_thrift.hrl").
-type options() :: #{
schema := module(),
client := woody_client:options(),
ns := binary(),
publisher := module()
}.
%%
%% ff_woody_wrapper callbacks
%%
-spec handle_function(woody:func(), woody:args(), options()) -> {ok, woody:result()} | no_return().
handle_function(Func, Args, Opts) ->
scoper:scope(
eventsink_handler,
#{},
fun() ->
handle_function_(Func, Args, Opts)
end
).
handle_function_('GetEvents', {#'evsink_EventRange'{'after' = After0, limit = Limit}}, Options) ->
#{
schema := Schema,
client := Client,
ns := NS,
publisher := Publisher,
start_event := StartEvent
} = Options,
After = erlang:max(After0, StartEvent),
WoodyContext = ff_context:get_woody_context(ff_context:load()),
{ok, Events} = machinery_mg_eventsink:get_events(
NS,
After,
Limit,
#{client => {Client, WoodyContext}, schema => Schema}
),
ff_eventsink_publisher:publish_events(Events, #{publisher => Publisher});
handle_function_('GetLastEventID', _Params, #{schema := Schema, client := Client, ns := NS}) ->
WoodyContext = ff_context:get_woody_context(ff_context:load()),
Opts = #{client => {Client, WoodyContext}, schema => Schema},
case machinery_mg_eventsink:get_last_event_id(NS, Opts) of
{ok, _} = Result ->
Result;
{error, no_last_event} ->
woody_error:raise(business, #'evsink_NoLastEvent'{})
end.

View File

@ -1,40 +0,0 @@
%%%
%%% Publisher - he comes to publish all eventsinks
%%%
-module(ff_eventsink_publisher).
%% API
-type event(T) ::
machinery_mg_eventsink:evsink_event(
ff_machine:timestamped_event(T)
).
-type sinkevent(T) :: T.
-type options() :: #{publisher := module()}.
%% Behaviour definition
-export_type([event/1]).
-export_type([sinkevent/1]).
-export_type([options/0]).
-callback publish_events(list(event(_))) -> list(sinkevent(_)).
%% API
-export([publish_events/2]).
-spec publish_events(list(event(_)), options()) -> {ok, list(sinkevent(_))}.
publish_events(Events, Opts) ->
{ok, handler_publish_events(Events, Opts)}.
get_publicher(#{publisher := Publisher}) ->
Publisher.
%% Publisher calls
handler_publish_events(Events, Opts) ->
Publisher = get_publicher(Opts),
Publisher:publish_events(Events).

View File

@ -1,42 +0,0 @@
-module(ff_identity_eventsink_publisher).
-behaviour(ff_eventsink_publisher).
-export([publish_events/1]).
-include_lib("fistful_proto/include/fistful_identity_thrift.hrl").
-type event() :: ff_eventsink_publisher:event(ff_identity:event()).
-type sinkevent() :: ff_eventsink_publisher:sinkevent(fistful_identity_thrift:'SinkEvent'()).
-spec publish_events(list(event())) -> list(sinkevent()).
publish_events(Events) ->
[publish_event(Event) || Event <- Events].
-spec publish_event(event()) -> sinkevent().
publish_event(#{
id := ID,
source_id := SourceID,
event := {
EventID,
Dt,
{ev, EventDt, Payload}
}
}) ->
#identity_SinkEvent{
id = marshal(event_id, ID),
created_at = marshal(timestamp, Dt),
source = marshal(id, SourceID),
payload = #identity_EventSinkPayload{
sequence = marshal(event_id, EventID),
occured_at = marshal(timestamp, EventDt),
changes = [marshal(change, Payload)]
}
}.
%%
%% Internals
%%
marshal(Type, Value) ->
ff_identity_codec:marshal(Type, Value).

View File

@ -98,7 +98,7 @@ init([]) ->
{w2w_transfer_management, ff_w2w_transfer_handler}, {w2w_transfer_management, ff_w2w_transfer_handler},
{w2w_transfer_repairer, ff_w2w_transfer_repair}, {w2w_transfer_repairer, ff_w2w_transfer_repair},
{ff_claim_committer, ff_claim_committer_handler} {ff_claim_committer, ff_claim_committer_handler}
] ++ get_eventsink_handlers(), ],
WoodyHandlers = [get_handler(Service, Handler, WrapperOpts) || {Service, Handler} <- Services], WoodyHandlers = [get_handler(Service, Handler, WrapperOpts) || {Service, Handler} <- Services],
ServicesChildSpec = woody_server:child_spec( ServicesChildSpec = woody_server:child_spec(
@ -172,40 +172,6 @@ get_service_client(ServiceID) ->
error({unknown_service, ServiceID}) error({unknown_service, ServiceID})
end. end.
get_eventsink_handlers() ->
Client = get_service_client(eventsink),
Cfg = #{
client => Client
},
Publishers = [
{deposit, deposit_event_sink, ff_deposit_eventsink_publisher},
{source, source_event_sink, ff_source_eventsink_publisher},
{destination, destination_event_sink, ff_destination_eventsink_publisher},
{identity, identity_event_sink, ff_identity_eventsink_publisher},
{wallet, wallet_event_sink, ff_wallet_eventsink_publisher},
{withdrawal, withdrawal_event_sink, ff_withdrawal_eventsink_publisher},
{withdrawal_session, withdrawal_session_event_sink, ff_withdrawal_session_eventsink_publisher},
{w2w_transfer, w2w_transfer_event_sink, ff_w2w_transfer_eventsink_publisher}
],
[get_eventsink_handler(Name, Service, Publisher, Cfg) || {Name, Service, Publisher} <- Publishers].
get_eventsink_handler(Name, Service, Publisher, Config) ->
Sinks = genlib_app:env(?MODULE, eventsink, #{}),
case maps:find(Name, Sinks) of
{ok, Opts} ->
NS = maps:get(namespace, Opts),
StartEvent = maps:get(start_event, Opts, 0),
FullConfig = Config#{
ns => erlang:atom_to_binary(NS, utf8),
publisher => Publisher,
start_event => StartEvent,
schema => get_namespace_schema(NS)
},
{Service, {ff_eventsink_handler, FullConfig}};
error ->
erlang:error({unknown_eventsink, Name, Sinks})
end.
get_namespace_schema('ff/identity') -> get_namespace_schema('ff/identity') ->
ff_identity_machinery_schema; ff_identity_machinery_schema;
get_namespace_schema('ff/wallet_v2') -> get_namespace_schema('ff/wallet_v2') ->

View File

@ -21,20 +21,6 @@ get_service(fistful_provider) ->
{fistful_provider_thrift, 'Management'}; {fistful_provider_thrift, 'Management'};
get_service(ff_withdrawal_adapter_host) -> get_service(ff_withdrawal_adapter_host) ->
{dmsl_wthd_provider_thrift, 'AdapterHost'}; {dmsl_wthd_provider_thrift, 'AdapterHost'};
get_service(deposit_event_sink) ->
{fistful_deposit_thrift, 'EventSink'};
get_service(source_event_sink) ->
{fistful_source_thrift, 'EventSink'};
get_service(destination_event_sink) ->
{fistful_destination_thrift, 'EventSink'};
get_service(identity_event_sink) ->
{fistful_identity_thrift, 'EventSink'};
get_service(wallet_event_sink) ->
{fistful_wallet_thrift, 'EventSink'};
get_service(withdrawal_event_sink) ->
{fistful_wthd_thrift, 'EventSink'};
get_service(withdrawal_session_event_sink) ->
{fistful_wthd_session_thrift, 'EventSink'};
get_service(withdrawal_session_repairer) -> get_service(withdrawal_session_repairer) ->
{fistful_wthd_session_thrift, 'Repairer'}; {fistful_wthd_session_thrift, 'Repairer'};
get_service(withdrawal_repairer) -> get_service(withdrawal_repairer) ->
@ -55,8 +41,6 @@ get_service(withdrawal_session_management) ->
{fistful_wthd_session_thrift, 'Management'}; {fistful_wthd_session_thrift, 'Management'};
get_service(deposit_management) -> get_service(deposit_management) ->
{fistful_deposit_thrift, 'Management'}; {fistful_deposit_thrift, 'Management'};
get_service(w2w_transfer_event_sink) ->
{fistful_w2w_transfer_thrift, 'EventSink'};
get_service(w2w_transfer_repairer) -> get_service(w2w_transfer_repairer) ->
{fistful_w2w_transfer_thrift, 'Repairer'}; {fistful_w2w_transfer_thrift, 'Repairer'};
get_service(w2w_transfer_management) -> get_service(w2w_transfer_management) ->
@ -75,20 +59,6 @@ get_service_path(fistful_provider) ->
"/v1/provider"; "/v1/provider";
get_service_path(ff_withdrawal_adapter_host) -> get_service_path(ff_withdrawal_adapter_host) ->
"/v1/ff_withdrawal_adapter_host"; "/v1/ff_withdrawal_adapter_host";
get_service_path(deposit_event_sink) ->
"/v1/eventsink/deposit";
get_service_path(source_event_sink) ->
"/v1/eventsink/source";
get_service_path(destination_event_sink) ->
"/v1/eventsink/destination";
get_service_path(identity_event_sink) ->
"/v1/eventsink/identity";
get_service_path(wallet_event_sink) ->
"/v1/eventsink/wallet";
get_service_path(withdrawal_event_sink) ->
"/v1/eventsink/withdrawal";
get_service_path(withdrawal_session_event_sink) ->
"/v1/eventsink/withdrawal/session";
get_service_path(withdrawal_session_repairer) -> get_service_path(withdrawal_session_repairer) ->
"/v1/repair/withdrawal/session"; "/v1/repair/withdrawal/session";
get_service_path(withdrawal_repairer) -> get_service_path(withdrawal_repairer) ->
@ -109,8 +79,6 @@ get_service_path(withdrawal_session_management) ->
"/v1/withdrawal_session"; "/v1/withdrawal_session";
get_service_path(deposit_management) -> get_service_path(deposit_management) ->
"/v1/deposit"; "/v1/deposit";
get_service_path(w2w_transfer_event_sink) ->
"/v1/eventsink/w2w_transfer";
get_service_path(w2w_transfer_repairer) -> get_service_path(w2w_transfer_repairer) ->
"/v1/repair/w2w_transfer"; "/v1/repair/w2w_transfer";
get_service_path(w2w_transfer_management) -> get_service_path(w2w_transfer_management) ->

View File

@ -1,42 +0,0 @@
-module(ff_source_eventsink_publisher).
-behaviour(ff_eventsink_publisher).
-export([publish_events/1]).
-include_lib("fistful_proto/include/fistful_source_thrift.hrl").
-type event() :: ff_eventsink_publisher:event(ff_source:event()).
-type sinkevent() :: ff_eventsink_publisher:sinkevent(fistful_source_thrift:'SinkEvent'()).
-spec publish_events(list(event())) -> list(sinkevent()).
publish_events(Events) ->
[publish_event(Event) || Event <- Events].
-spec publish_event(event()) -> sinkevent().
publish_event(#{
id := ID,
source_id := SourceID,
event := {
EventID,
Dt,
{ev, EventDt, Payload}
}
}) ->
#source_SinkEvent{
id = marshal(event_id, ID),
created_at = marshal(timestamp, Dt),
source = marshal(id, SourceID),
payload = #source_EventSinkPayload{
sequence = marshal(event_id, EventID),
occured_at = marshal(timestamp, EventDt),
changes = [marshal(change, Payload)]
}
}.
%%
%% Internals
%%
marshal(Type, Value) ->
ff_source_codec:marshal(Type, Value).

View File

@ -1,47 +0,0 @@
-module(ff_w2w_transfer_eventsink_publisher).
-behaviour(ff_eventsink_publisher).
-export([publish_events/1]).
-include_lib("fistful_proto/include/fistful_w2w_transfer_thrift.hrl").
-include_lib("fistful_proto/include/fistful_cashflow_thrift.hrl").
-include_lib("fistful_proto/include/fistful_fistful_base_thrift.hrl").
-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl").
-type event() :: ff_eventsink_publisher:event(w2w_transfer:event()).
-type sinkevent() :: ff_eventsink_publisher:sinkevent(fistful_w2w_transfer_thrift:'SinkEvent'()).
%%
%% Internals
%%
-spec publish_events(list(event())) -> list(sinkevent()).
publish_events(Events) ->
[publish_event(Event) || Event <- Events].
-spec publish_event(event()) -> sinkevent().
publish_event(#{
id := ID,
source_id := SourceID,
event := {
EventID,
Dt,
{ev, EventDt, Payload}
}
}) ->
#w2w_transfer_SinkEvent{
id = marshal(event_id, ID),
created_at = marshal(timestamp, Dt),
source = marshal(id, SourceID),
payload = #w2w_transfer_EventSinkPayload{
sequence = marshal(event_id, EventID),
occured_at = marshal(timestamp, EventDt),
changes = [marshal(change, Payload)]
}
}.
%%
marshal(Type, Value) ->
ff_w2w_transfer_codec:marshal(Type, Value).

View File

@ -1,42 +0,0 @@
-module(ff_wallet_eventsink_publisher).
-behaviour(ff_eventsink_publisher).
-export([publish_events/1]).
-include_lib("fistful_proto/include/fistful_wallet_thrift.hrl").
-type event() :: ff_eventsink_publisher:event(ff_wallet:event()).
-type sinkevent() :: ff_eventsink_publisher:sinkevent(fistful_wallet_thrift:'SinkEvent'()).
-spec publish_events(list(event())) -> list(sinkevent()).
publish_events(Events) ->
[publish_event(Event) || Event <- Events].
-spec publish_event(event()) -> sinkevent().
publish_event(#{
id := ID,
source_id := SourceID,
event := {
EventID,
Dt,
{ev, EventDt, Payload}
}
}) ->
#wallet_SinkEvent{
id = marshal(event_id, ID),
created_at = marshal(timestamp, Dt),
source = marshal(id, SourceID),
payload = #wallet_Event{
sequence = marshal(event_id, EventID),
occured_at = marshal(timestamp, EventDt),
changes = [marshal(change, Payload)]
}
}.
%%
%% Internals
%%
marshal(Type, Value) ->
ff_wallet_codec:marshal(Type, Value).

View File

@ -1,47 +0,0 @@
-module(ff_withdrawal_eventsink_publisher).
-behaviour(ff_eventsink_publisher).
-export([publish_events/1]).
-include_lib("fistful_proto/include/fistful_wthd_thrift.hrl").
-include_lib("fistful_proto/include/fistful_cashflow_thrift.hrl").
-include_lib("fistful_proto/include/fistful_fistful_base_thrift.hrl").
-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl").
-type event() :: ff_eventsink_publisher:event(ff_withdrawal:event()).
-type sinkevent() :: ff_eventsink_publisher:sinkevent(fistful_wthd_thrift:'SinkEvent'()).
%%
%% Internals
%%
-spec publish_events(list(event())) -> list(sinkevent()).
publish_events(Events) ->
[publish_event(Event) || Event <- Events].
-spec publish_event(event()) -> sinkevent().
publish_event(#{
id := ID,
source_id := SourceID,
event := {
EventID,
Dt,
{ev, EventDt, Payload}
}
}) ->
#wthd_SinkEvent{
id = marshal(event_id, ID),
created_at = marshal(timestamp, Dt),
source = marshal(id, SourceID),
payload = #wthd_EventSinkPayload{
sequence = marshal(event_id, EventID),
occured_at = marshal(timestamp, EventDt),
changes = [marshal(change, Payload)]
}
}.
%% Internals
marshal(Type, Value) ->
ff_withdrawal_codec:marshal(Type, Value).

View File

@ -1,48 +0,0 @@
-module(ff_withdrawal_session_eventsink_publisher).
-behaviour(ff_eventsink_publisher).
-export([publish_events/1]).
-include_lib("fistful_proto/include/fistful_wthd_session_thrift.hrl").
-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl").
-type event() :: ff_eventsink_publisher:event(ff_withdrawal_session:event()).
-type sinkevent() ::
ff_eventsink_publisher:sinkevent(
fistful_wthd_session_thrift:'SinkEvent'()
).
%%
%% Internals
%%
-spec publish_events(list(event())) -> list(sinkevent()).
publish_events(Events) ->
[publish_event(Event) || Event <- Events].
-spec publish_event(event()) -> sinkevent().
publish_event(#{
id := ID,
source_id := SourceID,
event := {
EventID,
Dt,
{ev, EventDt, Payload}
}
}) ->
#wthd_session_SinkEvent{
id = marshal(event_id, ID),
created_at = marshal(timestamp, Dt),
source = marshal(id, SourceID),
payload = #wthd_session_Event{
sequence = marshal(event_id, EventID),
occured_at = marshal(timestamp, EventDt),
changes = [marshal(change, Payload)]
}
}.
%% Internals
marshal(Type, Value) ->
ff_withdrawal_session_codec:marshal(Type, Value).

View File

@ -1,514 +0,0 @@
-module(ff_eventsink_SUITE).
-include_lib("fistful_proto/include/fistful_wthd_thrift.hrl").
-include_lib("fistful_proto/include/fistful_evsink_thrift.hrl").
-include_lib("fistful_proto/include/fistful_transfer_thrift.hrl").
-export([all/0]).
-export([groups/0]).
-export([init_per_suite/1]).
-export([end_per_suite/1]).
-export([init_per_group/2]).
-export([end_per_group/2]).
-export([init_per_testcase/2]).
-export([end_per_testcase/2]).
-export([get_identity_events_ok/1]).
-export([get_create_wallet_events_ok/1]).
-export([get_withdrawal_events_ok/1]).
-export([get_withdrawal_session_events_ok/1]).
-export([get_create_destination_events_ok/1]).
-export([get_create_source_events_ok/1]).
-export([get_create_deposit_events_ok/1]).
-export([get_shifted_create_identity_events_ok/1]).
-export([get_create_w2w_transfer_events_ok/1]).
-type config() :: ct_helper:config().
-type test_case_name() :: ct_helper:test_case_name().
-type group_name() :: ct_helper:group_name().
-type test_return() :: _ | no_return().
-spec all() -> [test_case_name()].
all() ->
[
get_identity_events_ok,
get_create_wallet_events_ok,
get_withdrawal_events_ok,
get_create_destination_events_ok,
get_create_source_events_ok,
get_create_deposit_events_ok,
get_withdrawal_session_events_ok,
get_shifted_create_identity_events_ok,
get_create_w2w_transfer_events_ok
].
-spec groups() -> [].
groups() -> [].
-spec init_per_suite(config()) -> config().
init_per_suite(C) ->
ct_helper:makeup_cfg(
[
ct_helper:test_case_name(init),
ct_payment_system:setup()
],
C
).
-spec end_per_suite(config()) -> _.
end_per_suite(C) ->
ok = ct_payment_system:shutdown(C).
%%
-spec init_per_group(group_name(), config()) -> config().
init_per_group(_, C) ->
C.
-spec end_per_group(group_name(), config()) -> _.
end_per_group(_, _) ->
ok.
%%
-spec init_per_testcase(test_case_name(), config()) -> config().
init_per_testcase(Name, C) ->
C1 = ct_helper:makeup_cfg([ct_helper:test_case_name(Name), ct_helper:woody_ctx()], C),
ok = ct_helper:set_context(C1),
C1.
-spec end_per_testcase(test_case_name(), config()) -> _.
end_per_testcase(_Name, _C) ->
ok = ct_helper:unset_context().
%%
-spec get_identity_events_ok(config()) -> test_return().
get_identity_events_ok(C) ->
ID = genlib:unique(),
Party = create_party(C),
Name = <<"Identity Name">>,
Sink = identity_event_sink,
LastEvent = ct_eventsink:last_id(Sink),
ok = ff_identity_machine:create(
#{
id => ID,
name => Name,
party => Party,
provider => <<"good-one">>
},
#{<<"com.valitydev.wapi">> => #{<<"name">> => Name}}
),
{ok, RawEvents} = ff_identity_machine:events(ID, {undefined, 1000}),
{_Events, MaxID} = ct_eventsink:events(LastEvent, 1000, Sink),
MaxID = LastEvent + length(RawEvents).
-spec get_create_wallet_events_ok(config()) -> test_return().
get_create_wallet_events_ok(C) ->
ID = genlib:unique(),
Party = create_party(C),
IdentityID = create_identity(Party, C),
Sink = wallet_event_sink,
LastEvent = ct_eventsink:last_id(Sink),
ok = ff_wallet_machine:create(
#{
id => ID,
identity => IdentityID,
name => <<"EVENTS TEST">>,
currency => <<"RUB">>
},
ff_entity_context:new()
),
{ok, RawEvents} = ff_wallet_machine:events(ID, {undefined, 1000}),
{_Events, MaxID} = ct_eventsink:events(LastEvent, 1000, Sink),
MaxID = LastEvent + length(RawEvents).
-spec get_withdrawal_events_ok(config()) -> test_return().
get_withdrawal_events_ok(C) ->
Sink = withdrawal_event_sink,
LastEvent = ct_eventsink:last_id(Sink),
Party = create_party(C),
IID = create_identity(Party, C),
WalID = create_wallet(IID, <<"HAHA NO2">>, <<"RUB">>, C),
SrcID = create_source(IID, C),
_DepID = process_deposit(SrcID, WalID),
DestID = create_destination(IID, C),
WdrID = process_withdrawal(WalID, DestID),
{Events, MaxID} = ct_eventsink:events(LastEvent, 1000, Sink),
{ok, RawEvents} = ff_withdrawal_machine:events(WdrID, {undefined, 1000}),
AlienEvents = lists:filter(
fun(Ev) ->
Ev#wthd_SinkEvent.source =/= WdrID
end,
Events
),
MaxID = LastEvent + length(RawEvents) + length(AlienEvents).
-spec get_withdrawal_session_events_ok(config()) -> test_return().
get_withdrawal_session_events_ok(C) ->
Sink = withdrawal_session_event_sink,
LastEvent = ct_eventsink:last_id(Sink),
Party = create_party(C),
IID = create_identity(Party, C),
WalID = create_wallet(IID, <<"HAHA NO2">>, <<"RUB">>, C),
SrcID = create_source(IID, C),
_DepID = process_deposit(SrcID, WalID),
DestID = create_destination(IID, C),
WdrID = process_withdrawal(WalID, DestID),
{ok, St} = ff_withdrawal_machine:get(WdrID),
Withdrawal = ff_withdrawal_machine:withdrawal(St),
[#{id := SessID}] = ff_withdrawal:sessions(Withdrawal),
{ok, RawEvents} = ff_withdrawal_session_machine:events(
SessID,
{undefined, 1000}
),
{_Events, MaxID} = ct_eventsink:events(LastEvent, 1000, Sink),
MaxID = LastEvent + length(RawEvents).
-spec get_create_destination_events_ok(config()) -> test_return().
get_create_destination_events_ok(C) ->
Sink = destination_event_sink,
LastEvent = ct_eventsink:last_id(Sink),
Party = create_party(C),
IID = create_identity(Party, C),
DestID = create_destination(IID, C),
{ok, RawEvents} = ff_destination_machine:events(DestID, {undefined, 1000}),
{_Events, MaxID} = ct_eventsink:events(LastEvent, 1000, Sink),
MaxID = LastEvent + length(RawEvents).
-spec get_create_source_events_ok(config()) -> test_return().
get_create_source_events_ok(C) ->
Sink = source_event_sink,
LastEvent = ct_eventsink:last_id(Sink),
Party = create_party(C),
IID = create_identity(Party, C),
SrcID = create_source(IID, C),
{ok, RawEvents} = ff_source_machine:events(SrcID, {undefined, 1000}),
{_Events, MaxID} = ct_eventsink:events(LastEvent, 1000, Sink),
MaxID = LastEvent + length(RawEvents).
-spec get_create_deposit_events_ok(config()) -> test_return().
get_create_deposit_events_ok(C) ->
Sink = deposit_event_sink,
LastEvent = ct_eventsink:last_id(Sink),
Party = create_party(C),
IID = create_identity(Party, C),
WalID = create_wallet(IID, <<"HAHA NO2">>, <<"RUB">>, C),
SrcID = create_source(IID, C),
DepID = process_deposit(SrcID, WalID),
{ok, RawEvents} = ff_deposit_machine:events(DepID, {undefined, 1000}),
{_Events, MaxID} = ct_eventsink:events(LastEvent, 1000, Sink),
MaxID = LastEvent + length(RawEvents).
-spec get_shifted_create_identity_events_ok(config()) -> test_return().
get_shifted_create_identity_events_ok(C) ->
#{suite_sup := SuiteSup} = ct_helper:cfg(payment_system, C),
Service = identity_event_sink,
StartEventNum = 3,
IdentityRoute = create_sink_route(
Service,
{ff_eventsink_handler, #{
ns => <<"ff/identity">>,
publisher => ff_identity_eventsink_publisher,
start_event => StartEventNum,
schema => ff_identity_machinery_schema
}}
),
{ok, _} = supervisor:start_child(
SuiteSup,
woody_server:child_spec(
?MODULE,
#{
ip => {0, 0, 0, 0},
port => 8040,
handlers => [],
event_handler => ff_woody_event_handler,
additional_routes => IdentityRoute
}
)
),
{ok, Events} = call_route_handler('GetEvents', Service, {#'evsink_EventRange'{'after' = 0, limit = 1}}),
MaxID = ct_eventsink:get_max_event_id(Events),
MaxID = StartEventNum + 1.
-spec get_create_w2w_transfer_events_ok(config()) -> test_return().
get_create_w2w_transfer_events_ok(C) ->
Sink = w2w_transfer_event_sink,
LastEvent = ct_eventsink:last_id(Sink),
Party = create_party(C),
IID = create_identity(Party, C),
WalFromID = create_wallet(IID, <<"HAHA NO1">>, <<"RUB">>, C),
WalToID = create_wallet(IID, <<"HAHA NO2">>, <<"RUB">>, C),
SrcID = create_source(IID, C),
_DepID = process_deposit(SrcID, WalFromID),
ID = process_w2w(WalFromID, WalToID),
{ok, RawEvents} = w2w_transfer_machine:events(ID, {undefined, 1000}),
{_Events, MaxID} = ct_eventsink:events(LastEvent, 1000, Sink),
MaxID = LastEvent + length(RawEvents).
create_party(_C) ->
ID = genlib:bsuuid(),
_ = ff_party:create(ID),
ID.
create_identity(Party, C) ->
create_identity(Party, <<"good-one">>, C).
create_identity(Party, ProviderID, C) ->
create_identity(Party, <<"Identity Name">>, ProviderID, C).
create_identity(Party, Name, ProviderID, _C) ->
ID = genlib:unique(),
ok = ff_identity_machine:create(
#{id => ID, name => Name, party => Party, provider => ProviderID},
#{<<"com.valitydev.wapi">> => #{<<"name">> => Name}}
),
ID.
create_wallet(IdentityID, Name, Currency, _C) ->
ID = genlib:unique(),
ok = ff_wallet_machine:create(
#{id => ID, identity => IdentityID, name => Name, currency => Currency},
ff_entity_context:new()
),
ID.
create_source(IdentityID, Name, Currency, Resource) ->
ID = genlib:unique(),
ok = ff_source_machine:create(
#{id => ID, identity => IdentityID, name => Name, currency => Currency, resource => Resource},
ff_entity_context:new()
),
ID.
create_destination(IdentityID, Name, Currency, Resource) ->
ID = genlib:unique(),
ok = ff_destination_machine:create(
#{id => ID, identity => IdentityID, name => Name, currency => Currency, resource => Resource},
ff_entity_context:new()
),
ID.
generate_id() ->
genlib:to_binary(genlib_time:ticks()).
create_source(IID, _C) ->
SrcResource = #{type => internal, details => <<"Infinite source of cash">>},
SrcID = create_source(IID, <<"XSource">>, <<"RUB">>, SrcResource),
authorized = ct_helper:await(
authorized,
fun() ->
{ok, SrcM} = ff_source_machine:get(SrcID),
Source = ff_source_machine:source(SrcM),
ff_source:status(Source)
end
),
SrcID.
process_deposit(SrcID, WalID) ->
DepID = generate_id(),
ok = ff_deposit_machine:create(
#{id => DepID, source_id => SrcID, wallet_id => WalID, body => {10000, <<"RUB">>}},
ff_entity_context:new()
),
succeeded = await_final_deposit_status(DepID),
DepID.
create_destination(IID, C) ->
DestResource = {bank_card, #{bank_card => ct_cardstore:bank_card(<<"4150399999000900">>, {12, 2025}, C)}},
DestID = create_destination(IID, <<"XDesination">>, <<"RUB">>, DestResource),
authorized = ct_helper:await(
authorized,
fun() ->
{ok, DestM} = ff_destination_machine:get(DestID),
Destination = ff_destination_machine:destination(DestM),
ff_destination:status(Destination)
end
),
DestID.
process_withdrawal(WalID, DestID) ->
WdrID = generate_id(),
ok = ff_withdrawal_machine:create(
#{id => WdrID, wallet_id => WalID, destination_id => DestID, body => {4240, <<"RUB">>}},
ff_entity_context:new()
),
succeeded = await_final_withdrawal_status(WdrID),
true = ct_helper:await(
true,
fun() ->
Sink = withdrawal_event_sink,
{Events, _MaxID} = ct_eventsink:events(undefined, 1000, Sink),
search_event_commited(Events, WdrID)
end,
% genlib_retry:linear(15, 1000)
genlib_retry:linear(5, 1000)
),
WdrID.
process_w2w(WalletFromID, WalletToID) ->
ID = generate_id(),
ok = w2w_transfer_machine:create(
#{id => ID, wallet_from_id => WalletFromID, wallet_to_id => WalletToID, body => {10000, <<"RUB">>}},
ff_entity_context:new()
),
succeeded = await_final_w2w_transfer_status(ID),
ID.
get_w2w_transfer(DepositID) ->
{ok, Machine} = w2w_transfer_machine:get(DepositID),
w2w_transfer_machine:w2w_transfer(Machine).
get_w2w_transfer_status(DepositID) ->
w2w_transfer:status(get_w2w_transfer(DepositID)).
await_final_w2w_transfer_status(DepositID) ->
finished = ct_helper:await(
finished,
fun() ->
{ok, Machine} = w2w_transfer_machine:get(DepositID),
Deposit = w2w_transfer_machine:w2w_transfer(Machine),
case w2w_transfer:is_finished(Deposit) of
false ->
{not_finished, Deposit};
true ->
finished
end
end,
genlib_retry:linear(90, 1000)
),
get_w2w_transfer_status(DepositID).
get_deposit(DepositID) ->
{ok, Machine} = ff_deposit_machine:get(DepositID),
ff_deposit_machine:deposit(Machine).
get_deposit_status(DepositID) ->
ff_deposit:status(get_deposit(DepositID)).
await_final_deposit_status(DepositID) ->
finished = ct_helper:await(
finished,
fun() ->
{ok, Machine} = ff_deposit_machine:get(DepositID),
Deposit = ff_deposit_machine:deposit(Machine),
case ff_deposit:is_finished(Deposit) of
false ->
{not_finished, Deposit};
true ->
finished
end
end,
genlib_retry:linear(90, 1000)
),
get_deposit_status(DepositID).
get_withdrawal(WithdrawalID) ->
{ok, Machine} = ff_withdrawal_machine:get(WithdrawalID),
ff_withdrawal_machine:withdrawal(Machine).
get_withdrawal_status(WithdrawalID) ->
ff_withdrawal:status(get_withdrawal(WithdrawalID)).
await_final_withdrawal_status(WithdrawalID) ->
finished = ct_helper:await(
finished,
fun() ->
{ok, Machine} = ff_withdrawal_machine:get(WithdrawalID),
Withdrawal = ff_withdrawal_machine:withdrawal(Machine),
case ff_withdrawal:is_finished(Withdrawal) of
false ->
{not_finished, Withdrawal};
true ->
finished
end
end,
genlib_retry:linear(90, 1000)
),
get_withdrawal_status(WithdrawalID).
call_route_handler(Function, ServiceName, Args) ->
call_handler(Function, ServiceName, Args, <<"8040">>).
call_handler(Function, ServiceName, Args, Port) ->
Service = ff_services:get_service(ServiceName),
Path = erlang:list_to_binary(ff_services:get_service_path(ServiceName)),
Request = {Service, Function, Args},
Client = ff_woody_client:new(#{
url => <<"http://localhost:", Port/binary, Path/binary>>,
event_handler => ff_woody_event_handler
}),
ff_woody_client:call(Client, Request).
create_sink_route(ServiceName, {Handler, Cfg}) ->
Service = ff_services:get_service(ServiceName),
Path = ff_services:get_service_path(ServiceName),
NewCfg = Cfg#{
client => #{
event_handler => ff_woody_event_handler,
url => "http://machinegun:8022/v1/event_sink"
}
},
PartyClient = party_client:create_client(),
WrapperOptions = #{
handler => {Handler, NewCfg},
party_client => PartyClient
},
woody_server_thrift_http_handler:get_routes(
genlib_map:compact(#{
handlers => [{Path, {Service, {ff_woody_wrapper, WrapperOptions}}}],
event_handler => ff_woody_event_handler
})
).
search_event_commited(Events, WdrID) ->
ClearEv = lists:filter(
fun(Ev) ->
case Ev#wthd_SinkEvent.source of
WdrID -> true;
_ -> false
end
end,
Events
),
TransferCommited = lists:filter(
fun(Ev) ->
Payload = Ev#wthd_SinkEvent.payload,
Changes = Payload#wthd_EventSinkPayload.changes,
lists:any(fun is_commited_ev/1, Changes)
end,
ClearEv
),
length(TransferCommited) =/= 0.
is_commited_ev({transfer, #wthd_TransferChange{payload = TransferEvent}}) ->
case TransferEvent of
{status_changed, #transfer_StatusChange{status = {committed, #transfer_Committed{}}}} ->
true;
_Other ->
false
end;
is_commited_ev(_Other) ->
false.

View File

@ -1,135 +0,0 @@
-module(machinery_mg_eventsink).
-export([get_events/4]).
-export([get_last_event_id/2]).
-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl").
-define(EVENTSINK_CORE_OPTS,
schema := machinery_mg_schema:schema()
).
-type eventsink_id() :: binary().
-type event_id() :: integer().
-type eventsink_opts() :: #{
client := machinery_mg_client:client(),
?EVENTSINK_CORE_OPTS
}.
-type evsink_event(T) :: #{
id := event_id(),
ns := binary(),
source_id := machinery:id(),
event := machinery:event(T)
}.
-export_type([evsink_event/1]).
-spec get_events(eventsink_id(), event_id(), integer(), eventsink_opts()) -> {ok, list(evsink_event(_))}.
get_events(EventSinkID, After, Limit, Opts) ->
{ok, get_history_range(EventSinkID, After, Limit, Opts)}.
-spec get_last_event_id(eventsink_id(), eventsink_opts()) -> {ok, event_id()} | {error, no_last_event}.
get_last_event_id(EventSinkID, Opts) ->
case get_history_range(EventSinkID, undefined, 1, backward, Opts) of
[#{id := ID}] ->
{ok, ID};
[] ->
{error, no_last_event}
end.
get_history_range(EventSinkID, After, Limit, Opts) ->
get_history_range(EventSinkID, After, Limit, forward, Opts).
get_history_range(EventSinkID, After, Limit, Direction, #{client := Client, schema := Schema}) ->
{ok, Events} = call_eventsink(
'GetHistory',
marshal(id, EventSinkID),
[marshal(history_range, {After, Limit, Direction})],
Client
),
unmarshal({list, {evsink_event, Schema}}, Events).
call_eventsink(Function, EventSinkID, Args, {Client, Context}) ->
Service = {mg_proto_state_processing_thrift, 'EventSink'},
ArgsTuple = list_to_tuple([EventSinkID | Args]),
woody_client:call({Service, Function, ArgsTuple}, Client, Context).
%%
marshal(id, V) ->
marshal(string, V);
marshal(history_range, {After, Limit, Direction}) ->
#'mg_stateproc_HistoryRange'{
'after' = After,
'limit' = Limit,
'direction' = Direction
};
marshal(string, V) when is_binary(V) ->
V.
%%
% TODO refactor this copy of mg_backend unmarshal
unmarshal(id, V) ->
unmarshal(string, V);
unmarshal(namespace, V) ->
unmarshal(atom, V);
unmarshal(event_id, V) ->
unmarshal(integer, V);
unmarshal(timestamp, V) when is_binary(V) ->
try
MilliSec = genlib_rfc3339:parse(V, millisecond),
case genlib_rfc3339:is_utc(V) of
false ->
erlang:error(badarg, [timestamp, V, badoffset]);
true ->
USec = MilliSec rem 1000,
DateTime = calendar:system_time_to_universal_time(MilliSec, millisecond),
{DateTime, USec}
end
catch
error:Reason:St ->
erlang:raise(error, {timestamp, V, Reason}, St)
end;
unmarshal(
{evsink_event, Schema},
#'mg_stateproc_SinkEvent'{
'id' = ID,
'source_ns' = NS0,
'source_id' = SourceID0,
'event' = Event
}
) ->
#mg_stateproc_Event{id = EventID, created_at = CreatedAt0, format_version = Format, data = Data0} = Event,
SourceID1 = unmarshal(id, SourceID0),
NS1 = unmarshal(namespace, NS0),
CreatedAt1 = unmarshal(timestamp, CreatedAt0),
Context = #{
machine_id => SourceID1,
machine_ns => NS1,
created_at => CreatedAt1
},
{Data1, Context} = unmarshal({schema, Schema, {event, Format}, Context}, Data0),
#{
id => unmarshal(event_id, ID),
ns => NS1,
source_id => SourceID1,
event => {
unmarshal(event_id, EventID),
CreatedAt1,
Data1
}
};
unmarshal({list, T}, V) when is_list(V) ->
[unmarshal(T, E) || E <- V];
unmarshal({schema, Schema, T, Context}, V) ->
machinery_mg_schema:unmarshal(Schema, T, V, Context);
unmarshal(string, V) when is_binary(V) ->
V;
unmarshal(atom, V) when is_binary(V) ->
binary_to_existing_atom(V, utf8);
unmarshal(integer, V) when is_integer(V) ->
V;
unmarshal(T, V) ->
error(badarg, {T, V}).

View File

@ -25,7 +25,6 @@
-export([no_parallel_adjustments_test/1]). -export([no_parallel_adjustments_test/1]).
-export([no_pending_w2w_transfer_adjustments_test/1]). -export([no_pending_w2w_transfer_adjustments_test/1]).
-export([unknown_w2w_transfer_test/1]). -export([unknown_w2w_transfer_test/1]).
-export([consume_eventsinks/1]).
%% Internal types %% Internal types
@ -42,7 +41,7 @@
-spec all() -> [test_case_name() | {group, group_name()}]. -spec all() -> [test_case_name() | {group, group_name()}].
all() -> all() ->
[{group, default}, {group, eventsink}]. [{group, default}].
-spec groups() -> [{group_name(), list(), [test_case_name()]}]. -spec groups() -> [{group_name(), list(), [test_case_name()]}].
groups() -> groups() ->
@ -58,9 +57,6 @@ groups() ->
no_parallel_adjustments_test, no_parallel_adjustments_test,
no_pending_w2w_transfer_adjustments_test, no_pending_w2w_transfer_adjustments_test,
unknown_w2w_transfer_test unknown_w2w_transfer_test
]},
{eventsink, [], [
consume_eventsinks
]} ]}
]. ].
@ -298,11 +294,6 @@ unknown_w2w_transfer_test(_C) ->
}), }),
?assertMatch({error, {unknown_w2w_transfer, W2WTransferID}}, Result). ?assertMatch({error, {unknown_w2w_transfer, W2WTransferID}}, Result).
-spec consume_eventsinks(config()) -> test_return().
consume_eventsinks(_) ->
EventSinks = [w2w_transfer_event_sink],
[_Events = ct_eventsink:consume(1000, Sink) || Sink <- EventSinks].
%% Utils %% Utils
prepare_standard_environment({_Amount, Currency} = Cash, C) -> prepare_standard_environment({_Amount, Currency} = Cash, C) ->

View File

@ -1,6 +1,29 @@
services: services:
dominant:
environment: &otlp_enabled
OTEL_TRACES_EXPORTER: otlp
OTEL_TRACES_SAMPLER: parentbased_always_off
OTEL_EXPORTER_OTLP_PROTOCOL: http_protobuf
OTEL_EXPORTER_OTLP_ENDPOINT: http://jaeger:4318
bender:
environment: *otlp_enabled
limiter:
environment: *otlp_enabled
party-management:
environment: *otlp_enabled
machinegun:
environment: *otlp_enabled
testrunner: testrunner:
environment:
<<: *otlp_enabled
OTEL_SERVICE_NAME: hellgate_testrunner
OTEL_TRACES_SAMPLER: parentbased_always_on
depends_on: depends_on:
jaeger: jaeger:
condition: service_healthy condition: service_healthy

View File

@ -28,7 +28,7 @@ services:
command: /sbin/init command: /sbin/init
dominant: dominant:
image: ghcr.io/valitydev/dominant:sha-2150eea image: ghcr.io/valitydev/dominant:sha-f17373b
command: /opt/dominant/bin/dominant foreground command: /opt/dominant/bin/dominant foreground
depends_on: depends_on:
machinegun: machinegun:
@ -40,7 +40,7 @@ services:
retries: 10 retries: 10
machinegun: machinegun:
image: ghcr.io/valitydev/machinegun:sha-5c0db56 image: ghcr.io/valitydev/mg2:sha-436f723
command: /opt/machinegun/bin/machinegun foreground command: /opt/machinegun/bin/machinegun foreground
volumes: volumes:
- ./test/machinegun/config.yaml:/opt/machinegun/etc/config.yaml - ./test/machinegun/config.yaml:/opt/machinegun/etc/config.yaml
@ -86,7 +86,7 @@ services:
disable: true disable: true
party-management: party-management:
image: ghcr.io/valitydev/party-management:sha-28c1b38 image: ghcr.io/valitydev/party-management:sha-6d0040d
command: /opt/party-management/bin/party-management foreground command: /opt/party-management/bin/party-management foreground
depends_on: depends_on:
machinegun: machinegun:
@ -102,7 +102,7 @@ services:
retries: 10 retries: 10
bender: bender:
image: ghcr.io/valitydev/bender:sha-a3b227f image: ghcr.io/valitydev/bender:sha-cf92a7d
command: /opt/bender/bin/bender foreground command: /opt/bender/bin/bender foreground
depends_on: depends_on:
machinegun: machinegun:

View File

@ -89,7 +89,6 @@
} }
}}, }},
{services, #{ {services, #{
'eventsink' => "http://machinegun:8022/v1/event_sink",
'automaton' => "http://machinegun:8022/v1/automaton", 'automaton' => "http://machinegun:8022/v1/automaton",
'accounter' => "http://shumway:8022/accounter", 'accounter' => "http://shumway:8022/accounter",
'limiter' => "http://limiter:8022/v1/limiter" 'limiter' => "http://limiter:8022/v1/limiter"
@ -132,32 +131,6 @@
disk => {erl_health, disk, ["/", 99]}, disk => {erl_health, disk, ["/", 99]},
memory => {erl_health, cg_memory, [99]}, memory => {erl_health, cg_memory, [99]},
service => {erl_health, service, [<<"fistful-server">>]} service => {erl_health, service, [<<"fistful-server">>]}
}},
{eventsink, #{
identity => #{
namespace => 'ff/identity'
},
wallet => #{
namespace => 'ff/wallet_v2'
},
withdrawal => #{
namespace => 'ff/withdrawal_v2'
},
deposit => #{
namespace => 'ff/deposit_v1'
},
destination => #{
namespace => 'ff/destination_v2'
},
source => #{
namespace => 'ff/source_v1'
},
withdrawal_session => #{
namespace => 'ff/withdrawal/session_v2'
},
w2w_transfer => #{
namespace => 'ff/w2w_transfer_v1'
}
}} }}
]}, ]},

View File

@ -37,11 +37,11 @@
1}, 1},
{<<"erl_health">>, {<<"erl_health">>,
{git,"https://github.com/valitydev/erlang-health.git", {git,"https://github.com/valitydev/erlang-health.git",
{ref,"7ffbc855bdbe79e23efad1803b0b185c9ea8d2f1"}}, {ref,"49716470d0e8dab5e37db55d52dea78001735a3d"}},
0}, 0},
{<<"fistful_proto">>, {<<"fistful_proto">>,
{git,"https://github.com/valitydev/fistful-proto.git", {git,"https://github.com/valitydev/fistful-proto.git",
{ref,"f2ca9b5225956a6bd9e1b3f84157f52c884fdb36"}}, {ref,"02384257976db9dd90a734713e3ff052701fce11"}},
0}, 0},
{<<"genlib">>, {<<"genlib">>,
{git,"https://github.com/valitydev/genlib.git", {git,"https://github.com/valitydev/genlib.git",
@ -59,14 +59,14 @@
0}, 0},
{<<"machinery">>, {<<"machinery">>,
{git,"https://github.com/valitydev/machinery-erlang.git", {git,"https://github.com/valitydev/machinery-erlang.git",
{ref,"dc899e245be64551eebe2e42c9cf473f176fbf0e"}}, {ref,"d62ceffbdb266bb4748bed34198e3575bba2dc73"}},
0}, 0},
{<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},2}, {<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},2},
{<<"mg_proto">>, {<<"mg_proto">>,
{git,"https://github.com/valitydev/machinegun-proto", {git,"https://github.com/valitydev/machinegun-proto",
{ref,"96f7f11b184c29d8b7e83cd7646f3f2c13662bda"}}, {ref,"3decc8f8b13c9cd1701deab47781aacddd7dbc92"}},
1}, 1},
{<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.2.0">>},2}, {<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.3.0">>},2},
{<<"msgpack_proto">>, {<<"msgpack_proto">>,
{git,"https://github.com/valitydev/msgpack-proto.git", {git,"https://github.com/valitydev/msgpack-proto.git",
{ref,"7e447496aa5df4a5f1ace7ef2e3c31248b2a3ed0"}}, {ref,"7e447496aa5df4a5f1ace7ef2e3c31248b2a3ed0"}},
@ -95,7 +95,7 @@
{<<"ranch">>,{pkg,<<"ranch">>,<<"1.8.0">>},2}, {<<"ranch">>,{pkg,<<"ranch">>,<<"1.8.0">>},2},
{<<"scoper">>, {<<"scoper">>,
{git,"https://github.com/valitydev/scoper.git", {git,"https://github.com/valitydev/scoper.git",
{ref,"41a14a558667316998af9f49149ee087ffa8bef2"}}, {ref,"55a2a32ee25e22fa35f583a18eaf38b2b743429b"}},
0}, 0},
{<<"snowflake">>, {<<"snowflake">>,
{git,"https://github.com/valitydev/snowflake.git", {git,"https://github.com/valitydev/snowflake.git",
@ -116,7 +116,7 @@
0}, 0},
{<<"woody">>, {<<"woody">>,
{git,"https://github.com/valitydev/woody_erlang.git", {git,"https://github.com/valitydev/woody_erlang.git",
{ref,"5d46291a6bfcee0bae2a9346a7d927603a909249"}}, {ref,"81219ba5408e1c67f5eaed3c7e566ede42da88d4"}},
0}]}. 0}]}.
[ [
{pkg_hash,[ {pkg_hash,[
@ -135,7 +135,7 @@
{<<"idna">>, <<"8A63070E9F7D0C62EB9D9FCB360A7DE382448200FBBD1B106CC96D3D8099DF8D">>}, {<<"idna">>, <<"8A63070E9F7D0C62EB9D9FCB360A7DE382448200FBBD1B106CC96D3D8099DF8D">>},
{<<"jsx">>, <<"D12516BAA0BB23A59BB35DCCAF02A1BD08243FCBB9EFE24F2D9D056CCFF71268">>}, {<<"jsx">>, <<"D12516BAA0BB23A59BB35DCCAF02A1BD08243FCBB9EFE24F2D9D056CCFF71268">>},
{<<"metrics">>, <<"25F094DEA2CDA98213CECC3AEFF09E940299D950904393B2A29D191C346A8486">>}, {<<"metrics">>, <<"25F094DEA2CDA98213CECC3AEFF09E940299D950904393B2A29D191C346A8486">>},
{<<"mimerl">>, <<"67E2D3F571088D5CFD3E550C383094B47159F3EEE8FFA08E64106CDF5E981BE3">>}, {<<"mimerl">>, <<"D0CD9FC04B9061F82490F6581E0128379830E78535E017F7780F37FEA7545726">>},
{<<"opentelemetry">>, <<"988AC3C26ACAC9720A1D4FB8D9DC52E95B45ECFEC2D5B5583276A09E8936BC5E">>}, {<<"opentelemetry">>, <<"988AC3C26ACAC9720A1D4FB8D9DC52E95B45ECFEC2D5B5583276A09E8936BC5E">>},
{<<"opentelemetry_api">>, <<"7B69ED4F40025C005DE0B74FCE8C0549625D59CB4DF12D15C32FE6DC5076FF42">>}, {<<"opentelemetry_api">>, <<"7B69ED4F40025C005DE0B74FCE8C0549625D59CB4DF12D15C32FE6DC5076FF42">>},
{<<"opentelemetry_exporter">>, <<"1D8809C0D4F4ACF986405F7700ED11992BCBDB6A4915DD11921E80777FFA7167">>}, {<<"opentelemetry_exporter">>, <<"1D8809C0D4F4ACF986405F7700ED11992BCBDB6A4915DD11921E80777FFA7167">>},
@ -165,7 +165,7 @@
{<<"idna">>, <<"92376EB7894412ED19AC475E4A86F7B413C1B9FBB5BD16DCCD57934157944CEA">>}, {<<"idna">>, <<"92376EB7894412ED19AC475E4A86F7B413C1B9FBB5BD16DCCD57934157944CEA">>},
{<<"jsx">>, <<"0C5CC8FDC11B53CC25CF65AC6705AD39E54ECC56D1C22E4ADB8F5A53FB9427F3">>}, {<<"jsx">>, <<"0C5CC8FDC11B53CC25CF65AC6705AD39E54ECC56D1C22E4ADB8F5A53FB9427F3">>},
{<<"metrics">>, <<"69B09ADDDC4F74A40716AE54D140F93BEB0FB8978D8636EADED0C31B6F099F16">>}, {<<"metrics">>, <<"69B09ADDDC4F74A40716AE54D140F93BEB0FB8978D8636EADED0C31B6F099F16">>},
{<<"mimerl">>, <<"F278585650AA581986264638EBF698F8BB19DF297F66AD91B18910DFC6E19323">>}, {<<"mimerl">>, <<"A1E15A50D1887217DE95F0B9B0793E32853F7C258A5CD227650889B38839FE9D">>},
{<<"opentelemetry">>, <<"8E09EDC26AAD11161509D7ECAD854A3285D88580F93B63B0B1CF0BAC332BFCC0">>}, {<<"opentelemetry">>, <<"8E09EDC26AAD11161509D7ECAD854A3285D88580F93B63B0B1CF0BAC332BFCC0">>},
{<<"opentelemetry_api">>, <<"6D7A27B7CAD2AD69A09CABF6670514CAFCEC717C8441BEB5C96322BAC3D05350">>}, {<<"opentelemetry_api">>, <<"6D7A27B7CAD2AD69A09CABF6670514CAFCEC717C8441BEB5C96322BAC3D05350">>},
{<<"opentelemetry_exporter">>, <<"2B40007F509D38361744882FD060A8841AF772AB83BB542AA5350908B303AD65">>}, {<<"opentelemetry_exporter">>, <<"2B40007F509D38361744882FD060A8841AF772AB83BB542AA5350908B303AD65">>},

View File

@ -13,59 +13,27 @@ namespaces:
# Fistful # Fistful
ff/identity: ff/identity:
event_sinks:
machine:
type: machine
machine_id: ff/identity
processor: processor:
url: http://fistful-server:8022/v1/stateproc/ff/identity url: http://fistful-server:8022/v1/stateproc/ff/identity
ff/wallet_v2: ff/wallet_v2:
event_sinks:
machine:
type: machine
machine_id: ff/wallet_v2
processor: processor:
url: http://fistful-server:8022/v1/stateproc/ff/wallet_v2 url: http://fistful-server:8022/v1/stateproc/ff/wallet_v2
ff/source_v1: ff/source_v1:
event_sinks:
machine:
type: machine
machine_id: ff/source_v1
processor: processor:
url: http://fistful-server:8022/v1/stateproc/ff/source_v1 url: http://fistful-server:8022/v1/stateproc/ff/source_v1
ff/deposit_v1: ff/deposit_v1:
event_sinks:
machine:
type: machine
machine_id: ff/deposit_v1
processor: processor:
url: http://fistful-server:8022/v1/stateproc/ff/deposit_v1 url: http://fistful-server:8022/v1/stateproc/ff/deposit_v1
ff/destination_v2: ff/destination_v2:
event_sinks:
machine:
type: machine
machine_id: ff/destination_v2
processor: processor:
url: http://fistful-server:8022/v1/stateproc/ff/destination_v2 url: http://fistful-server:8022/v1/stateproc/ff/destination_v2
ff/withdrawal_v2: ff/withdrawal_v2:
event_sinks:
machine:
type: machine
machine_id: ff/withdrawal_v2
processor: processor:
url: http://fistful-server:8022/v1/stateproc/ff/withdrawal_v2 url: http://fistful-server:8022/v1/stateproc/ff/withdrawal_v2
ff/withdrawal/session_v2: ff/withdrawal/session_v2:
event_sinks:
machine:
type: machine
machine_id: ff/withdrawal/session_v2
processor: processor:
url: http://fistful-server:8022/v1/stateproc/ff/withdrawal/session_v2 url: http://fistful-server:8022/v1/stateproc/ff/withdrawal/session_v2
ff/w2w_transfer_v1: ff/w2w_transfer_v1:
event_sinks:
machine:
type: machine
machine_id: ff/w2w_transfer_v1
processor: processor:
url: http://fistful-server:8022/v1/stateproc/ff/w2w_transfer_v1 url: http://fistful-server:8022/v1/stateproc/ff/w2w_transfer_v1
@ -93,8 +61,3 @@ storage:
logging: logging:
out_type: stdout out_type: stdout
level: info level: info
opentelemetry:
exporter:
protocol: grpc
endpoint: http://jaeger:4317