FF-65: Set start event (#66)

This commit is contained in:
Артем 2019-02-21 11:05:34 +03:00 committed by GitHub
parent 9611e9cb5d
commit c6bc6368ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 79 additions and 16 deletions

View File

@ -177,29 +177,30 @@ get_wallet_routes() ->
get_eventsink_routes(BeConf) -> get_eventsink_routes(BeConf) ->
IdentityRoute = create_sink_route({<<"/v1/eventsink/identity">>, IdentityRoute = create_sink_route({<<"/v1/eventsink/identity">>,
{{ff_proto_identity_thrift, 'EventSink'}, {ff_eventsink_handler, {{ff_proto_identity_thrift, 'EventSink'}, {ff_eventsink_handler,
BeConf#{ns => <<"ff/identity">>, publisher => ff_identity_eventsink_publisher}}}}), make_sink_handler_cfg(<<"ff/identity">>, ff_identity_eventsink_publisher, BeConf)}}}),
WalletRoute = create_sink_route({<<"/v1/eventsink/wallet">>, WalletRoute = create_sink_route({<<"/v1/eventsink/wallet">>,
{{ff_proto_wallet_thrift, 'EventSink'}, {ff_eventsink_handler, {{ff_proto_wallet_thrift, 'EventSink'}, {ff_eventsink_handler,
BeConf#{ns => <<"ff/wallet_v2">>, publisher => ff_wallet_eventsink_publisher}}}}), make_sink_handler_cfg(<<"ff/wallet_v2">>, ff_wallet_eventsink_publisher, BeConf)}}}),
WithdrawalSessionRoute = create_sink_route({<<"/v1/eventsink/withdrawal/session">>, WithdrawalSessionRoute = create_sink_route({<<"/v1/eventsink/withdrawal/session">>,
{{ff_proto_withdrawal_session_thrift, 'EventSink'}, {ff_eventsink_handler, {{ff_proto_withdrawal_session_thrift, 'EventSink'}, {ff_eventsink_handler,
BeConf#{ make_sink_handler_cfg(
ns => <<"ff/withdrawal/session_v2">>, <<"ff/withdrawal/session_v2">>,
publisher => ff_withdrawal_session_eventsink_publisher ff_withdrawal_session_eventsink_publisher,
} BeConf
)
}}}), }}}),
WithdrawalRoute = create_sink_route({<<"/v1/eventsink/withdrawal">>, WithdrawalRoute = create_sink_route({<<"/v1/eventsink/withdrawal">>,
{{ff_proto_withdrawal_thrift, 'EventSink'}, {ff_eventsink_handler, {{ff_proto_withdrawal_thrift, 'EventSink'}, {ff_eventsink_handler,
BeConf#{ns => <<"ff/withdrawal_v2">>, publisher => ff_withdrawal_eventsink_publisher}}}}), make_sink_handler_cfg(<<"ff/withdrawal_v2">>, ff_withdrawal_eventsink_publisher, BeConf)}}}),
DestinationRoute = create_sink_route({<<"/v1/eventsink/destination">>, DestinationRoute = create_sink_route({<<"/v1/eventsink/destination">>,
{{ff_proto_destination_thrift, 'EventSink'}, {ff_eventsink_handler, {{ff_proto_destination_thrift, 'EventSink'}, {ff_eventsink_handler,
BeConf#{ns => <<"ff/destination_v2">>, publisher => ff_destination_eventsink_publisher}}}}), make_sink_handler_cfg(<<"ff/destination_v2">>, ff_destination_eventsink_publisher, BeConf)}}}),
SourceRoute = create_sink_route({<<"/v1/eventsink/source">>, SourceRoute = create_sink_route({<<"/v1/eventsink/source">>,
{{ff_proto_source_thrift, 'EventSink'}, {ff_eventsink_handler, {{ff_proto_source_thrift, 'EventSink'}, {ff_eventsink_handler,
BeConf#{ns => <<"ff/source_v1">>, publisher => ff_source_eventsink_publisher}}}}), make_sink_handler_cfg(<<"ff/source_v1">>, ff_source_eventsink_publisher, BeConf)}}}),
DepositRoute = create_sink_route({<<"/v1/eventsink/deposit">>, DepositRoute = create_sink_route({<<"/v1/eventsink/deposit">>,
{{ff_proto_deposit_thrift, 'EventSink'}, {ff_eventsink_handler, {{ff_proto_deposit_thrift, 'EventSink'}, {ff_eventsink_handler,
BeConf#{ns => <<"ff/deposit_v1">>, publisher => ff_deposit_eventsink_publisher}}}}), make_sink_handler_cfg(<<"ff/deposit_v1">>, ff_deposit_eventsink_publisher, BeConf)}}}),
lists:flatten([ lists:flatten([
IdentityRoute, IdentityRoute,
WalletRoute, WalletRoute,
@ -281,6 +282,13 @@ create_sink_route({Path, {Module, {Handler, Cfg}}}) ->
event_handler => scoper_woody_event_handler event_handler => scoper_woody_event_handler
})). })).
make_sink_handler_cfg(NS, Publisher, Cfg) ->
Cfg#{
ns => NS,
publisher => Publisher,
start_event => 0
}.
%% Default options %% Default options
machinery_backend_config(Options) -> machinery_backend_config(Options) ->

View File

@ -32,9 +32,15 @@ handle_function(Func, Args, Context, Opts) ->
). ).
handle_function_( handle_function_(
'GetEvents', [#'evsink_EventRange'{'after' = After, limit = Limit}], 'GetEvents', [#'evsink_EventRange'{'after' = After0, limit = Limit}],
Context, #{schema := Schema, client := Client, ns := NS, publisher := Publisher} Context, #{
schema := Schema,
client := Client,
ns := NS,
publisher := Publisher,
start_event := StartEvent}
) -> ) ->
After = erlang:max(After0, StartEvent),
{ok, Events} = machinery_mg_eventsink:get_events(NS, After, Limit, {ok, Events} = machinery_mg_eventsink:get_events(NS, After, Limit,
#{client => {Client, Context}, schema => Schema}), #{client => {Client, Context}, schema => Schema}),
ff_eventsink_publisher:publish_events(Events, #{publisher => Publisher}); ff_eventsink_publisher:publish_events(Events, #{publisher => Publisher});

View File

@ -182,12 +182,13 @@ get_eventsink_route(RouteType, {DefPath, {Module, {Publisher, Cfg}}}) ->
Opts -> Opts ->
Path = maps:get(path, Opts, DefPath), Path = maps:get(path, Opts, DefPath),
NS = maps:get(namespace, Opts), NS = maps:get(namespace, Opts),
StartEvent = maps:get(start_event, Opts, 0),
Limits = genlib_map:get(handler_limits, Opts), Limits = genlib_map:get(handler_limits, Opts),
woody_server_thrift_http_handler:get_routes(genlib_map:compact(#{ woody_server_thrift_http_handler:get_routes(genlib_map:compact(#{
handlers => [ handlers => [
{Path, {Module, { {Path, {Module, {
ff_eventsink_handler, ff_eventsink_handler,
Cfg#{ns => NS, publisher => Publisher} Cfg#{ns => NS, publisher => Publisher, start_event => StartEvent}
}}}], }}}],
event_handler => scoper_woody_event_handler, event_handler => scoper_woody_event_handler,
handler_limits => Limits handler_limits => Limits

View File

@ -26,6 +26,7 @@
-export([get_create_destination_events_ok/1]). -export([get_create_destination_events_ok/1]).
-export([get_create_source_events_ok/1]). -export([get_create_source_events_ok/1]).
-export([get_create_deposit_events_ok/1]). -export([get_create_deposit_events_ok/1]).
-export([get_shifted_create_identity_events_ok/1]).
-type config() :: ct_helper:config(). -type config() :: ct_helper:config().
-type test_case_name() :: ct_helper:test_case_name(). -type test_case_name() :: ct_helper:test_case_name().
@ -47,7 +48,8 @@ all() ->
get_create_destination_events_ok, get_create_destination_events_ok,
get_create_source_events_ok, get_create_source_events_ok,
get_create_deposit_events_ok, get_create_deposit_events_ok,
get_withdrawal_session_events_ok get_withdrawal_session_events_ok,
get_shifted_create_identity_events_ok
]. ].
-spec groups() -> [{group_name(), list(), [test_case_name()]}]. -spec groups() -> [{group_name(), list(), [test_case_name()]}].
@ -269,6 +271,35 @@ get_create_deposit_events_ok(C) ->
MaxID = get_max_sinkevent_id(Events), MaxID = get_max_sinkevent_id(Events),
MaxID = LastEvent + length(RawEvents). MaxID = LastEvent + length(RawEvents).
-spec get_shifted_create_identity_events_ok(config()) -> test_return().
get_shifted_create_identity_events_ok(C) ->
#{suite_sup := SuiteSup} = ct_helper:cfg(payment_system, C),
Service = {{ff_proto_identity_thrift, 'EventSink'}, <<"/v1/eventsink/identity">>},
StartEventNum = 3,
IdentityRoute = create_sink_route({<<"/v1/eventsink/identity">>,
{{ff_proto_identity_thrift, 'EventSink'}, {ff_eventsink_handler,
#{
ns => <<"ff/identity">>,
publisher => ff_identity_eventsink_publisher,
start_event => StartEventNum,
schema => machinery_mg_schema_generic
}}}}),
{ok, _} = supervisor:start_child(SuiteSup, woody_server:child_spec(
?MODULE,
#{
ip => {0, 0, 0, 0},
port => 8040,
handlers => [],
event_handler => scoper_woody_event_handler,
additional_routes => IdentityRoute
}
)),
{ok, Events} = call_route_handler('GetEvents',
Service, [#'evsink_EventRange'{'after' = 0, limit = 1}]),
MaxID = get_max_sinkevent_id(Events),
MaxID = StartEventNum + 1.
create_identity(Party, C) -> create_identity(Party, C) ->
create_identity(Party, <<"good-one">>, <<"person">>, C). create_identity(Party, <<"good-one">>, <<"person">>, C).
@ -410,10 +441,27 @@ unwrap_last_sinkevent_id({exception, #'evsink_NoLastEvent'{}}) ->
{ok, woody:result()} | {ok, woody:result()} |
{exception, woody_error:business_error()}. {exception, woody_error:business_error()}.
call_eventsink_handler(Function, {Service, Path}, Args) -> call_eventsink_handler(Function, Service, Args) ->
call_handler(Function, Service, Args, <<"8022">>).
call_route_handler(Function, Service, Args) ->
call_handler(Function, Service, Args, <<"8040">>).
call_handler(Function, {Service, Path}, Args, Port) ->
Request = {Service, Function, Args}, Request = {Service, Function, Args},
Client = ff_woody_client:new(#{ Client = ff_woody_client:new(#{
url => <<"http://localhost:8022", Path/binary>>, url => <<"http://localhost:", Port/binary, Path/binary>>,
event_handler => scoper_woody_event_handler event_handler => scoper_woody_event_handler
}), }),
ff_woody_client:call(Client, Request). ff_woody_client:call(Client, Request).
create_sink_route({Path, {Module, {Handler, Cfg}}}) ->
NewCfg = Cfg#{
client => #{
event_handler => scoper_woody_event_handler,
url => "http://machinegun:8022/v1/event_sink"
}},
woody_server_thrift_http_handler:get_routes(genlib_map:compact(#{
handlers => [{Path, {Module, {Handler, NewCfg}}}],
event_handler => scoper_woody_event_handler
})).