[WIP] Add cancel op

This commit is contained in:
Andrey Mayorov 2018-06-14 22:14:03 +03:00
parent afa14335c2
commit f7a35e7612
2 changed files with 84 additions and 18 deletions

View File

@ -20,21 +20,28 @@
-export([prepare/2]). -export([prepare/2]).
-export([commit/2]). -export([commit/2]).
-export([cancel/2]).
%% %%
-spec prepare(id(), [posting()]) -> -spec prepare(id(), [posting()]) ->
affected(). {ok, affected()}.
prepare(ID, Postings) -> prepare(ID, Postings) ->
hold(encode_plan_change(ID, Postings)). hold(encode_plan_change(ID, Postings)).
-spec commit(id(), [posting()]) -> -spec commit(id(), [posting()]) ->
affected(). {ok, affected()}.
commit(ID, Postings) -> commit(ID, Postings) ->
commit_plan(encode_plan(ID, Postings)). commit_plan(encode_plan(ID, Postings)).
-spec cancel(id(), [posting()]) ->
{ok, affected()}.
cancel(ID, Postings) ->
rollback_plan(encode_plan(ID, Postings)).
%% Woody stuff %% Woody stuff
hold(PlanChange) -> hold(PlanChange) ->
@ -53,6 +60,14 @@ commit_plan(Plan) ->
error(Unexpected) error(Unexpected)
end. end.
rollback_plan(Plan) ->
case call('RollbackPlan', [Plan]) of
{ok, #accounter_PostingPlanLog{affected_accounts = Affected}} ->
{ok, decode_affected(Affected)};
{error, Unexpected} ->
error(Unexpected)
end.
call(Function, Args) -> call(Function, Args) ->
Service = {dmsl_accounter_thrift, 'Accounter'}, Service = {dmsl_accounter_thrift, 'Accounter'},
ff_woody_client:call(accounter, {Service, Function, Args}). ff_woody_client:call(accounter, {Service, Function, Args}).

View File

@ -31,6 +31,12 @@
status := status() status := status()
}. }.
-type ev() ::
{status_changed, status()}.
-type outcome() ::
{[ev()], transfer()}.
-export_type([transfer/0]). -export_type([transfer/0]).
-export_type([posting/0]). -export_type([posting/0]).
-export_type([status/0]). -export_type([status/0]).
@ -42,6 +48,11 @@
-export([create/2]). -export([create/2]).
-export([prepare/1]). -export([prepare/1]).
-export([commit/1]). -export([commit/1]).
-export([cancel/1]).
%% Event source
-export([apply_event/2]).
%% Pipeline %% Pipeline
@ -116,29 +127,31 @@ validate_identities([W0 | Wallets]) ->
%% %%
-spec prepare(transfer()) -> -spec prepare(transfer()) ->
{ok, transfer()} | {ok, outcome()} |
{error, {error,
status() |
balance | balance |
{status, committed | cancelled} |
{wallet, {inaccessible, blocked | suspended}} {wallet, {inaccessible, blocked | suspended}}
}. }.
prepare(Transfer = #{status := created}) -> prepare(Transfer = #{status := created}) ->
do(fun () -> TrxID = trxid(Transfer),
Postings = postings(Transfer), Postings = construct_trx_postings(postings(Transfer)),
roll(Transfer, do(fun () ->
accessible = unwrap(wallet, validate_accessible(gather_wallets(Postings))), accessible = unwrap(wallet, validate_accessible(gather_wallets(Postings))),
Affected = ff_transaction:prepare(trxid(Transfer), construct_trx_postings(Postings)), Affected = unwrap(ff_transaction:prepare(TrxID, Postings)),
case validate_balances(Affected) of case validate_balances(Affected) of
{ok, valid} -> {ok, valid} ->
Transfer#{status := prepared}; [{status_changed, prepared}];
{error, invalid} -> {error, invalid} ->
_ = ff_transaction:cancel(TrxID, Postings),
throw(balance) throw(balance)
end end
end); end));
prepare(Transfer = #{status := prepared}) -> prepare(Transfer = #{status := prepared}) ->
{ok, Transfer}; {ok, Transfer};
prepare(#{status := Status}) -> prepare(#{status := Status}) ->
{error, Status}. {error, {status, Status}}.
validate_balances(Affected) -> validate_balances(Affected) ->
% TODO % TODO
@ -147,22 +160,60 @@ validate_balances(Affected) ->
%% %%
-spec commit(transfer()) -> -spec commit(transfer()) ->
{ok, transfer()} | {ok, outcome()} |
{error, status()}. {error, {status, created | cancelled}}.
commit(Transfer = #{status := prepared}) -> commit(Transfer = #{status := prepared}) ->
do(fun () -> roll(Transfer, do(fun () ->
Postings = postings(Transfer), Postings = construct_trx_postings(postings(Transfer)),
_Affected = ff_transaction:commit(trxid(Transfer), construct_trx_postings(Postings)), _Affected = unwrap(ff_transaction:commit(trxid(Transfer), Postings)),
Transfer#{status := committed} [{status_changed, committed}]
end); end));
commit(Transfer = #{status := committed}) -> commit(Transfer = #{status := committed}) ->
{ok, Transfer}; {ok, roll(Transfer)};
commit(#{status := Status}) -> commit(#{status := Status}) ->
{error, Status}. {error, Status}.
%% %%
-spec cancel(transfer()) ->
{ok, outcome()} |
{error, {status, created | committed}}.
cancel(Transfer = #{status := prepared}) ->
roll(Transfer, do(fun () ->
Postings = construct_trx_postings(postings(Transfer)),
_Affected = unwrap(ff_transaction:cancel(trxid(Transfer), Postings)),
[{status_changed, cancelled}]
end));
cancel(Transfer = #{status := cancelled}) ->
{ok, roll(Transfer)};
cancel(#{status := Status}) ->
{error, {status, Status}}.
%%
apply_event({status_changed, S}, Transfer) ->
Transfer#{status := S}.
%% TODO
-type result(V, R) :: {ok, V} | {error, R}.
-spec roll(St, result(Evs, Reason)) ->
result({Evs, St}, Reason) when
Evs :: [_].
roll(St, {ok, Events}) when is_list(Events) ->
{ok, {Events, lists:foldl(fun apply_event/2, St, Events)}};
roll(_St, {error, _} = Error) ->
Error.
roll(St) ->
{[], St}.
%%
construct_trx_postings(Postings) -> construct_trx_postings(Postings) ->
[ [
{ff_wallet:account(Source), ff_wallet:account(Destination), Body} || {ff_wallet:account(Source), ff_wallet:account(Destination), Body} ||