FF-186: Withdrawal thrift support (#254)

* Test schema

* Add whitespaces after commas, remove debug print

* Fix the mess caused by search & replace

* Delete redundant code from ff_withdrawal, handle session_finished

* Delete ct:log

* Remove commented export

* Specify withdrawal version in ff_withdrawal:gen

* Fix formating in tests

* Fix some weird migrations, add binary test

* Fix route thrift struct marshaling

* Clean up

* Masrshal quotes, minor fixes
This commit is contained in:
Toporkov Igor 2020-07-16 12:24:33 +03:00 committed by GitHub
parent f58c4fdc82
commit 036e2a8dcb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 754 additions and 341 deletions

View File

@ -72,7 +72,6 @@ unmarshal(T, V, C) when
T =:= {args, init} orelse
T =:= {args, call} orelse
T =:= {args, repair} orelse
T =:= {aux_state, undefined} orelse
T =:= {response, call} orelse
T =:= {response, {repair, success}} orelse
T =:= {response, {repair, failure}}

View File

@ -213,7 +213,7 @@ get_namespace_schema('ff/destination_v2') ->
get_namespace_schema('ff/deposit_v1') ->
ff_deposit_machinery_schema;
get_namespace_schema('ff/withdrawal_v2') ->
machinery_mg_schema_generic;
ff_withdrawal_machinery_schema;
get_namespace_schema('ff/withdrawal/session_v2') ->
machinery_mg_schema_generic;
get_namespace_schema('ff/p2p_transfer_v1') ->

View File

@ -64,8 +64,8 @@ marshal_withdrawal_state(WithdrawalState, Context) ->
effective_final_cash_flow = ff_cash_flow_codec:marshal(final_cash_flow, CashFlow),
adjustments = [ff_withdrawal_adjustment_codec:marshal(adjustment_state, A) || A <- Adjustments],
context = marshal(ctx, Context),
metadata = marshal(ctx, ff_withdrawal:metadata(WithdrawalState))
%% TODO add quote here
metadata = marshal(ctx, ff_withdrawal:metadata(WithdrawalState)),
quote = maybe_marshal(quote, ff_withdrawal:quote(WithdrawalState))
}.
-spec marshal_event(ff_withdrawal_machine:event()) ->
@ -84,6 +84,12 @@ marshal_event({EventID, {ev, Timestamp, Change}}) ->
marshal({list, T}, V) ->
[marshal(T, E) || E <- V];
marshal(timestamped_change, {ev, Timestamp, Change}) ->
#wthd_TimestampedChange{
change = marshal(change, Change),
occured_at = ff_codec:marshal(timestamp, Timestamp)
};
marshal(change, {created, Withdrawal}) ->
{created, #wthd_CreatedChange{withdrawal = marshal(withdrawal, Withdrawal)}};
marshal(change, {status_changed, Status}) ->
@ -117,8 +123,8 @@ marshal(withdrawal, Withdrawal) ->
domain_revision = maybe_marshal(domain_revision, ff_withdrawal:domain_revision(Withdrawal)),
party_revision = maybe_marshal(party_revision, ff_withdrawal:party_revision(Withdrawal)),
created_at = maybe_marshal(timestamp_ms, ff_withdrawal:created_at(Withdrawal)),
metadata = maybe_marshal(ctx, ff_withdrawal:metadata(Withdrawal))
%% TODO add quote here
metadata = maybe_marshal(ctx, ff_withdrawal:metadata(Withdrawal)),
quote = maybe_marshal(quote, ff_withdrawal:quote(Withdrawal))
};
marshal(route, Route) ->
@ -152,6 +158,15 @@ marshal(session_state, Session) ->
result = maybe_marshal(session_result, maps:get(result, Session, undefined))
};
marshal(quote, Quote) ->
#wthd_WithdrawalQuote{
cash_from = marshal(cash, maps:get(cash_from, Quote)),
cash_to = marshal(cash, maps:get(cash_to, Quote)),
created_at = maps:get(created_at, Quote), % already formatted
expires_on = maps:get(expires_on, Quote),
quote_data = marshal(ctx, maps:get(quote_data, Quote))
};
marshal(ctx, Ctx) ->
maybe_marshal(context, Ctx);
@ -165,6 +180,11 @@ marshal(T, V) ->
unmarshal({list, T}, V) ->
[unmarshal(T, E) || E <- V];
unmarshal(timestamped_change, TimestampedChange) ->
Timestamp = ff_codec:unmarshal(timestamp, TimestampedChange#wthd_TimestampedChange.occured_at),
Change = unmarshal(change, TimestampedChange#wthd_TimestampedChange.change),
{ev, Timestamp, Change};
unmarshal(repair_scenario, {add_events, #wthd_AddEventsRepair{events = Events, action = Action}}) ->
{add_events, genlib_map:compact(#{
events => unmarshal({list, change}, Events),
@ -179,7 +199,7 @@ unmarshal(change, {transfer, #wthd_TransferChange{payload = TransferChange}}) ->
{p_transfer, ff_p_transfer_codec:unmarshal(change, TransferChange)};
unmarshal(change, {session, SessionChange}) ->
unmarshal(session_event, SessionChange);
unmarshal(change, {route, Route}) ->
unmarshal(change, {route, #wthd_RouteChange{route = Route}}) ->
{route_changed, unmarshal(route, Route)};
unmarshal(change, {limit_check, #wthd_LimitCheckChange{details = Details}}) ->
{limit_check, ff_limit_check_codec:unmarshal(details, Details)};
@ -197,8 +217,8 @@ unmarshal(withdrawal, Withdrawal = #wthd_Withdrawal{}) ->
body => unmarshal(cash, Withdrawal#wthd_Withdrawal.body),
params => genlib_map:compact(#{
wallet_id => unmarshal(id, Withdrawal#wthd_Withdrawal.wallet_id),
destination_id => unmarshal(id, Withdrawal#wthd_Withdrawal.destination_id)
%% TODO add quote here
destination_id => unmarshal(id, Withdrawal#wthd_Withdrawal.destination_id),
quote => maybe_unmarshal(quote, Withdrawal#wthd_Withdrawal.quote)
}),
route => maybe_unmarshal(route, Withdrawal#wthd_Withdrawal.route),
external_id => maybe_unmarshal(id, Withdrawal#wthd_Withdrawal.external_id),
@ -237,6 +257,15 @@ unmarshal(session_state, Session) ->
result => maybe_unmarshal(session_result, Session#wthd_SessionState.result)
});
unmarshal(quote, Quote) ->
#{
cash_from => unmarshal(cash, Quote#wthd_WithdrawalQuote.cash_from),
cash_to => unmarshal(cash, Quote#wthd_WithdrawalQuote.cash_to),
created_at => Quote#wthd_WithdrawalQuote.created_at,
expires_on => Quote#wthd_WithdrawalQuote.expires_on,
quote_data => unmarshal(ctx, Quote#wthd_WithdrawalQuote.quote_data)
};
unmarshal(ctx, Ctx) ->
maybe_unmarshal(context, Ctx);

View File

@ -41,13 +41,7 @@ publish_event(#{
payload = #wthd_EventSinkPayload{
sequence = marshal(event_id, EventID),
occured_at = marshal(timestamp, EventDt),
changes = [marshal(change, ff_withdrawal:maybe_migrate(
Payload,
#{
timestamp => EventDt,
id => SourceID
}
))]
changes = [marshal(change, Payload)]
}
}.

View File

@ -0,0 +1,714 @@
-module(ff_withdrawal_machinery_schema).
%% Storage schema behaviour
-behaviour(machinery_mg_schema).
-include_lib("fistful_proto/include/ff_proto_p2p_transfer_thrift.hrl").
-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl").
-export([get_version/1]).
-export([marshal/3]).
-export([unmarshal/3]).
%% Constants
% TODO: Replace version to 1 after p2p provider migration
% see https://rbkmoney.atlassian.net/browse/MSPF-561 for details
-define(CURRENT_EVENT_FORMAT_VERSION, undefined).
%% Internal types
-type type() :: machinery_mg_schema:t().
-type value(T) :: machinery_mg_schema:v(T).
-type value_type() :: machinery_mg_schema:vt().
-type event() :: ff_machine:timestamped_event(p2p_transfer:event()).
-type aux_state() :: ff_machine:auxst().
-type call_args() :: term().
-type call_response() :: term().
-type context() :: machinery_mg_schema:context().
-type data() ::
aux_state() |
event() |
call_args() |
call_response().
%% machinery_mg_schema callbacks
-spec get_version(value_type()) ->
machinery_mg_schema:version().
get_version(event) ->
?CURRENT_EVENT_FORMAT_VERSION;
get_version(aux_state) ->
undefined.
-spec marshal(type(), value(data()), context()) ->
{machinery_msgpack:t(), context()}.
marshal({event, Format}, TimestampedChange, Context) ->
marshal_event(Format, TimestampedChange, Context);
marshal(T, V, C) when
T =:= {args, init} orelse
T =:= {args, call} orelse
T =:= {args, repair} orelse
T =:= {aux_state, undefined} orelse
T =:= {response, call} orelse
T =:= {response, {repair, success}} orelse
T =:= {response, {repair, failure}}
->
machinery_mg_schema_generic:marshal(T, V, C).
-spec unmarshal(type(), machinery_msgpack:t(), context()) ->
{data(), context()}.
unmarshal({event, FormatVersion}, EncodedChange, Context) ->
unmarshal_event(FormatVersion, EncodedChange, Context);
unmarshal({aux_state, undefined} = T, V, C0) ->
{AuxState, C1} = machinery_mg_schema_generic:unmarshal(T, V, C0),
{AuxState, C1#{ctx => get_aux_state_ctx(AuxState)}};
unmarshal(T, V, C) when
T =:= {args, init} orelse
T =:= {args, call} orelse
T =:= {args, repair} orelse
T =:= {response, call} orelse
T =:= {response, {repair, success}} orelse
T =:= {response, {repair, failure}}
->
machinery_mg_schema_generic:unmarshal(T, V, C).
%% Internals
-spec marshal_event(machinery_mg_schema:version(), event(), context()) ->
{machinery_msgpack:t(), context()}.
marshal_event(undefined = Version, TimestampedChange, Context) ->
% TODO: Remove this clause after MSPF-561 finish
machinery_mg_schema_generic:marshal({event, Version}, TimestampedChange, Context);
marshal_event(1, TimestampedChange, Context) ->
ThriftChange = ff_withdrawal_codec:marshal(timestamped_change, TimestampedChange),
Type = {struct, struct, {ff_proto_withdrawal_thrift, 'TimestampedChange'}},
{{bin, ff_proto_utils:serialize(Type, ThriftChange)}, Context}.
-spec unmarshal_event(machinery_mg_schema:version(), machinery_msgpack:t(), context()) ->
{event(), context()}.
unmarshal_event(1, EncodedChange, Context) ->
{bin, EncodedThriftChange} = EncodedChange,
Type = {struct, struct, {ff_proto_withdrawal_thrift, 'TimestampedChange'}},
ThriftChange = ff_proto_utils:deserialize(Type, EncodedThriftChange),
{ff_withdrawal_codec:unmarshal(timestamped_change, ThriftChange), Context};
unmarshal_event(undefined = Version, EncodedChange, Context) ->
{{ev, Timestamp, Change}, Context1} = machinery_mg_schema_generic:unmarshal(
{event, Version},
EncodedChange,
Context
),
{{ev, Timestamp, maybe_migrate(Change, Context1)}, Context1}.
maybe_migrate(Ev = {created, #{version := 3}}, _MigrateParams) ->
Ev;
maybe_migrate({route_changed, Route}, _MigrateParams) ->
{route_changed, maybe_migrate_route(Route)};
maybe_migrate({p_transfer, PEvent}, _MigrateParams) ->
{p_transfer, ff_postings_transfer:maybe_migrate(PEvent, withdrawal)};
maybe_migrate({adjustment, _Payload} = Event, _MigrateParams) ->
ff_adjustment_utils:maybe_migrate(Event);
maybe_migrate({resource_got, Resource}, _MigrateParams) ->
{resource_got, ff_instrument:maybe_migrate_resource(Resource)};
% Old events
maybe_migrate({limit_check, {wallet, Details}}, MigrateParams) ->
maybe_migrate({limit_check, {wallet_sender, Details}}, MigrateParams);
maybe_migrate({created, #{version := 1, handler := ff_withdrawal} = T}, MigrateParams) ->
#{
version := 1,
id := ID,
handler := ff_withdrawal,
body := Body,
params := #{
destination := DestinationID,
source := SourceID
}
} = T,
Route = maps:get(route, T, undefined),
maybe_migrate({created, genlib_map:compact(#{
version => 2,
id => ID,
transfer_type => withdrawal,
body => Body,
route => maybe_migrate_route(Route),
params => #{
wallet_id => SourceID,
destination_id => DestinationID,
% Fields below are required to correctly decode legacy events.
% When decoding legacy events, the `erlang:binary_to_existing_atom/2` function is used,
% so the code must contain atoms from the event.
% They are not used now, so their value does not matter.
wallet_account => [],
destination_account => [],
wallet_cash_flow_plan => []
}
})}, MigrateParams);
maybe_migrate({created, Withdrawal = #{version := 2, id := ID}}, MigrateParams) ->
Ctx = maps:get(ctx, MigrateParams, undefined),
Context = case Ctx of
undefined ->
{ok, State} = ff_machine:get(ff_withdrawal, 'ff/withdrawal_v2', ID, {undefined, 0, forward}),
maps:get(ctx, State, undefined);
Data ->
Data
end,
maybe_migrate({created, genlib_map:compact(Withdrawal#{
version => 3,
metadata => ff_entity_context:try_get_legacy_metadata(Context)
})}, MigrateParams);
maybe_migrate({created, T}, MigrateParams) ->
DestinationID = maps:get(destination, T),
SourceID = maps:get(source, T),
ProviderID = maps:get(provider, T),
maybe_migrate({created, T#{
version => 1,
handler => ff_withdrawal,
route => #{provider_id => ProviderID},
params => #{
destination => DestinationID,
source => SourceID
}
}}, MigrateParams);
maybe_migrate({transfer, PTransferEv}, MigrateParams) ->
maybe_migrate({p_transfer, PTransferEv}, MigrateParams);
maybe_migrate({status_changed, {failed, Failure}}, _MigrateParams) when is_map(Failure) ->
{status_changed, {failed, Failure}};
maybe_migrate({status_changed, {failed, LegacyFailure}}, MigrateParams) ->
Failure = #{
code => <<"unknown">>,
reason => genlib:format(LegacyFailure)
},
maybe_migrate({status_changed, {failed, Failure}}, MigrateParams);
maybe_migrate({session_finished, {SessionID, Result}}, _MigrateParams) ->
{session_finished, {SessionID, Result}};
maybe_migrate({session_finished, SessionID}, MigrateParams) ->
{ok, SessionMachine} = ff_withdrawal_session_machine:get(SessionID),
Session = ff_withdrawal_session_machine:session(SessionMachine),
{finished, Result} = ff_withdrawal_session:status(Session),
maybe_migrate({session_finished, {SessionID, Result}}, MigrateParams);
% Other events
maybe_migrate(Ev, _MigrateParams) ->
Ev.
maybe_migrate_route(undefined = Route) ->
Route;
maybe_migrate_route(#{version := 1} = Route) ->
Route;
maybe_migrate_route(Route) when not is_map_key(version, Route) ->
LegacyIDs = #{
<<"mocketbank">> => 1,
<<"royalpay-payout">> => 2,
<<"accentpay">> => 3
},
case maps:get(provider_id, Route) of
ProviderID when is_integer(ProviderID) ->
Route#{
version => 1,
provider_id => ProviderID + 300,
provider_id_legacy => genlib:to_binary(ProviderID)
};
ProviderID when is_binary(ProviderID) andalso is_map_key(ProviderID, LegacyIDs) ->
ModernID = maps:get(ProviderID, LegacyIDs),
Route#{
version => 1,
provider_id => ModernID + 300,
provider_id_legacy => ProviderID
};
ProviderID when is_binary(ProviderID) ->
Route#{
version => 1,
provider_id => erlang:binary_to_integer(ProviderID) + 300,
provider_id_legacy => ProviderID
}
end.
get_aux_state_ctx(AuxState) when is_map(AuxState) ->
maps:get(ctx, AuxState, undefined);
get_aux_state_ctx(_) ->
undefined.
%% Tests
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-spec test() -> _.
% tests helpers
-spec marshal(type(), value(data())) ->
machinery_msgpack:t().
marshal(Type, Value) ->
{Result, _Context} = marshal(Type, Value, #{}),
Result.
-spec unmarshal(type(), machinery_msgpack:t()) ->
data().
unmarshal(Type, Value) ->
{Result, _Context} = unmarshal(Type, Value, #{}),
Result.
-spec v0_created_migration_test() -> _.
v0_created_migration_test() ->
Withdrawal = #{
body => {100, <<"RUB">>},
id => <<"ID">>,
metadata => #{<<"some key">> => <<"some val">>},
params => #{
destination_account => [],
destination_id => <<"destinationID">>,
wallet_account => [],
wallet_cash_flow_plan => [],
wallet_id => <<"sourceID">>
},
route => #{
provider_id => 301,
provider_id_legacy => <<"mocketbank">>,
version => 1
},
transfer_type => withdrawal,
version => 3
},
Change = {created, Withdrawal},
Event = {ev, {{{2020, 5, 25}, {19, 19, 10}}, 293305}, Change},
LegacyChange = {arr, [
{str, <<"tup">>},
{str, <<"created">>},
{arr, [
{str, <<"map">>},
{obj, #{
{str, <<"id">>} => {bin, <<"ID">>},
{str, <<"source">>} => {bin, <<"sourceID">>},
{str, <<"destination">>} => {bin, <<"destinationID">>},
{str, <<"body">>} => {arr, [{str, <<"tup">>}, {i, 100}, {bin, <<"RUB">>}]},
{str, <<"provider">>} => {bin, <<"mocketbank">>}
}}
]}
]},
LegacyEvent = {arr, [
{str, <<"tup">>},
{str, <<"ev">>},
{arr, [
{str, <<"tup">>},
{arr, [
{str, <<"tup">>},
{arr, [{str, <<"tup">>}, {i, 2020}, {i, 5}, {i, 25}]},
{arr, [{str, <<"tup">>}, {i, 19}, {i, 19}, {i, 10}]}
]},
{i, 293305}
]},
LegacyChange
]},
{DecodedLegacy, _} = unmarshal({event, undefined}, LegacyEvent, #{
ctx => #{
<<"com.rbkmoney.wapi">> => #{
<<"metadata">> => #{
<<"some key">> => <<"some val">>
}
}
}
}),
ModernizedBinary = marshal({event, ?CURRENT_EVENT_FORMAT_VERSION}, DecodedLegacy),
Decoded = unmarshal({event, ?CURRENT_EVENT_FORMAT_VERSION}, ModernizedBinary),
?assertEqual(Event, Decoded).
-spec v1_created_migration_test() -> _.
v1_created_migration_test() ->
Withdrawal = #{
body => {100, <<"RUB">>},
id => <<"ID">>,
metadata => #{<<"some key">> => <<"some val">>},
params => #{
destination_account => [],
destination_id => <<"destinationID">>,
wallet_account => [],
wallet_cash_flow_plan => [],
wallet_id => <<"walletID">>
},
transfer_type => withdrawal,
version => 3
},
Change = {created, Withdrawal},
Event = {ev, {{{2020, 5, 25}, {19, 19, 10}}, 293305}, Change},
LegacyChange = {arr, [
{str, <<"tup">>},
{str, <<"created">>},
{arr, [
{str, <<"map">>},
{obj, #{
{str, <<"body">>} => {arr, [
{str, <<"tup">>},
{i, 100},
{bin, <<"RUB">>}
]},
{str, <<"destination">>} => {arr, [
{str, <<"map">>},
{obj, #{
{str, <<"accounter_account_id">>} => {i, 123},
{str, <<"currency">>} => {bin, <<"RUB">>},
{str, <<"id">>} => {bin, <<"destinationID">>},
{str, <<"identity">>} => {bin, <<"8FkoOxPRjbUXshllJieYV6qIjr3">>}
}}
]},
{str, <<"handler">>} => {str, <<"ff_withdrawal">>},
{str, <<"id">>} => {bin, <<"ID">>},
{str, <<"params">>} => {arr, [
{str, <<"map">>},
{obj, #{
{str, <<"destination">>} => {bin, <<"destinationID">>},
{str, <<"source">>} => {bin, <<"walletID">>}
}}
]},
{str, <<"source">>} => {arr, [
{str, <<"map">>},
{obj, #{
{str, <<"accounter_account_id">>} => {i, 123},
{str, <<"currency">>} => {bin, <<"RUB">>},
{str, <<"id">>} => {bin, <<"walletID">>},
{str, <<"identity">>} => {bin, <<"Fy3g1eq99fZJBeQDHNPmCNCRu4X">>}
}}
]},
{str, <<"version">>} => {i, 1}
}}
]}
]},
LegacyEvent = {arr, [
{str, <<"tup">>},
{str, <<"ev">>},
{arr, [
{str, <<"tup">>},
{arr, [
{str, <<"tup">>},
{arr, [{str, <<"tup">>}, {i, 2020}, {i, 5}, {i, 25}]},
{arr, [{str, <<"tup">>}, {i, 19}, {i, 19}, {i, 10}]}
]},
{i, 293305}
]},
LegacyChange
]},
{DecodedLegacy, _} = unmarshal({event, undefined}, LegacyEvent, #{
ctx => #{
<<"com.rbkmoney.wapi">> => #{
<<"metadata">> => #{
<<"some key">> => <<"some val">>
}
}
}
}),
ModernizedBinary = marshal({event, ?CURRENT_EVENT_FORMAT_VERSION}, DecodedLegacy),
Decoded = unmarshal({event, ?CURRENT_EVENT_FORMAT_VERSION}, ModernizedBinary),
?assertEqual(Event, Decoded).
-spec v2_created_migration_test() -> _.
v2_created_migration_test() ->
Withdrawal = #{
body => {100, <<"RUB">>},
id => <<"ID">>,
metadata => #{<<"some key">> => <<"some val">>},
params => #{
destination_account => #{
accounter_account_id => 123,
currency => <<"RUB">>,
id => <<"destinationID">>,
identity => <<"identity2">>
},
destination_id => <<"destinationID">>,
wallet_account => #{
accounter_account_id => 123,
currency => <<"RUB">>,
id => <<"walletID">>,
identity => <<"identity">>
},
wallet_cash_flow_plan => #{
postings => [#{
receiver => {wallet, receiver_destination},
sender => {wallet, sender_settlement},
volume => {share, {{1, 1}, operation_amount, default}}
}]
},
wallet_id => <<"walletID">>
},
transfer_type => withdrawal,
version => 3
},
Change = {created, Withdrawal},
Event = {ev, {{{2020, 5, 25}, {19, 19, 10}}, 293305}, Change},
LegacyChange = {arr, [
{str, <<"tup">>},
{str, <<"created">>},
{arr, [
{str, <<"map">>},
{obj, #{
{str, <<"body">>} => {arr, [{str, <<"tup">>}, {i, 100}, {bin, <<"RUB">>}]},
{str, <<"id">>} => {bin, <<"ID">>},
{str, <<"params">>} => {arr, [
{str, <<"map">>},
{obj, #{
{str, <<"destination_account">>} => {arr, [
{str, <<"map">>},
{obj, #{
{str, <<"accounter_account_id">>} => {i, 123},
{str, <<"currency">>} => {bin, <<"RUB">>},
{str, <<"id">>} => {bin, <<"destinationID">>},
{str, <<"identity">>} => {bin, <<"identity2">>}
}}
]},
{str, <<"destination_id">>} => {bin, <<"destinationID">>},
{str, <<"wallet_account">>} => {arr, [
{str, <<"map">>},
{obj, #{
{str, <<"accounter_account_id">>} => {i, 123},
{str, <<"currency">>} => {bin, <<"RUB">>},
{str, <<"id">>} => {bin, <<"walletID">>},
{str, <<"identity">>} => {bin, <<"identity">>}
}}
]},
{str, <<"wallet_cash_flow_plan">>} => {arr, [
{str, <<"map">>},
{obj, #{
{str, <<"postings">>} => {arr, [
{str, <<"lst">>},
{arr, [
{str, <<"map">>},
{obj, #{
{str, <<"receiver">>} => {arr, [
{str, <<"tup">>},
{str, <<"wallet">>},
{str, <<"receiver_destination">>}
]},
{str, <<"sender">>} => {arr, [
{str, <<"tup">>},
{str, <<"wallet">>},
{str, <<"sender_settlement">>}
]},
{str, <<"volume">>} => {arr, [
{str, <<"tup">>},
{str, <<"share">>},
{arr, [
{str, <<"tup">>},
{arr, [{str, <<"tup">>}, {i, 1}, {i, 1}]},
{str, <<"operation_amount">>},
{str, <<"default">>}
]}
]}
}}
]}
]}
}}
]},
{str, <<"wallet_id">>} => {bin, <<"walletID">>}}
}]
},
{str, <<"transfer_type">>} => {str, <<"withdrawal">>},
{str, <<"version">>} => {i, 2}
}}
]}
]},
LegacyEvent = {arr, [
{str, <<"tup">>},
{str, <<"ev">>},
{arr, [
{str, <<"tup">>},
{arr, [
{str, <<"tup">>},
{arr, [{str, <<"tup">>}, {i, 2020}, {i, 5}, {i, 25}]},
{arr, [{str, <<"tup">>}, {i, 19}, {i, 19}, {i, 10}]}
]},
{i, 293305}
]},
LegacyChange
]},
{DecodedLegacy, _} = unmarshal({event, undefined}, LegacyEvent, #{
ctx => #{
<<"com.rbkmoney.wapi">> => #{
<<"metadata">> => #{
<<"some key">> => <<"some val">>
}
}
}
}),
ModernizedBinary = marshal({event, ?CURRENT_EVENT_FORMAT_VERSION}, DecodedLegacy),
Decoded = unmarshal({event, ?CURRENT_EVENT_FORMAT_VERSION}, ModernizedBinary),
?assertEqual(Event, Decoded).
-spec v3_created_migration_test() -> _.
v3_created_migration_test() ->
Withdrawal = #{
body => {100, <<"RUB">>},
id => <<"ID">>,
params => #{
destination_account => #{
accounter_account_id => 123,
currency => <<"RUB">>,
id => <<"destinationID">>,
identity => <<"identity2">>
},
destination_id => <<"destinationID">>,
wallet_account => #{
accounter_account_id => 123,
currency => <<"RUB">>,
id => <<"walletID">>,
identity => <<"identity">>
},
wallet_cash_flow_plan => #{
postings => [#{
receiver => {wallet, receiver_destination},
sender => {wallet, sender_settlement},
volume => {share, {{1, 1}, operation_amount, default}}
}]
},
wallet_id => <<"walletID">>
},
transfer_type => withdrawal,
version => 3
},
Change = {created, Withdrawal},
Event = {ev, {{{2020, 5, 25}, {19, 19, 10}}, 293305}, Change},
LegacyChange = {arr, [
{str, <<"tup">>},
{str, <<"created">>},
{arr, [
{str, <<"map">>},
{obj, #{
{str, <<"body">>} => {arr, [{str, <<"tup">>}, {i, 100}, {bin, <<"RUB">>}]},
{str, <<"id">>} => {bin, <<"ID">>},
{str, <<"params">>} => {arr, [
{str, <<"map">>},
{obj, #{
{str, <<"destination_account">>} => {arr, [
{str, <<"map">>},
{obj, #{
{str, <<"accounter_account_id">>} => {i, 123},
{str, <<"currency">>} => {bin, <<"RUB">>},
{str, <<"id">>} => {bin, <<"destinationID">>},
{str, <<"identity">>} => {bin, <<"identity2">>}
}}
]},
{str, <<"destination_id">>} => {bin, <<"destinationID">>},
{str, <<"wallet_account">>} => {arr, [
{str, <<"map">>},
{obj, #{
{str, <<"accounter_account_id">>} => {i, 123},
{str, <<"currency">>} => {bin, <<"RUB">>},
{str, <<"id">>} => {bin, <<"walletID">>},
{str, <<"identity">>} => {bin, <<"identity">>}
}}
]},
{str, <<"wallet_cash_flow_plan">>} => {arr, [
{str, <<"map">>},
{obj, #{
{str, <<"postings">>} => {arr, [
{str, <<"lst">>},
{arr, [
{str, <<"map">>},
{obj, #{
{str, <<"receiver">>} => {arr, [
{str, <<"tup">>},
{str, <<"wallet">>},
{str, <<"receiver_destination">>}
]},
{str, <<"sender">>} => {arr, [
{str, <<"tup">>},
{str, <<"wallet">>},
{str, <<"sender_settlement">>}
]},
{str, <<"volume">>} => {arr, [
{str, <<"tup">>},
{str, <<"share">>},
{arr, [
{str, <<"tup">>},
{arr, [{str, <<"tup">>}, {i, 1}, {i, 1}]},
{str, <<"operation_amount">>},
{str, <<"default">>}
]}
]}
}}
]}
]}
}}
]},
{str, <<"wallet_id">>} => {bin, <<"walletID">>}
}}
]},
{str, <<"transfer_type">>} => {str, <<"withdrawal">>},
{str, <<"version">>} => {i, 3}
}}
]}
]},
LegacyEvent = {arr, [
{str, <<"tup">>},
{str, <<"ev">>},
{arr, [
{str, <<"tup">>},
{arr, [
{str, <<"tup">>},
{arr, [{str, <<"tup">>}, {i, 2020}, {i, 5}, {i, 25}]},
{arr, [{str, <<"tup">>}, {i, 19}, {i, 19}, {i, 10}]}
]},
{i, 293305}
]},
LegacyChange
]},
{DecodedLegacy, _} = unmarshal({event, undefined}, LegacyEvent, #{
ctx => #{
<<"com.rbkmoney.wapi">> => #{
<<"metadata">> => #{
<<"some key">> => <<"some val">>
}
}
}
}),
ModernizedBinary = marshal({event, ?CURRENT_EVENT_FORMAT_VERSION}, DecodedLegacy),
Decoded = unmarshal({event, ?CURRENT_EVENT_FORMAT_VERSION}, ModernizedBinary),
?assertEqual(Event, Decoded).
-spec v0_route_changed_migration_test() -> _.
v0_route_changed_migration_test() ->
LegacyEvent = {route_changed, #{provider_id => 5}},
ModernEvent = {route_changed, #{
version => 1,
provider_id => 305,
provider_id_legacy => <<"5">>
}},
?assertEqual(ModernEvent, maybe_migrate(LegacyEvent, #{})).
-spec created_v3_test() -> _.
created_v3_test() ->
Withdrawal = #{
body => {100, <<"RUB">>},
id => <<"ID">>,
params => #{
destination_id => <<"destinationID">>,
wallet_id => <<"walletID">>
},
transfer_type => withdrawal,
version => 3
},
Change = {created, Withdrawal},
Event = {ev, {{{2020, 5, 25}, {19, 19, 10}}, 293305}, Change},
LegacyEvent = {bin, base64:decode(<<"CwABAAAAGzIwMjAtMDUtMjVUMTk6MTk6MTAuMjkzMzA1WgwAAg",
"wAAQwAAQsABQAAAAJJRAsAAQAAAAh3YWxsZXRJRAsAAgAAAA1kZXN0aW5hdGlvbklEDAADCgABAAAAAAAAA",
"GQMAAILAAEAAAADUlVCAAAAAAAA">>)},
DecodedLegacy = unmarshal({event, 1}, LegacyEvent),
ModernizedBinary = marshal({event, ?CURRENT_EVENT_FORMAT_VERSION}, DecodedLegacy),
Decoded = unmarshal({event, ?CURRENT_EVENT_FORMAT_VERSION}, ModernizedBinary),
?assertEqual(Event, Decoded).
-spec status_changed_marshaling_test() -> _.
status_changed_marshaling_test() ->
Change = {status_changed, {failed, #{code => <<"unknown">>, reason => <<"failure reason">>}}},
Event = {ev, {{{2020, 5, 25}, {19, 19, 10}}, 293305}, Change},
LegacyEvent = {bin, base64:decode(<<"CwABAAAAGzIwMjAtMDUtMjVUMTk6MTk6MTAuMjkzMzA1WgwAAgwAAgwAAQw",
"AAwwAAQsAAQAAAAd1bmtub3duCwACAAAADmZhaWx1cmUgcmVhc29uAAAAAAAA">>)},
DecodedLegacy = unmarshal({event, 1}, LegacyEvent),
ModernizedBinary = marshal({event, ?CURRENT_EVENT_FORMAT_VERSION}, DecodedLegacy),
Decoded = unmarshal({event, ?CURRENT_EVENT_FORMAT_VERSION}, ModernizedBinary),
?assertEqual(Event, Decoded).
-endif.

View File

@ -209,7 +209,6 @@
%% Event source
-export([apply_event/2]).
-export([maybe_migrate/2]).
%% Pipeline
@ -365,7 +364,8 @@ gen(Args) ->
id, transfer_type, body, params, external_id,
domain_revision, party_revision, created_at, route, metadata
],
genlib_map:compact(maps:with(TypeKeys, Args)).
Withdrawal = genlib_map:compact(maps:with(TypeKeys, Args)),
Withdrawal#{version => 3}.
-spec create(params()) ->
{ok, [event()]} |
@ -1552,134 +1552,6 @@ apply_event_({route_changed, Route}, T) ->
apply_event_({adjustment, _Ev} = Event, T) ->
apply_adjustment_event(Event, T).
-spec maybe_migrate(event() | legacy_event(), ff_machine:migrate_params()) ->
event().
% Actual events
maybe_migrate(Ev = {created, #{version := ?ACTUAL_FORMAT_VERSION}}, _MigrateParams) ->
Ev;
maybe_migrate(Ev = {status_changed, {failed, #{code := _}}}, _MigrateParams) ->
Ev;
maybe_migrate(Ev = {session_finished, {_SessionID, _Status}}, _MigrateParams) ->
Ev;
maybe_migrate(Ev = {limit_check, {wallet_sender, _Details}}, _MigrateParams) ->
Ev;
maybe_migrate({route_changed, Route}, _MigrateParams) ->
{route_changed, maybe_migrate_route(Route)};
maybe_migrate({p_transfer, PEvent}, _MigrateParams) ->
{p_transfer, ff_postings_transfer:maybe_migrate(PEvent, withdrawal)};
maybe_migrate({adjustment, _Payload} = Event, _MigrateParams) ->
ff_adjustment_utils:maybe_migrate(Event);
maybe_migrate({resource_got, Resource}, _MigrateParams) ->
{resource_got, ff_instrument:maybe_migrate_resource(Resource)};
% Old events
maybe_migrate({limit_check, {wallet, Details}}, MigrateParams) ->
maybe_migrate({limit_check, {wallet_sender, Details}}, MigrateParams);
maybe_migrate({created, #{version := 1, handler := ff_withdrawal} = T}, MigrateParams) ->
#{
version := 1,
id := ID,
handler := ff_withdrawal,
body := Body,
params := #{
destination := DestinationID,
source := SourceID
}
} = T,
Route = maps:get(route, T, undefined),
maybe_migrate({created, genlib_map:compact(#{
version => 2,
id => ID,
transfer_type => withdrawal,
body => Body,
route => maybe_migrate_route(Route),
params => #{
wallet_id => SourceID,
destination_id => DestinationID,
% Fields below are required to correctly decode legacy events.
% When decoding legacy events, the `erlang:binary_to_existing_atom/2` function is used,
% so the code must contain atoms from the event.
% They are not used now, so their value does not matter.
wallet_account => [],
destination_account => [],
wallet_cash_flow_plan => []
}
})}, MigrateParams);
maybe_migrate({created, Withdrawal = #{version := 2, id := ID}}, MigrateParams) ->
Ctx = maps:get(ctx, MigrateParams, undefined),
Context = case Ctx of
undefined ->
{ok, State} = ff_machine:get(ff_withdrawal, 'ff/withdrawal_v2', ID, {undefined, 0, forward}),
maps:get(ctx, State, undefined);
Data ->
Data
end,
maybe_migrate({created, genlib_map:compact(Withdrawal#{
version => 3,
metadata => ff_entity_context:try_get_legacy_metadata(Context)
})}, MigrateParams);
maybe_migrate({created, T}, MigrateParams) ->
DestinationID = maps:get(destination, T),
SourceID = maps:get(source, T),
ProviderID = maps:get(provider, T),
maybe_migrate({created, T#{
version => 1,
handler => ff_withdrawal,
route => #{provider_id => ProviderID},
params => #{
destination => DestinationID,
source => SourceID
}
}}, MigrateParams);
maybe_migrate({transfer, PTransferEv}, MigrateParams) ->
maybe_migrate({p_transfer, PTransferEv}, MigrateParams);
maybe_migrate({status_changed, {failed, LegacyFailure}}, MigrateParams) ->
Failure = #{
code => <<"unknown">>,
reason => genlib:format(LegacyFailure)
},
maybe_migrate({status_changed, {failed, Failure}}, MigrateParams);
maybe_migrate({session_finished, SessionID}, MigrateParams) ->
{ok, SessionMachine} = ff_withdrawal_session_machine:get(SessionID),
Session = ff_withdrawal_session_machine:session(SessionMachine),
{finished, Result} = ff_withdrawal_session:status(Session),
maybe_migrate({session_finished, {SessionID, Result}}, MigrateParams);
% Other events
maybe_migrate(Ev, _MigrateParams) ->
Ev.
maybe_migrate_route(undefined = Route) ->
Route;
maybe_migrate_route(#{version := 1} = Route) ->
Route;
maybe_migrate_route(Route) when not is_map_key(version, Route) ->
LegacyIDs = #{
<<"mocketbank">> => 1,
<<"royalpay-payout">> => 2,
<<"accentpay">> => 3
},
case maps:get(provider_id, Route) of
ProviderID when is_integer(ProviderID) ->
Route#{
version => 1,
provider_id => ProviderID + 300,
provider_id_legacy => genlib:to_binary(ProviderID)
};
ProviderID when is_binary(ProviderID) andalso is_map_key(ProviderID, LegacyIDs) ->
ModernID = maps:get(ProviderID, LegacyIDs),
Route#{
version => 1,
provider_id => ModernID + 300,
provider_id_legacy => ProviderID
};
ProviderID when is_binary(ProviderID) ->
Route#{
version => 1,
provider_id => erlang:binary_to_integer(ProviderID) + 300,
provider_id_legacy => ProviderID
}
end.
get_attempt_limit(Withdrawal) ->
#{
body := Body,
@ -1720,198 +1592,3 @@ get_attempt_limit_(undefined) ->
1;
get_attempt_limit_({value, Limit}) ->
ff_dmsl_codec:unmarshal(attempt_limit, Limit).
%% Tests
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-spec test() -> _.
-spec v0_created_migration_test() -> _.
v0_created_migration_test() ->
ID = genlib:unique(),
WalletID = genlib:unique(),
DestinationID = genlib:unique(),
ProviderID = <<"mocketbank">>,
Body = {100, <<"RUB">>},
LegacyEvent = {created, #{
id => ID,
source => WalletID,
destination => DestinationID,
body => Body,
provider => ProviderID
}},
{created, Withdrawal} = maybe_migrate(LegacyEvent, #{
ctx => #{
<<"com.rbkmoney.wapi">> => #{
<<"metadata">> => #{
<<"some key">> => <<"some val">>
}
}
}
}),
?assertEqual(ID, id(Withdrawal)),
?assertEqual(WalletID, wallet_id(Withdrawal)),
?assertEqual(DestinationID, destination_id(Withdrawal)),
?assertEqual(Body, body(Withdrawal)),
?assertEqual(#{version => 1, provider_id => 301, provider_id_legacy => <<"mocketbank">>}, route(Withdrawal)).
-spec v1_created_migration_test() -> _.
v1_created_migration_test() ->
ID = genlib:unique(),
WalletID = genlib:unique(),
WalletAccount = #{
id => WalletID,
identity => genlib:unique(),
currency => <<"RUB">>,
accounter_account_id => 123
},
DestinationID = genlib:unique(),
DestinationAccount = #{
id => DestinationID,
identity => genlib:unique(),
currency => <<"RUB">>,
accounter_account_id => 123
},
Body = {100, <<"RUB">>},
LegacyEvent = {created, #{
version => 1,
id => ID,
handler => ff_withdrawal,
source => WalletAccount,
destination => DestinationAccount,
body => Body,
params => #{
source => WalletID,
destination => DestinationID
}
}},
{created, Withdrawal} = maybe_migrate(LegacyEvent, #{
ctx => #{
<<"com.rbkmoney.wapi">> => #{
<<"metadata">> => #{
<<"some key">> => <<"some val">>
}
}
}
}),
?assertEqual(ID, id(Withdrawal)),
?assertEqual(WalletID, wallet_id(Withdrawal)),
?assertEqual(DestinationID, destination_id(Withdrawal)),
?assertEqual(Body, body(Withdrawal)).
-spec v2_created_migration_test() -> _.
v2_created_migration_test() ->
ID = genlib:unique(),
WalletID = genlib:unique(),
WalletAccount = #{
id => WalletID,
identity => genlib:unique(),
currency => <<"RUB">>,
accounter_account_id => 123
},
DestinationID = genlib:unique(),
DestinationAccount = #{
id => DestinationID,
identity => genlib:unique(),
currency => <<"RUB">>,
accounter_account_id => 123
},
Body = {100, <<"RUB">>},
LegacyEvent = {created, #{
version => 2,
id => ID,
transfer_type => withdrawal,
body => Body,
params => #{
wallet_id => WalletID,
destination_id => DestinationID,
wallet_account => WalletAccount,
destination_account => DestinationAccount,
wallet_cash_flow_plan => #{
postings => [
#{
sender => {wallet, sender_settlement},
receiver => {wallet, receiver_destination},
volume => {share, {{1, 1}, operation_amount, default}}
}
]
}
}
}},
{created, Withdrawal} = maybe_migrate(LegacyEvent, #{
ctx => #{
<<"com.rbkmoney.wapi">> => #{
<<"metadata">> => #{
<<"some key">> => <<"some val">>
}
}
}
}),
?assertEqual(ID, id(Withdrawal)),
?assertEqual(WalletID, wallet_id(Withdrawal)),
?assertEqual(DestinationID, destination_id(Withdrawal)),
?assertEqual(Body, body(Withdrawal)).
-spec v3_created_migration_test() -> _.
v3_created_migration_test() ->
ID = genlib:unique(),
WalletID = genlib:unique(),
WalletAccount = #{
id => WalletID,
identity => genlib:unique(),
currency => <<"RUB">>,
accounter_account_id => 123
},
DestinationID = genlib:unique(),
DestinationAccount = #{
id => DestinationID,
identity => genlib:unique(),
currency => <<"RUB">>,
accounter_account_id => 123
},
Body = {100, <<"RUB">>},
LegacyEvent = {created, #{
version => 2,
id => ID,
transfer_type => withdrawal,
body => Body,
params => #{
wallet_id => WalletID,
destination_id => DestinationID,
wallet_account => WalletAccount,
destination_account => DestinationAccount,
wallet_cash_flow_plan => #{
postings => [
#{
sender => {wallet, sender_settlement},
receiver => {wallet, receiver_destination},
volume => {share, {{1, 1}, operation_amount, default}}
}
]
}
}
}},
{created, Withdrawal} = maybe_migrate(LegacyEvent, #{
ctx => #{
<<"com.rbkmoney.wapi">> => #{
<<"metadata">> => #{
<<"some key">> => <<"some val">>
}
}
}
}),
?assertEqual(ID, id(Withdrawal)),
?assertEqual(#{<<"some key">> => <<"some val">>}, metadata(Withdrawal)).
-spec v0_route_changed_migration_test() -> _.
v0_route_changed_migration_test() ->
LegacyEvent = {route_changed, #{provider_id => 5}},
ModernEvent = {route_changed, #{
version => 1,
provider_id => 305,
provider_id_legacy => <<"5">>
}},
?assertEqual(ModernEvent, maybe_migrate(LegacyEvent, #{})).
-endif.