mirror of
https://github.com/valitydev/hellgate.git
synced 2024-11-06 10:55:22 +00:00
HG-408: switch to binary event storage (#242)
* HG-408: switch to binary event storage * use metadata obviously * encode/decode perfection * dumb packing logic * dumb unpacking * fix dialyzaaaar * little typing improvement * introduce primitive AuxState * use aux state for state composition * rework aux state usage a little bit * use AuxState for checking out by revision * fix awkward bug * move header to proper location * revert previous ugly commit Revert "fix awkward bug" * use 0 limit for get_state and checkout_by_revision * rework get_state for handle_call * build party revision index on call * remove adhoc with domain revision * remove get_st_timestamp()
This commit is contained in:
parent
277d6e4946
commit
4485efae95
@ -19,8 +19,8 @@
|
||||
-behaviour(hg_machine).
|
||||
-export([namespace /0]).
|
||||
-export([init /2]).
|
||||
-export([process_signal/3]).
|
||||
-export([process_call /3]).
|
||||
-export([process_signal/2]).
|
||||
-export([process_call /2]).
|
||||
|
||||
%% Event provider callbacks
|
||||
|
||||
@ -189,9 +189,9 @@ publish_event(CustomerID, Changes) when is_list(Changes) ->
|
||||
namespace() ->
|
||||
?NS.
|
||||
|
||||
-spec init(customer_id(), customer_params()) ->
|
||||
-spec init(customer_params(), hg_machine:machine()) ->
|
||||
hg_machine:result().
|
||||
init(CustomerID, CustomerParams) ->
|
||||
init(CustomerParams, #{id := CustomerID}) ->
|
||||
handle_result(#{
|
||||
changes => [
|
||||
get_customer_created_event(CustomerID, CustomerParams)
|
||||
@ -199,9 +199,9 @@ init(CustomerID, CustomerParams) ->
|
||||
auxst => #{}
|
||||
}).
|
||||
|
||||
-spec process_signal(hg_machine:signal(), hg_machine:history(), hg_machine:auxst()) ->
|
||||
-spec process_signal(hg_machine:signal(), hg_machine:machine()) ->
|
||||
hg_machine:result().
|
||||
process_signal(Signal, History, AuxSt) ->
|
||||
process_signal(Signal, #{history := History, aux_state := AuxSt}) ->
|
||||
handle_result(handle_signal(Signal, collapse_history(unmarshal(History)), unmarshal(auxst, AuxSt))).
|
||||
|
||||
handle_signal(timeout, St0, AuxSt0) ->
|
||||
@ -256,9 +256,9 @@ is_binding_succeeded(_) ->
|
||||
{start_binding, binding_params()} |
|
||||
delete.
|
||||
|
||||
-spec process_call(call(), hg_machine:history(), hg_machine:auxst()) ->
|
||||
-spec process_call(call(), hg_machine:machine()) ->
|
||||
{hg_machine:response(), hg_machine:result()}.
|
||||
process_call(Call, History, _AuxSt) ->
|
||||
process_call(Call, #{history := History}) ->
|
||||
St = collapse_history(unmarshal(History)),
|
||||
try handle_result(handle_call(Call, St)) catch
|
||||
throw:Exception ->
|
||||
|
@ -38,8 +38,8 @@
|
||||
-export([namespace/0]).
|
||||
|
||||
-export([init/2]).
|
||||
-export([process_signal/3]).
|
||||
-export([process_call/3]).
|
||||
-export([process_signal/2]).
|
||||
-export([process_call/2]).
|
||||
|
||||
%% Event provider callbacks
|
||||
|
||||
@ -402,10 +402,10 @@ publish_event(InvoiceID, Changes) ->
|
||||
namespace() ->
|
||||
?NS.
|
||||
|
||||
-spec init(invoice_id(), [invoice_tpl_id() | invoice_params()]) ->
|
||||
-spec init([invoice_tpl_id() | invoice_params()], hg_machine:machine()) ->
|
||||
hg_machine:result().
|
||||
|
||||
init(ID, [InvoiceTplID, PartyRevision, InvoiceParams]) ->
|
||||
init([InvoiceTplID, PartyRevision, InvoiceParams], #{id := ID}) ->
|
||||
Invoice = create_invoice(ID, InvoiceTplID, PartyRevision, InvoiceParams),
|
||||
% TODO ugly, better to roll state and events simultaneously, hg_party-like
|
||||
handle_result(#{
|
||||
@ -416,10 +416,10 @@ init(ID, [InvoiceTplID, PartyRevision, InvoiceParams]) ->
|
||||
|
||||
%%
|
||||
|
||||
-spec process_signal(hg_machine:signal(), hg_machine:history(), hg_machine:auxst()) ->
|
||||
-spec process_signal(hg_machine:signal(), hg_machine:machine()) ->
|
||||
hg_machine:result().
|
||||
|
||||
process_signal(Signal, History, _AuxSt) ->
|
||||
process_signal(Signal, #{history := History}) ->
|
||||
handle_result(handle_signal(Signal, collapse_history(unmarshal(History)))).
|
||||
|
||||
handle_signal(timeout, St = #st{activity = {payment, PaymentID}}) ->
|
||||
@ -475,10 +475,10 @@ handle_expiration(St) ->
|
||||
{rescind, binary()} |
|
||||
{callback, callback()}.
|
||||
|
||||
-spec process_call(call(), hg_machine:history(), hg_machine:auxst()) ->
|
||||
-spec process_call(call(), hg_machine:machine()) ->
|
||||
{hg_machine:response(), hg_machine:result()}.
|
||||
|
||||
process_call(Call, History, _AuxSt) ->
|
||||
process_call(Call, #{history := History}) ->
|
||||
St = collapse_history(unmarshal(History)),
|
||||
try handle_result(handle_call(Call, St)) catch
|
||||
throw:Exception ->
|
||||
|
@ -17,8 +17,8 @@
|
||||
-export([namespace/0]).
|
||||
|
||||
-export([init/2]).
|
||||
-export([process_signal/3]).
|
||||
-export([process_call/3]).
|
||||
-export([process_signal/2]).
|
||||
-export([process_call/2]).
|
||||
|
||||
%% Event provider callbacks
|
||||
|
||||
@ -216,10 +216,10 @@ assert_invoice_template_not_deleted(_) ->
|
||||
namespace() ->
|
||||
?NS.
|
||||
|
||||
-spec init(tpl_id(), create_params()) ->
|
||||
-spec init(create_params(), hg_machine:machine()) ->
|
||||
hg_machine:result().
|
||||
|
||||
init(ID, Params) ->
|
||||
init(Params, #{id := ID}) ->
|
||||
Tpl = create_invoice_template(ID, Params),
|
||||
#{events => [marshal([?tpl_created(Tpl)])]}.
|
||||
|
||||
@ -235,19 +235,19 @@ create_invoice_template(ID, P) ->
|
||||
context = P#payproc_InvoiceTemplateCreateParams.context
|
||||
}.
|
||||
|
||||
-spec process_signal(hg_machine:signal(), hg_machine:history(), hg_machine:auxst()) ->
|
||||
-spec process_signal(hg_machine:signal(), hg_machine:machine()) ->
|
||||
hg_machine:result().
|
||||
|
||||
process_signal(timeout, _History, _AuxSt) ->
|
||||
process_signal(timeout, _Machine) ->
|
||||
#{};
|
||||
|
||||
process_signal({repair, _}, _History, _AuxSt) ->
|
||||
process_signal({repair, _}, _Machine) ->
|
||||
#{}.
|
||||
|
||||
-spec process_call(call(), hg_machine:history(), hg_machine:auxst()) ->
|
||||
-spec process_call(call(), hg_machine:machine()) ->
|
||||
{hg_machine:response(), hg_machine:result()}.
|
||||
|
||||
process_call(Call, History, _AuxSt) ->
|
||||
process_call(Call, #{history := History}) ->
|
||||
Tpl = collapse_history(unmarshal(History)),
|
||||
{Response, Changes} = handle_call(Call, Tpl),
|
||||
{{ok, Response}, #{events => [marshal(Changes)]}}.
|
||||
|
@ -1,6 +1,6 @@
|
||||
-module(hg_machine).
|
||||
|
||||
-type msgp() :: hg_msgpack_marshalling:value().
|
||||
-type msgp() :: hg_msgpack_marshalling:msgpack_value().
|
||||
|
||||
-type id() :: mg_proto_base_thrift:'ID'().
|
||||
-type tag() :: {tag, mg_proto_base_thrift:'Tag'()}.
|
||||
@ -8,8 +8,6 @@
|
||||
-type ns() :: mg_proto_base_thrift:'Namespace'().
|
||||
-type args() :: _.
|
||||
|
||||
-type machine() :: mg_proto_state_processing_thrift:'Machine'().
|
||||
|
||||
-type event() :: {event_id(), timestamp(), event_payload()}.
|
||||
-type event_id() :: mg_proto_base_thrift:'EventID'().
|
||||
-type event_payload() :: msgp().
|
||||
@ -21,6 +19,12 @@
|
||||
-type direction() :: mg_proto_state_processing_thrift:'Direction'().
|
||||
-type descriptor() :: mg_proto_state_processing_thrift:'MachineDescriptor'().
|
||||
|
||||
-type machine() :: #{
|
||||
id := id(),
|
||||
history := history(),
|
||||
aux_state := auxst()
|
||||
}.
|
||||
|
||||
-type result() :: #{
|
||||
events => [event_payload()],
|
||||
action => hg_machine_action:t(),
|
||||
@ -30,19 +34,19 @@
|
||||
-callback namespace() ->
|
||||
ns().
|
||||
|
||||
-callback init(id(), args()) ->
|
||||
-callback init(args(), machine()) ->
|
||||
result().
|
||||
|
||||
-type signal() ::
|
||||
timeout | {repair, args()}.
|
||||
|
||||
-callback process_signal(signal(), history(), auxst()) ->
|
||||
-callback process_signal(signal(), machine()) ->
|
||||
result().
|
||||
|
||||
-type call() :: _.
|
||||
-type response() :: ok | {ok, term()} | {exception, term()}.
|
||||
|
||||
-callback process_call(call(), history(), auxst()) ->
|
||||
-callback process_call(call(), machine()) ->
|
||||
{response(), result()}.
|
||||
|
||||
-type context() :: #{
|
||||
@ -62,13 +66,16 @@
|
||||
-export_type([result/0]).
|
||||
-export_type([context/0]).
|
||||
-export_type([response/0]).
|
||||
-export_type([machine/0]).
|
||||
|
||||
-export([start/3]).
|
||||
-export([call/3]).
|
||||
-export([call/6]).
|
||||
-export([repair/3]).
|
||||
-export([get_history/2]).
|
||||
-export([get_history/4]).
|
||||
-export([get_history/5]).
|
||||
-export([get_machine/5]).
|
||||
|
||||
%% Dispatch
|
||||
|
||||
@ -98,11 +105,29 @@
|
||||
start(Ns, ID, Args) ->
|
||||
call_automaton('Start', [Ns, ID, wrap_args(Args)]).
|
||||
|
||||
-spec call(ns(), ref(), term()) ->
|
||||
-spec call(ns(), ref(), Args :: term()) ->
|
||||
{ok, term()} | {error, notfound | failed} | no_return().
|
||||
|
||||
call(Ns, Ref, Args) ->
|
||||
Descriptor = prepare_descriptor(Ns, Ref, #'HistoryRange'{}),
|
||||
call(Ns, Ref, Args, undefined, undefined, forward).
|
||||
|
||||
-spec call(
|
||||
ns(),
|
||||
ref(),
|
||||
Args :: term(),
|
||||
After :: event_id() | undefined,
|
||||
Limit :: integer() | undefined,
|
||||
Direction :: forward | backward
|
||||
) ->
|
||||
{ok, term()} | {error, notfound | failed} | no_return().
|
||||
|
||||
call(Ns, Ref, Args, After, Limit, Direction) ->
|
||||
HistoryRange = #'HistoryRange'{
|
||||
'after' = After,
|
||||
'limit' = Limit,
|
||||
'direction' = Direction
|
||||
},
|
||||
Descriptor = prepare_descriptor(Ns, Ref, HistoryRange),
|
||||
case call_automaton('Call', [Descriptor, wrap_args(Args)]) of
|
||||
{ok, Response} ->
|
||||
% should be specific to a processing interface already
|
||||
@ -122,25 +147,34 @@ repair(Ns, Ref, Args) ->
|
||||
{ok, history()} | {error, notfound} | no_return().
|
||||
|
||||
get_history(Ns, Ref) ->
|
||||
get_history(Ns, Ref, #'HistoryRange'{}).
|
||||
get_history(Ns, Ref, undefined, undefined, forward).
|
||||
|
||||
-spec get_history(ns(), ref(), undefined | event_id(), undefined | non_neg_integer()) ->
|
||||
{ok, history()} | {error, notfound} | no_return().
|
||||
|
||||
get_history(Ns, Ref, AfterID, Limit) ->
|
||||
get_history(Ns, Ref, #'HistoryRange'{'after' = AfterID, limit = Limit}).
|
||||
get_history(Ns, Ref, AfterID, Limit, forward).
|
||||
|
||||
-spec get_history(ns(), ref(), undefined | event_id(), undefined | non_neg_integer(), undefined | direction()) ->
|
||||
-spec get_history(ns(), ref(), undefined | event_id(), undefined | non_neg_integer(), direction()) ->
|
||||
{ok, history()} | {error, notfound} | no_return().
|
||||
|
||||
get_history(Ns, Ref, AfterID, Limit, Direction) ->
|
||||
get_history(Ns, Ref, #'HistoryRange'{'after' = AfterID, limit = Limit, direction = Direction}).
|
||||
case get_machine(Ns, Ref, AfterID, Limit, Direction) of
|
||||
{ok, #{history := History}} ->
|
||||
{ok, History};
|
||||
Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
get_history(Ns, Ref, Range) ->
|
||||
-spec get_machine(ns(), ref(), undefined | event_id(), undefined | non_neg_integer(), direction()) ->
|
||||
{ok, machine()} | {error, notfound} | no_return().
|
||||
|
||||
get_machine(Ns, Ref, AfterID, Limit, Direction) ->
|
||||
Range = #'HistoryRange'{'after' = AfterID, limit = Limit, direction = Direction},
|
||||
Descriptor = prepare_descriptor(Ns, Ref, Range),
|
||||
case call_automaton('GetMachine', [Descriptor]) of
|
||||
{ok, #'Machine'{history = History}} when is_list(History) ->
|
||||
{ok, unmarshal_events(History)};
|
||||
{ok, #'Machine'{} = Machine} ->
|
||||
{ok, unmarshal_machine(Machine)};
|
||||
Error ->
|
||||
Error
|
||||
end.
|
||||
@ -183,7 +217,7 @@ handle_function_('ProcessSignal', [Args], #{ns := Ns} = _Opts) ->
|
||||
activity => signal,
|
||||
signal => Type
|
||||
}),
|
||||
dispatch_signal(Ns, Signal, Machine);
|
||||
dispatch_signal(Ns, Signal, unmarshal_machine(Machine));
|
||||
|
||||
handle_function_('ProcessCall', [Args], #{ns := Ns} = _Opts) ->
|
||||
#'CallArgs'{arg = Payload, machine = #'Machine'{id = ID} = Machine} = Args,
|
||||
@ -192,7 +226,7 @@ handle_function_('ProcessCall', [Args], #{ns := Ns} = _Opts) ->
|
||||
id => ID,
|
||||
activity => call
|
||||
}),
|
||||
dispatch_call(Ns, Payload, Machine).
|
||||
dispatch_call(Ns, Payload, unmarshal_machine(Machine)).
|
||||
|
||||
%%
|
||||
|
||||
@ -205,31 +239,27 @@ handle_function_('ProcessCall', [Args], #{ns := Ns} = _Opts) ->
|
||||
Result ::
|
||||
mg_proto_state_processing_thrift:'SignalResult'().
|
||||
|
||||
dispatch_signal(Ns, #'InitSignal'{arg = Payload}, #'Machine'{id = ID}) ->
|
||||
dispatch_signal(Ns, #'InitSignal'{arg = Payload}, Machine) ->
|
||||
Args = unwrap_args(Payload),
|
||||
_ = lager:debug("dispatch init with id = ~s and args = ~p", [ID, Args]),
|
||||
_ = log_dispatch(init, Args, Machine),
|
||||
Module = get_handler_module(Ns),
|
||||
Result = Module:init(ID, Args),
|
||||
marshal_signal_result(Result, undefined);
|
||||
Result = Module:init(Args, Machine),
|
||||
marshal_signal_result(Result, Machine);
|
||||
|
||||
dispatch_signal(Ns, #'TimeoutSignal'{}, #'Machine'{history = History0, aux_state = AuxSt0}) ->
|
||||
History = unmarshal_events(History0),
|
||||
AuxSt = unmarshal_aux_st(AuxSt0),
|
||||
_ = lager:debug("dispatch timeout with history = ~p, aux state = ~p", [History, AuxSt]),
|
||||
dispatch_signal(Ns, #'TimeoutSignal'{}, Machine) ->
|
||||
_ = log_dispatch(timeout, Machine),
|
||||
Module = get_handler_module(Ns),
|
||||
Result = Module:process_signal(timeout, History, AuxSt),
|
||||
marshal_signal_result(Result, AuxSt);
|
||||
Result = Module:process_signal(timeout, Machine),
|
||||
marshal_signal_result(Result, Machine);
|
||||
|
||||
dispatch_signal(Ns, #'RepairSignal'{arg = Payload}, #'Machine'{history = History0, aux_state = AuxSt0}) ->
|
||||
dispatch_signal(Ns, #'RepairSignal'{arg = Payload}, Machine) ->
|
||||
Args = unwrap_args(Payload),
|
||||
History = unmarshal_events(History0),
|
||||
AuxSt = unmarshal_aux_st(AuxSt0),
|
||||
_ = lager:debug("dispatch repair with args = ~p, history = ~p, aux state = ~p", [Args, History, AuxSt]),
|
||||
_ = log_dispatch(repair, Args, Machine),
|
||||
Module = get_handler_module(Ns),
|
||||
Result = Module:process_signal({repair, Args}, History, AuxSt),
|
||||
marshal_signal_result(Result, AuxSt).
|
||||
Result = Module:process_signal({repair, Args}, Machine),
|
||||
marshal_signal_result(Result, Machine).
|
||||
|
||||
marshal_signal_result(Result = #{}, AuxStWas) ->
|
||||
marshal_signal_result(Result = #{}, #{aux_state := AuxStWas}) ->
|
||||
_ = lager:debug("signal result = ~p", [Result]),
|
||||
Change = #'MachineStateChange'{
|
||||
events = marshal_events(maps:get(events, Result, [])),
|
||||
@ -245,16 +275,14 @@ marshal_signal_result(Result = #{}, AuxStWas) ->
|
||||
Call :: mg_proto_state_processing_thrift:'Args'(),
|
||||
Result :: mg_proto_state_processing_thrift:'CallResult'().
|
||||
|
||||
dispatch_call(Ns, Payload, #'Machine'{history = History0, aux_state = AuxSt0}) ->
|
||||
dispatch_call(Ns, Payload, Machine) ->
|
||||
Args = unwrap_args(Payload),
|
||||
History = unmarshal_events(History0),
|
||||
AuxSt = unmarshal_aux_st(AuxSt0),
|
||||
_ = lager:debug("dispatch call with args = ~p, history = ~p, aux state = ~p", [Args, History, AuxSt]),
|
||||
_ = log_dispatch(call, Args, Machine),
|
||||
Module = get_handler_module(Ns),
|
||||
Result = Module:process_call(Args, History, AuxSt),
|
||||
marshal_call_result(Result, AuxSt).
|
||||
Result = Module:process_call(Args, Machine),
|
||||
marshal_call_result(Result, Machine).
|
||||
|
||||
marshal_call_result({Response, Result}, AuxStWas) ->
|
||||
marshal_call_result({Response, Result}, #{aux_state := AuxStWas}) ->
|
||||
_ = lager:debug("call response = ~p with result = ~p", [Response, Result]),
|
||||
Change = #'MachineStateChange'{
|
||||
events = marshal_events(maps:get(events, Result, [])),
|
||||
@ -318,6 +346,25 @@ init(MachineHandlers) ->
|
||||
get_handler_module(Ns) ->
|
||||
ets:lookup_element(?TABLE, Ns, 2).
|
||||
|
||||
log_dispatch(Operation, #{id := ID, history := History, aux_state := AuxSt}) ->
|
||||
lager:debug(
|
||||
"dispatch ~p with id = ~p, history = ~p, aux state = ~p",
|
||||
[Operation, ID, History, AuxSt]
|
||||
).
|
||||
|
||||
log_dispatch(Operation, Args, #{id := ID, history := History, aux_state := AuxSt}) ->
|
||||
lager:debug(
|
||||
"dispatch ~p with id = ~p, args = ~p, history = ~p, aux state = ~p",
|
||||
[Operation, ID, Args, History, AuxSt]
|
||||
).
|
||||
|
||||
unmarshal_machine(#'Machine'{id = ID, history = History, aux_state = AuxSt}) ->
|
||||
#{
|
||||
id => ID,
|
||||
history => unmarshal_events(History),
|
||||
aux_state => unmarshal_aux_st(AuxSt)
|
||||
}.
|
||||
|
||||
marshal_events(Events) when is_list(Events) ->
|
||||
[hg_msgpack_marshalling:marshal(Event) || Event <- Events].
|
||||
|
||||
|
@ -10,8 +10,8 @@
|
||||
|
||||
-export([namespace/0]).
|
||||
-export([init/2]).
|
||||
-export([process_signal/3]).
|
||||
-export([process_call/3]).
|
||||
-export([process_signal/2]).
|
||||
-export([process_call/2]).
|
||||
|
||||
%% Event provider callbacks
|
||||
|
||||
@ -36,14 +36,16 @@
|
||||
|
||||
-define(NS, <<"party">>).
|
||||
-define(STEP, 5).
|
||||
-define(SNAPSHOT_STEP, 10).
|
||||
-define(CT_ERLANG_BINARY, <<"application/x-erlang-binary">>).
|
||||
|
||||
-record(st, {
|
||||
party :: undefined | party(),
|
||||
timestamp :: undefined | timestamp(),
|
||||
revision :: hg_domain:revision(),
|
||||
claims = #{} :: #{claim_id() => claim()},
|
||||
meta = #{} :: meta(),
|
||||
migration_data = #{} :: #{any() => any()}
|
||||
migration_data = #{} :: #{any() => any()},
|
||||
last_event = 0 :: event_id()
|
||||
}).
|
||||
|
||||
-type st() :: #st{}.
|
||||
@ -77,6 +79,21 @@
|
||||
-type meta_data() :: dmsl_domain_thrift:'PartyMetaData'().
|
||||
-type party_revision_param() :: dmsl_payment_processing_thrift:'PartyRevisionParam'().
|
||||
-type party_revision() :: dmsl_domain_thrift:'PartyRevision'().
|
||||
-type event_id() :: non_neg_integer().
|
||||
|
||||
-type content_type() :: binary().
|
||||
-type party_aux_st() :: #{
|
||||
snapshot_index := snapshot_index(),
|
||||
party_revision_index := party_revision_index()
|
||||
}.
|
||||
-type snapshot_index() :: [event_id()].
|
||||
-type party_revision_index() :: #{
|
||||
party_revision() => event_range()
|
||||
}.
|
||||
-type event_range() :: {
|
||||
FromEventID :: event_id() | undefined,
|
||||
ToEventID :: event_id() | undefined
|
||||
}.
|
||||
|
||||
-export_type([party_revision/0]).
|
||||
|
||||
@ -86,10 +103,10 @@
|
||||
namespace() ->
|
||||
?NS.
|
||||
|
||||
-spec init(party_id(), dmsl_payment_processing_thrift:'PartyParams'()) ->
|
||||
-spec init(dmsl_payment_processing_thrift:'PartyParams'(), hg_machine:machine()) ->
|
||||
hg_machine:result().
|
||||
|
||||
init(ID, PartyParams) ->
|
||||
init(PartyParams, #{id := ID}) ->
|
||||
scoper:scope(
|
||||
party,
|
||||
#{
|
||||
@ -101,31 +118,40 @@ init(ID, PartyParams) ->
|
||||
|
||||
process_init(PartyID, #payproc_PartyParams{contact_info = ContactInfo}) ->
|
||||
Timestamp = hg_datetime:format_now(),
|
||||
ok([?party_created(PartyID, ContactInfo, Timestamp), ?revision_changed(Timestamp, 0)]).
|
||||
Changes = [?party_created(PartyID, ContactInfo, Timestamp), ?revision_changed(Timestamp, 0)],
|
||||
#{
|
||||
events => [wrap_event_payload(?party_ev(Changes))],
|
||||
auxst => wrap_aux_state(#{
|
||||
snapshot_index => [],
|
||||
party_revision_index => #{}
|
||||
})
|
||||
}.
|
||||
|
||||
-spec process_signal(hg_machine:signal(), hg_machine:history(), hg_machine:auxst()) ->
|
||||
-spec process_signal(hg_machine:signal(), hg_machine:machine()) ->
|
||||
hg_machine:result().
|
||||
|
||||
process_signal(timeout, _History, _AuxSt) ->
|
||||
process_signal(timeout, _Machine) ->
|
||||
#{};
|
||||
|
||||
process_signal({repair, _}, _History, _AuxSt) ->
|
||||
process_signal({repair, _}, _Machine) ->
|
||||
#{}.
|
||||
|
||||
-spec process_call(call(), hg_machine:history(), hg_machine:auxst()) ->
|
||||
-spec process_call(call(), hg_machine:machine()) ->
|
||||
{hg_machine:response(), hg_machine:result()}.
|
||||
|
||||
process_call(Call, History, _AuxSt) ->
|
||||
St = collapse_history(unwrap_events(History)),
|
||||
process_call(Call, #{id := PartyID, history := History, aux_state := WrappedAuxSt}) ->
|
||||
try
|
||||
Party = get_st_party(St),
|
||||
scoper:scope(
|
||||
party,
|
||||
#{
|
||||
id => Party#domain_Party.id,
|
||||
id => PartyID,
|
||||
activity => get_call_name(Call)
|
||||
},
|
||||
fun() -> handle_call(Call, {St, []}) end
|
||||
fun() ->
|
||||
AuxSt0 = unwrap_aux_state(WrappedAuxSt),
|
||||
{St, AuxSt1} = get_state_for_call(PartyID, History, AuxSt0),
|
||||
handle_call(Call, AuxSt1, St)
|
||||
end
|
||||
)
|
||||
catch
|
||||
throw:Exception ->
|
||||
@ -137,83 +163,128 @@ get_call_name(Call) when is_tuple(Call) ->
|
||||
get_call_name(Call) when is_atom(Call) ->
|
||||
Call.
|
||||
|
||||
handle_call({block, Target, Reason}, {St, _}) ->
|
||||
handle_call({block, Target, Reason}, AuxSt, St) ->
|
||||
ok = assert_unblocked(Target, St),
|
||||
Timestamp = hg_datetime:format_now(),
|
||||
Revision = get_next_party_revision(St),
|
||||
respond(ok, [
|
||||
block(Target, Reason, Timestamp),
|
||||
?revision_changed(Timestamp, Revision)
|
||||
]);
|
||||
respond(
|
||||
ok,
|
||||
[block(Target, Reason, Timestamp), ?revision_changed(Timestamp, Revision)],
|
||||
AuxSt,
|
||||
St
|
||||
);
|
||||
|
||||
handle_call({unblock, Target, Reason}, {St, _}) ->
|
||||
handle_call({unblock, Target, Reason}, AuxSt, St) ->
|
||||
ok = assert_blocked(Target, St),
|
||||
Timestamp = hg_datetime:format_now(),
|
||||
Revision = get_next_party_revision(St),
|
||||
respond(ok, [
|
||||
unblock(Target, Reason, Timestamp),
|
||||
?revision_changed(Timestamp, Revision)
|
||||
]);
|
||||
respond(
|
||||
ok,
|
||||
[unblock(Target, Reason, Timestamp), ?revision_changed(Timestamp, Revision)],
|
||||
AuxSt,
|
||||
St
|
||||
);
|
||||
|
||||
handle_call({suspend, Target}, {St, _}) ->
|
||||
handle_call({suspend, Target}, AuxSt, St) ->
|
||||
ok = assert_unblocked(Target, St),
|
||||
ok = assert_active(Target, St),
|
||||
Timestamp = hg_datetime:format_now(),
|
||||
Revision = get_next_party_revision(St),
|
||||
respond(ok, [
|
||||
suspend(Target, Timestamp),
|
||||
?revision_changed(Timestamp, Revision)
|
||||
]);
|
||||
respond(
|
||||
ok,
|
||||
[suspend(Target, Timestamp), ?revision_changed(Timestamp, Revision)],
|
||||
AuxSt,
|
||||
St
|
||||
);
|
||||
|
||||
handle_call({activate, Target}, {St, _}) ->
|
||||
handle_call({activate, Target}, AuxSt, St) ->
|
||||
ok = assert_unblocked(Target, St),
|
||||
ok = assert_suspended(Target, St),
|
||||
Timestamp = hg_datetime:format_now(),
|
||||
Revision = get_next_party_revision(St),
|
||||
respond(ok, [
|
||||
activate(Target, Timestamp),
|
||||
?revision_changed(Timestamp, Revision)
|
||||
]);
|
||||
respond(
|
||||
ok,
|
||||
[activate(Target, Timestamp), ?revision_changed(Timestamp, Revision)],
|
||||
AuxSt,
|
||||
St
|
||||
);
|
||||
|
||||
handle_call({set_metadata, NS, Data}, _) ->
|
||||
respond(ok, [?party_meta_set(NS, Data)]);
|
||||
handle_call({set_metadata, NS, Data}, AuxSt, St) ->
|
||||
respond(
|
||||
ok,
|
||||
[?party_meta_set(NS, Data)],
|
||||
AuxSt,
|
||||
St
|
||||
);
|
||||
|
||||
handle_call({remove_metadata, NS}, {St, _}) ->
|
||||
handle_call({remove_metadata, NS}, AuxSt, St) ->
|
||||
_ = get_st_metadata(NS, St),
|
||||
respond(ok, [?party_meta_removed(NS)]);
|
||||
respond(
|
||||
ok,
|
||||
[?party_meta_removed(NS)],
|
||||
AuxSt,
|
||||
St
|
||||
);
|
||||
|
||||
handle_call({create_claim, Changeset}, {St, _}) ->
|
||||
handle_call({create_claim, Changeset}, AuxSt, St) ->
|
||||
ok = assert_party_operable(St),
|
||||
{Claim, Changes} = create_claim(Changeset, St),
|
||||
respond(Claim, Changes);
|
||||
respond(
|
||||
Claim,
|
||||
Changes,
|
||||
AuxSt,
|
||||
St
|
||||
);
|
||||
|
||||
handle_call({update_claim, ID, ClaimRevision, Changeset}, {St, _}) ->
|
||||
handle_call({update_claim, ID, ClaimRevision, Changeset}, AuxSt, St) ->
|
||||
ok = assert_party_operable(St),
|
||||
ok = assert_claim_modification_allowed(ID, ClaimRevision, St),
|
||||
respond(ok, update_claim(ID, Changeset, St));
|
||||
respond(
|
||||
ok,
|
||||
update_claim(ID, Changeset, St),
|
||||
AuxSt,
|
||||
St
|
||||
);
|
||||
|
||||
handle_call({accept_claim, ID, ClaimRevision}, {St, _}) ->
|
||||
handle_call({accept_claim, ID, ClaimRevision}, AuxSt, St) ->
|
||||
ok = assert_claim_modification_allowed(ID, ClaimRevision, St),
|
||||
Timestamp = hg_datetime:format_now(),
|
||||
Revision = get_next_party_revision(St),
|
||||
Claim = hg_claim:accept(
|
||||
Timestamp,
|
||||
get_st_revision(St),
|
||||
hg_domain:head(),
|
||||
get_st_party(St),
|
||||
get_st_claim(ID, St)
|
||||
),
|
||||
respond(ok, [finalize_claim(Claim, Timestamp), ?revision_changed(Timestamp, Revision)]);
|
||||
respond(
|
||||
ok,
|
||||
[finalize_claim(Claim, Timestamp), ?revision_changed(Timestamp, Revision)],
|
||||
AuxSt,
|
||||
St
|
||||
);
|
||||
|
||||
handle_call({deny_claim, ID, ClaimRevision, Reason}, {St, _}) ->
|
||||
handle_call({deny_claim, ID, ClaimRevision, Reason}, AuxSt, St) ->
|
||||
ok = assert_claim_modification_allowed(ID, ClaimRevision, St),
|
||||
Claim = hg_claim:deny(Reason, get_st_timestamp(St), get_st_claim(ID, St)),
|
||||
respond(ok, [finalize_claim(Claim, get_st_timestamp(St))]);
|
||||
Timestamp = hg_datetime:format_now(),
|
||||
Claim = hg_claim:deny(Reason, Timestamp, get_st_claim(ID, St)),
|
||||
respond(
|
||||
ok,
|
||||
[finalize_claim(Claim, Timestamp)],
|
||||
AuxSt,
|
||||
St
|
||||
);
|
||||
|
||||
handle_call({revoke_claim, ID, ClaimRevision, Reason}, {St, _}) ->
|
||||
handle_call({revoke_claim, ID, ClaimRevision, Reason}, AuxSt, St) ->
|
||||
ok = assert_party_operable(St),
|
||||
ok = assert_claim_modification_allowed(ID, ClaimRevision, St),
|
||||
Claim = hg_claim:revoke(Reason, get_st_timestamp(St), get_st_claim(ID, St)),
|
||||
respond(ok, [finalize_claim(Claim, get_st_timestamp(St))]).
|
||||
Timestamp = hg_datetime:format_now(),
|
||||
Claim = hg_claim:revoke(Reason, Timestamp, get_st_claim(ID, St)),
|
||||
respond(
|
||||
ok,
|
||||
[finalize_claim(Claim, Timestamp)],
|
||||
AuxSt,
|
||||
St
|
||||
).
|
||||
|
||||
publish_party_event(Source, {ID, Dt, Ev = ?party_ev(_)}) ->
|
||||
#payproc_Event{id = ID, source = Source, created_at = Dt, payload = Ev}.
|
||||
@ -224,7 +295,7 @@ publish_party_event(Source, {ID, Dt, Ev = ?party_ev(_)}) ->
|
||||
hg_event_provider:public_event().
|
||||
|
||||
publish_event(PartyID, Ev) ->
|
||||
{{party_id, PartyID}, unwrap_event(Ev)}.
|
||||
{{party_id, PartyID}, unwrap_event_payload(Ev)}.
|
||||
|
||||
%%
|
||||
-spec start(party_id(), Args :: term()) ->
|
||||
@ -244,16 +315,66 @@ start(PartyID, Args) ->
|
||||
get_party(PartyID) ->
|
||||
get_st_party(get_state(PartyID)).
|
||||
|
||||
get_state(PartyID) ->
|
||||
AuxSt = get_aux_state(PartyID),
|
||||
get_state(PartyID, get_snapshot_index(AuxSt)).
|
||||
|
||||
get_state(PartyID, []) ->
|
||||
%% No snapshots, so we need entire history
|
||||
Events = lists:map(fun unwrap_event/1, get_history(PartyID, undefined, undefined, forward)),
|
||||
merge_events(Events, #st{});
|
||||
get_state(PartyID, [FirstID | _]) ->
|
||||
History = get_history(PartyID, FirstID - 1, undefined, forward),
|
||||
Events = lists:map(fun unwrap_event/1, History),
|
||||
[FirstEvent | _] = History,
|
||||
St = unwrap_state(FirstEvent),
|
||||
merge_events(Events, St).
|
||||
|
||||
get_state_for_call(PartyID, ReversedHistoryPart, AuxSt) ->
|
||||
{St, History} = parse_history(ReversedHistoryPart),
|
||||
get_state_for_call(PartyID, {St, History}, [], AuxSt).
|
||||
|
||||
get_state_for_call(PartyID, {undefined, [{FirstID, _, _} | _] = Events}, EventsAcc, AuxSt)
|
||||
when FirstID > 1
|
||||
->
|
||||
Limit = get_limit(FirstID, get_snapshot_index(AuxSt)),
|
||||
NewHistoryPart = parse_history(get_history(PartyID, FirstID, Limit, backward)),
|
||||
get_state_for_call(PartyID, NewHistoryPart, Events ++ EventsAcc, AuxSt);
|
||||
get_state_for_call(_, {St0, Events}, EventsAcc, AuxSt0) ->
|
||||
%% here we can get entire history.
|
||||
%% we can use it to create revision index for AuxSt
|
||||
PartyRevisionIndex0 = get_party_revision_index(AuxSt0),
|
||||
{St1, PartyRevisionIndex1} = build_revision_index(
|
||||
Events ++ EventsAcc,
|
||||
PartyRevisionIndex0,
|
||||
hg_utils:select_defined(St0, #st{})
|
||||
),
|
||||
AuxSt1 = set_party_revision_index(PartyRevisionIndex1, AuxSt0),
|
||||
{St1, AuxSt1}.
|
||||
|
||||
parse_history(ReversedHistoryPart) ->
|
||||
parse_history(ReversedHistoryPart, []).
|
||||
|
||||
parse_history([WrappedEvent | Others], EventsAcc) ->
|
||||
Event = unwrap_event(WrappedEvent),
|
||||
case unwrap_state(WrappedEvent) of
|
||||
undefined ->
|
||||
parse_history(Others, [Event | EventsAcc]);
|
||||
#st{} = St ->
|
||||
{St, [Event | EventsAcc]}
|
||||
end;
|
||||
parse_history([], EventsAcc) ->
|
||||
{undefined, EventsAcc}.
|
||||
|
||||
-spec checkout(party_id(), party_revision_param()) ->
|
||||
dmsl_domain_thrift:'Party'() | no_return().
|
||||
|
||||
checkout(PartyID, RevisionParam) ->
|
||||
case checkout_history(get_history(PartyID), RevisionParam) of
|
||||
{ok, St} ->
|
||||
get_st_party(St);
|
||||
{error, Reason} ->
|
||||
error(Reason)
|
||||
end.
|
||||
get_st_party(
|
||||
hg_utils:unwrap_result(
|
||||
checkout_party(PartyID, RevisionParam)
|
||||
)
|
||||
).
|
||||
|
||||
-spec get_last_revision(party_id()) ->
|
||||
party_revision() | no_return().
|
||||
@ -266,7 +387,14 @@ get_last_revision(PartyID) ->
|
||||
term() | no_return().
|
||||
|
||||
call(PartyID, Call) ->
|
||||
map_error(hg_machine:call(?NS, PartyID, Call)).
|
||||
map_error(hg_machine:call(
|
||||
?NS,
|
||||
PartyID,
|
||||
Call,
|
||||
undefined,
|
||||
?SNAPSHOT_STEP,
|
||||
backward
|
||||
)).
|
||||
|
||||
map_error({ok, CallResult}) ->
|
||||
case CallResult of
|
||||
@ -310,16 +438,24 @@ get_metadata(NS, PartyID) ->
|
||||
[dmsl_payment_processing_thrift:'Event'()].
|
||||
|
||||
get_public_history(PartyID, AfterID, Limit) ->
|
||||
[publish_party_event({party_id, PartyID}, Ev) || Ev <- get_history(PartyID, AfterID, Limit)].
|
||||
|
||||
get_state(PartyID) ->
|
||||
collapse_history(get_history(PartyID)).
|
||||
|
||||
get_history(PartyID) ->
|
||||
map_history_error(hg_machine:get_history(?NS, PartyID)).
|
||||
Events = unwrap_events(get_history(PartyID, AfterID, Limit)),
|
||||
[publish_party_event({party_id, PartyID}, Ev) || Ev <- Events].
|
||||
|
||||
get_history(PartyID, AfterID, Limit) ->
|
||||
map_history_error(hg_machine:get_history(?NS, PartyID, AfterID, Limit)).
|
||||
get_history(PartyID, AfterID, Limit, forward).
|
||||
|
||||
get_history(PartyID, AfterID, Limit, Direction) ->
|
||||
map_history_error(hg_machine:get_history(?NS, PartyID, AfterID, Limit, Direction)).
|
||||
|
||||
get_aux_state(PartyID) ->
|
||||
#{aux_state := AuxSt} = map_history_error(hg_machine:get_machine(
|
||||
?NS,
|
||||
PartyID,
|
||||
undefined,
|
||||
0,
|
||||
backward
|
||||
)),
|
||||
unwrap_aux_state(AuxSt).
|
||||
|
||||
get_revision_of_part(PartyID, History, Last, Step) ->
|
||||
case find_revision_in_history(History) of
|
||||
@ -333,7 +469,7 @@ get_revision_of_part(PartyID, History, Last, Step) ->
|
||||
end.
|
||||
|
||||
get_history_part(PartyID, Last, Step) ->
|
||||
case map_history_error(hg_machine:get_history(?NS, PartyID, Last, Step, backward)) of
|
||||
case unwrap_events(get_history(PartyID, Last, Step, backward)) of
|
||||
[] ->
|
||||
{[], 0, 0};
|
||||
History ->
|
||||
@ -362,7 +498,7 @@ find_revision_in_changes([Event | Rest]) ->
|
||||
end.
|
||||
|
||||
map_history_error({ok, Result}) ->
|
||||
unwrap_events(Result);
|
||||
Result;
|
||||
map_history_error({error, notfound}) ->
|
||||
throw(#payproc_PartyNotFound{}).
|
||||
|
||||
@ -388,18 +524,6 @@ get_st_pending_claims(#st{claims = Claims})->
|
||||
Claims
|
||||
)).
|
||||
|
||||
-spec get_st_timestamp(st()) ->
|
||||
timestamp().
|
||||
|
||||
get_st_timestamp(#st{timestamp = Timestamp}) ->
|
||||
Timestamp.
|
||||
|
||||
-spec get_st_revision(st()) ->
|
||||
hg_domain:revision().
|
||||
|
||||
get_st_revision(#st{revision = Revision}) ->
|
||||
Revision.
|
||||
|
||||
-spec get_st_metadata(meta_ns(), st()) ->
|
||||
meta_data().
|
||||
|
||||
@ -427,10 +551,7 @@ assert_claim_modification_allowed(ID, Revision, St) ->
|
||||
ok = hg_claim:assert_revision(Claim, Revision),
|
||||
ok = hg_claim:assert_pending(Claim).
|
||||
|
||||
assert_claims_not_conflict(Claim, ClaimsPending, St) ->
|
||||
Timestamp = get_st_timestamp(St),
|
||||
Revision = get_st_revision(St),
|
||||
Party = get_st_party(St),
|
||||
assert_claims_not_conflict(Claim, ClaimsPending, Timestamp, Revision, Party) ->
|
||||
ConflictedClaims = lists:dropwhile(
|
||||
fun(PendingClaim) ->
|
||||
hg_claim:get_id(Claim) =:= hg_claim:get_id(PendingClaim) orelse
|
||||
@ -448,13 +569,13 @@ assert_claims_not_conflict(Claim, ClaimsPending, St) ->
|
||||
%%
|
||||
|
||||
create_claim(Changeset, St) ->
|
||||
Timestamp = get_st_timestamp(St),
|
||||
Revision = get_st_revision(St),
|
||||
Timestamp = hg_datetime:format_now(),
|
||||
Revision = hg_domain:head(),
|
||||
Party = get_st_party(St),
|
||||
Claim = hg_claim:create(get_next_claim_id(St), Changeset, Party, Timestamp, Revision),
|
||||
ClaimsPending = get_st_pending_claims(St),
|
||||
% Check for conflicts with other pending claims
|
||||
ok = assert_claims_not_conflict(Claim, ClaimsPending, St),
|
||||
ok = assert_claims_not_conflict(Claim, ClaimsPending, Timestamp, Revision, Party),
|
||||
% Test if we can safely accept proposed changes.
|
||||
case hg_claim:is_need_acceptance(Claim, Party, Revision) of
|
||||
false ->
|
||||
@ -480,16 +601,18 @@ create_claim(Changeset, St) ->
|
||||
end.
|
||||
|
||||
update_claim(ID, Changeset, St) ->
|
||||
Timestamp = get_st_timestamp(St),
|
||||
Timestamp = hg_datetime:format_now(),
|
||||
Revision = hg_domain:head(),
|
||||
Party = get_st_party(St),
|
||||
Claim = hg_claim:update(
|
||||
Changeset,
|
||||
get_st_claim(ID, St),
|
||||
get_st_party(St),
|
||||
Party,
|
||||
Timestamp,
|
||||
get_st_revision(St)
|
||||
Revision
|
||||
),
|
||||
ClaimsPending = get_st_pending_claims(St),
|
||||
ok = assert_claims_not_conflict(Claim, ClaimsPending, St),
|
||||
ok = assert_claims_not_conflict(Claim, ClaimsPending, Timestamp, Revision, Party),
|
||||
[?claim_updated(ID, Changeset, hg_claim:get_revision(Claim), Timestamp)].
|
||||
|
||||
finalize_claim(Claim, Timestamp) ->
|
||||
@ -507,38 +630,90 @@ get_next_claim_id(#st{claims = Claims}) ->
|
||||
apply_accepted_claim(Claim, St) ->
|
||||
case hg_claim:is_accepted(Claim) of
|
||||
true ->
|
||||
Party = hg_claim:apply(Claim, get_st_timestamp(St), get_st_party(St)),
|
||||
Party = hg_claim:apply(Claim, hg_datetime:format_now(), get_st_party(St)),
|
||||
St#st{party = Party};
|
||||
false ->
|
||||
St
|
||||
end.
|
||||
|
||||
ok(Changes) ->
|
||||
#{events => wrap_events([?party_ev(Changes)])}.
|
||||
|
||||
respond(Response, Changes) ->
|
||||
{{ok, Response}, #{events => wrap_events([?party_ev(Changes)])}}.
|
||||
respond(Response, Changes, AuxSt0, St) ->
|
||||
AuxSt1 = append_party_revision_index(St, AuxSt0),
|
||||
{Events, AuxSt2} = try_attach_snapshot(Changes, AuxSt1, St),
|
||||
{
|
||||
{ok, Response},
|
||||
#{
|
||||
events => Events,
|
||||
auxst => AuxSt2
|
||||
}
|
||||
}.
|
||||
|
||||
respond_w_exception(Exception) ->
|
||||
{{exception, Exception}, #{}}.
|
||||
|
||||
append_party_revision_index(St, AuxSt) ->
|
||||
PartyRevisionIndex0 = get_party_revision_index(AuxSt),
|
||||
PartyRevisionIndex1 = update_party_revision_index(St, PartyRevisionIndex0),
|
||||
set_party_revision_index(PartyRevisionIndex1, AuxSt).
|
||||
|
||||
update_party_revision_index(St, PartyRevisionIndex) ->
|
||||
#domain_Party{revision = PartyRevision} = get_st_party(St),
|
||||
EventID = St#st.last_event,
|
||||
{FromEventID, ToEventID} = get_party_revision_range(PartyRevision, PartyRevisionIndex),
|
||||
PartyRevisionIndex#{
|
||||
PartyRevision => {
|
||||
hg_utils:select_defined(FromEventID, EventID),
|
||||
max(hg_utils:select_defined(ToEventID, EventID), EventID)
|
||||
}
|
||||
}.
|
||||
|
||||
get_party_revision_index(AuxSt) ->
|
||||
maps:get(party_revision_index, AuxSt, #{}).
|
||||
|
||||
set_party_revision_index(PartyRevisionIndex, AuxSt) ->
|
||||
AuxSt#{party_revision_index => PartyRevisionIndex}.
|
||||
|
||||
get_party_revision_range(PartyRevision, PartyRevisionIndex) ->
|
||||
maps:get(PartyRevision, PartyRevisionIndex, {undefined, undefined}).
|
||||
|
||||
%% TODO crunch func, will be removed after a short (or not so short) time
|
||||
build_revision_index([Event | History], PartyRevisionIndex0, St0) ->
|
||||
St1 = merge_event(Event, St0),
|
||||
PartyRevisionIndex1 = update_party_revision_index(St1, PartyRevisionIndex0),
|
||||
build_revision_index(History, PartyRevisionIndex1, St1);
|
||||
build_revision_index([], PartyRevisionIndex, St) ->
|
||||
{St, PartyRevisionIndex}.
|
||||
|
||||
append_snapshot_index(EventID, AuxSt) ->
|
||||
SnapshotIndex = get_snapshot_index(AuxSt),
|
||||
set_snapshot_index([EventID | SnapshotIndex], AuxSt).
|
||||
|
||||
get_snapshot_index(AuxSt) ->
|
||||
maps:get(snapshot_index, AuxSt, []).
|
||||
|
||||
set_snapshot_index(SnapshotIndex, AuxSt) ->
|
||||
AuxSt#{snapshot_index => SnapshotIndex}.
|
||||
|
||||
get_limit(undefined, _) ->
|
||||
%% we can't get any reasonable limit in this case
|
||||
undefined;
|
||||
get_limit(ToEventID, [SnapshotEventID | _]) when SnapshotEventID < ToEventID ->
|
||||
ToEventID - SnapshotEventID;
|
||||
get_limit(ToEventID, [_ | SnapshotIndex]) ->
|
||||
get_limit(ToEventID, SnapshotIndex);
|
||||
get_limit(_ToEventID, []) ->
|
||||
undefined.
|
||||
|
||||
%%
|
||||
|
||||
-spec collapse_history(hg_machine:history()) -> st().
|
||||
-spec checkout_party(party_id(), party_revision_param()) -> {ok, st()} | {error, revision_not_found}.
|
||||
|
||||
collapse_history(History) ->
|
||||
{ok, St} = checkout_history(History, {timestamp, hg_datetime:format_now()}),
|
||||
St.
|
||||
checkout_party(PartyID, {timestamp, Timestamp}) ->
|
||||
Events = unwrap_events(get_history(PartyID, undefined, undefined)),
|
||||
checkout_history_by_timestamp(Events, Timestamp, #st{});
|
||||
checkout_party(PartyID, {revision, Revision}) ->
|
||||
checkout_party_by_revision(PartyID, Revision).
|
||||
|
||||
-spec checkout_history(hg_machine:history(), party_revision_param()) -> {ok, st()} | {error, revision_not_found}.
|
||||
|
||||
checkout_history(History, {timestamp, Timestamp}) ->
|
||||
% FIXME hg_domain:head() looks strange here
|
||||
checkout_history_by_timestamp(History, Timestamp, #st{revision = hg_domain:head()});
|
||||
checkout_history(History, {revision, Revision}) ->
|
||||
checkout_history_by_revision(History, Revision, #st{revision = hg_domain:head()}).
|
||||
|
||||
checkout_history_by_timestamp([{_, _, Ev} | Rest], Timestamp, #st{timestamp = PrevTimestamp} = St) ->
|
||||
checkout_history_by_timestamp([Ev | Rest], Timestamp, #st{timestamp = PrevTimestamp} = St) ->
|
||||
St1 = merge_event(Ev, St),
|
||||
EventTimestamp = St1#st.timestamp,
|
||||
case hg_datetime:compare(EventTimestamp, Timestamp) of
|
||||
@ -552,7 +727,24 @@ checkout_history_by_timestamp([{_, _, Ev} | Rest], Timestamp, #st{timestamp = Pr
|
||||
checkout_history_by_timestamp([], Timestamp, St) ->
|
||||
{ok, St#st{timestamp = Timestamp}}.
|
||||
|
||||
checkout_history_by_revision([{_, _, Ev} | Rest], Revision, St) ->
|
||||
checkout_party_by_revision(PartyID, Revision) ->
|
||||
AuxSt = get_aux_state(PartyID),
|
||||
FromEventID = case get_party_revision_range(Revision, get_party_revision_index(AuxSt)) of
|
||||
{_, undefined} ->
|
||||
undefined;
|
||||
{_, EventID} ->
|
||||
EventID + 1
|
||||
end,
|
||||
Limit = get_limit(FromEventID, get_snapshot_index(AuxSt)),
|
||||
ReversedHistory = get_history(PartyID, FromEventID, Limit, backward),
|
||||
case parse_history(ReversedHistory) of
|
||||
{undefined, Events} ->
|
||||
checkout_history_by_revision(Events, Revision, #st{});
|
||||
{St, Events} ->
|
||||
checkout_history_by_revision(Events, Revision, St)
|
||||
end.
|
||||
|
||||
checkout_history_by_revision([Ev | Rest], Revision, St) ->
|
||||
St1 = merge_event(Ev, St),
|
||||
case get_st_party(St1) of
|
||||
#domain_Party{revision = Revision1} when Revision1 > Revision ->
|
||||
@ -568,8 +760,13 @@ checkout_history_by_revision([], Revision, St) ->
|
||||
{error, revision_not_found}
|
||||
end.
|
||||
|
||||
merge_event(?party_ev(PartyChanges), St) when is_list(PartyChanges) ->
|
||||
lists:foldl(fun merge_party_change/2, St, PartyChanges).
|
||||
merge_events(Events, St) ->
|
||||
lists:foldl(fun merge_event/2, St, Events).
|
||||
|
||||
merge_event({ID, _Dt, ?party_ev(PartyChanges)}, #st{last_event = LastEventID} = St)
|
||||
when is_list(PartyChanges) andalso ID =:= LastEventID + 1
|
||||
->
|
||||
lists:foldl(fun merge_party_change/2, St#st{last_event = ID}, PartyChanges).
|
||||
|
||||
merge_party_change(?party_created(PartyID, ContactInfo, Timestamp), St) ->
|
||||
St#st{
|
||||
@ -856,26 +1053,113 @@ get_template(TemplateRef, Revision) ->
|
||||
|
||||
%%
|
||||
|
||||
try_attach_snapshot(Changes, AuxSt0, #st{last_event = LastEventID} = St)
|
||||
when
|
||||
LastEventID > 0 andalso
|
||||
LastEventID rem ?SNAPSHOT_STEP =:= 0
|
||||
->
|
||||
AuxSt1 = append_snapshot_index(LastEventID + 1, AuxSt0),
|
||||
{
|
||||
[wrap_event_payload_w_snapshot(?party_ev(Changes), St)],
|
||||
wrap_aux_state(AuxSt1)
|
||||
};
|
||||
try_attach_snapshot(Changes, AuxSt, _) ->
|
||||
{
|
||||
[wrap_event_payload(?party_ev(Changes))],
|
||||
wrap_aux_state(AuxSt)
|
||||
}.
|
||||
|
||||
%% TODO add transmutations for new international legal entities and bank accounts
|
||||
|
||||
-define(TOP_VERSION, 6).
|
||||
|
||||
wrap_events(Events) ->
|
||||
[hg_party_marshalling:marshal([?TOP_VERSION, E]) || E <- Events].
|
||||
wrap_event_payload(Event) ->
|
||||
ContentType = ?CT_ERLANG_BINARY,
|
||||
Meta = #{
|
||||
<<"vsn">> => ?TOP_VERSION,
|
||||
<<"ct">> => ContentType
|
||||
},
|
||||
Data = encode_event(ContentType, Event),
|
||||
[Meta, Data].
|
||||
|
||||
wrap_event_payload_w_snapshot(Event, St) ->
|
||||
ContentType = ?CT_ERLANG_BINARY,
|
||||
Meta = #{
|
||||
<<"vsn">> => ?TOP_VERSION,
|
||||
<<"ct">> => ContentType,
|
||||
<<"state_snapshot">> => encode_state(ContentType, St)
|
||||
},
|
||||
Data = encode_event(ContentType, Event),
|
||||
[Meta, Data].
|
||||
|
||||
unwrap_events(History) ->
|
||||
lists:map(
|
||||
fun({ID, Dt, Event}) ->
|
||||
{ID, Dt, unwrap_event(Event)}
|
||||
end,
|
||||
History
|
||||
).
|
||||
[unwrap_event(E) || E <- History].
|
||||
|
||||
unwrap_event(Event) when is_list(Event) ->
|
||||
unwrap_event({ID, Dt, Event}) ->
|
||||
{ID, Dt, unwrap_event_payload(Event)}.
|
||||
|
||||
unwrap_event_payload([
|
||||
#{
|
||||
<<"vsn">> := Version,
|
||||
<<"ct">> := ContentType
|
||||
},
|
||||
EncodedEvent
|
||||
]) ->
|
||||
transmute([Version, decode_event(ContentType, EncodedEvent)]);
|
||||
%% TODO legacy support, will be removed after migration
|
||||
unwrap_event_payload(Event) when is_list(Event) ->
|
||||
transmute(hg_party_marshalling:unmarshal(Event));
|
||||
unwrap_event({bin, Bin}) when is_binary(Bin) ->
|
||||
unwrap_event_payload({bin, Bin}) when is_binary(Bin) ->
|
||||
transmute([1, binary_to_term(Bin)]).
|
||||
|
||||
unwrap_state({
|
||||
_ID,
|
||||
_Dt,
|
||||
[
|
||||
#{<<"ct">> := ContentType, <<"state_snapshot">> := EncodedSt},
|
||||
_EncodedEvent
|
||||
]
|
||||
}) ->
|
||||
decode_state(ContentType, EncodedSt);
|
||||
unwrap_state(_) ->
|
||||
undefined.
|
||||
|
||||
encode_state(?CT_ERLANG_BINARY, St) ->
|
||||
{bin, term_to_binary(St)}.
|
||||
|
||||
decode_state(?CT_ERLANG_BINARY, {bin, EncodedSt}) ->
|
||||
binary_to_term(EncodedSt).
|
||||
|
||||
encode_event(?CT_ERLANG_BINARY, Event) ->
|
||||
{bin, term_to_binary(Event)}.
|
||||
|
||||
decode_event(?CT_ERLANG_BINARY, {bin, EncodedEvent}) ->
|
||||
binary_to_term(EncodedEvent).
|
||||
|
||||
-spec wrap_aux_state(party_aux_st()) -> hg_msgpack_marshalling:msgpack_value().
|
||||
|
||||
wrap_aux_state(AuxSt) ->
|
||||
ContentType = ?CT_ERLANG_BINARY,
|
||||
#{<<"ct">> => ContentType, <<"aux_state">> => encode_aux_state(ContentType, AuxSt)}.
|
||||
|
||||
-spec unwrap_aux_state(hg_msgpack_marshalling:msgpack_value()) -> party_aux_st().
|
||||
|
||||
unwrap_aux_state(#{<<"ct">> := ContentType, <<"aux_state">> := AuxSt}) ->
|
||||
decode_aux_state(ContentType, AuxSt);
|
||||
%% backward compatibility
|
||||
unwrap_aux_state(undefined) ->
|
||||
#{}.
|
||||
|
||||
-spec encode_aux_state(content_type(), party_aux_st()) -> dmsl_msgpack_thrift:'Value'().
|
||||
|
||||
encode_aux_state(?CT_ERLANG_BINARY, AuxSt) ->
|
||||
{bin, term_to_binary(AuxSt)}.
|
||||
|
||||
-spec decode_aux_state(content_type(), dmsl_msgpack_thrift:'Value'()) -> party_aux_st().
|
||||
|
||||
decode_aux_state(?CT_ERLANG_BINARY, {bin, AuxSt}) ->
|
||||
binary_to_term(AuxSt).
|
||||
|
||||
transmute([Version, Event]) ->
|
||||
transmute_event(Version, ?TOP_VERSION, Event).
|
||||
|
||||
|
@ -25,8 +25,8 @@
|
||||
-behaviour(hg_machine).
|
||||
-export([namespace /0]).
|
||||
-export([init /2]).
|
||||
-export([process_signal/3]).
|
||||
-export([process_call /3]).
|
||||
-export([process_signal/2]).
|
||||
-export([process_call /2]).
|
||||
|
||||
%% Types
|
||||
-record(st, {
|
||||
@ -39,7 +39,6 @@
|
||||
-type st() :: #st{}.
|
||||
-export_type([st/0]).
|
||||
|
||||
-type rec_payment_tool_id() :: dmsl_payment_processing_thrift:'RecurrentPaymentToolID'().
|
||||
-type rec_payment_tool() :: dmsl_payment_processing_thrift:'RecurrentPaymentTool'().
|
||||
-type rec_payment_tool_params() :: dmsl_payment_processing_thrift:'RecurrentPaymentToolParams'().
|
||||
|
||||
@ -192,9 +191,9 @@ map_start_error({error, Reason}) ->
|
||||
namespace() ->
|
||||
?NS.
|
||||
|
||||
-spec init(rec_payment_tool_id(), [payment_tool() | rec_payment_tool_params()]) ->
|
||||
-spec init([payment_tool() | rec_payment_tool_params()], hg_machine:machine()) ->
|
||||
hg_machine:result().
|
||||
init(RecPaymentToolID, [PaymentTool, Params]) ->
|
||||
init([PaymentTool, Params], #{id := RecPaymentToolID}) ->
|
||||
Revision = hg_domain:head(),
|
||||
CreatedAt = hg_datetime:format_now(),
|
||||
{Party, Shop} = get_party_shop(Params),
|
||||
@ -280,9 +279,9 @@ start_session() ->
|
||||
Action = hg_machine_action:instant(),
|
||||
{ok, {Events, Action}}.
|
||||
|
||||
-spec process_signal(hg_machine:signal(), hg_machine:history(), hg_machine:auxst()) ->
|
||||
-spec process_signal(hg_machine:signal(), hg_machine:machine()) ->
|
||||
hg_machine:result().
|
||||
process_signal(Signal, History, _AuxSt) ->
|
||||
process_signal(Signal, #{history := History}) ->
|
||||
handle_result(handle_signal(Signal, collapse_history(unmarshal(History)))).
|
||||
|
||||
handle_signal(timeout, St) ->
|
||||
@ -500,9 +499,9 @@ create_session() ->
|
||||
|
||||
-type call() :: abandon.
|
||||
|
||||
-spec process_call(call(), hg_machine:history(), hg_machine:auxst()) ->
|
||||
-spec process_call(call(), hg_machine:machine()) ->
|
||||
{hg_machine:response(), hg_machine:result()}.
|
||||
process_call(Call, History, _AuxSt) ->
|
||||
process_call(Call, #{history := History}) ->
|
||||
St = collapse_history(unmarshal(History)),
|
||||
try handle_result(handle_call(Call, St)) catch
|
||||
throw:Exception ->
|
||||
|
@ -254,8 +254,6 @@ end_per_suite(C) ->
|
||||
|
||||
init_per_group(shop_blocking_suspension, C) ->
|
||||
C;
|
||||
init_per_group(wallet_blocking_suspension, C) ->
|
||||
C;
|
||||
init_per_group(Group, C) ->
|
||||
PartyID = list_to_binary(lists:concat([Group, ".", erlang:system_time()])),
|
||||
ApiClient = hg_ct_helper:create_client(cfg(root_url, C), PartyID),
|
||||
|
Loading…
Reference in New Issue
Block a user