HG-408: switch to binary event storage (#242)

* HG-408: switch to binary event storage

* use metadata obviously

* encode/decode perfection

* dumb packing logic

* dumb unpacking

* fix dialyzaaaar

* little typing improvement

* introduce primitive AuxState

* use aux state for state composition

* rework aux state usage a little bit

* use AuxState for checking out by revision

* fix awkward bug

* move header to proper location

* revert previous ugly commit

Revert "fix awkward bug"

* use 0 limit for get_state and checkout_by_revision

* rework get_state for handle_call

* build party revision index on call

* remove adhoc with domain revision

* remove get_st_timestamp()
This commit is contained in:
Evgeny Levenets 2018-10-23 13:43:39 +03:00 committed by GitHub
parent 277d6e4946
commit 4485efae95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 531 additions and 203 deletions

View File

@ -19,8 +19,8 @@
-behaviour(hg_machine).
-export([namespace /0]).
-export([init /2]).
-export([process_signal/3]).
-export([process_call /3]).
-export([process_signal/2]).
-export([process_call /2]).
%% Event provider callbacks
@ -189,9 +189,9 @@ publish_event(CustomerID, Changes) when is_list(Changes) ->
namespace() ->
?NS.
-spec init(customer_id(), customer_params()) ->
-spec init(customer_params(), hg_machine:machine()) ->
hg_machine:result().
init(CustomerID, CustomerParams) ->
init(CustomerParams, #{id := CustomerID}) ->
handle_result(#{
changes => [
get_customer_created_event(CustomerID, CustomerParams)
@ -199,9 +199,9 @@ init(CustomerID, CustomerParams) ->
auxst => #{}
}).
-spec process_signal(hg_machine:signal(), hg_machine:history(), hg_machine:auxst()) ->
-spec process_signal(hg_machine:signal(), hg_machine:machine()) ->
hg_machine:result().
process_signal(Signal, History, AuxSt) ->
process_signal(Signal, #{history := History, aux_state := AuxSt}) ->
handle_result(handle_signal(Signal, collapse_history(unmarshal(History)), unmarshal(auxst, AuxSt))).
handle_signal(timeout, St0, AuxSt0) ->
@ -256,9 +256,9 @@ is_binding_succeeded(_) ->
{start_binding, binding_params()} |
delete.
-spec process_call(call(), hg_machine:history(), hg_machine:auxst()) ->
-spec process_call(call(), hg_machine:machine()) ->
{hg_machine:response(), hg_machine:result()}.
process_call(Call, History, _AuxSt) ->
process_call(Call, #{history := History}) ->
St = collapse_history(unmarshal(History)),
try handle_result(handle_call(Call, St)) catch
throw:Exception ->

View File

@ -38,8 +38,8 @@
-export([namespace/0]).
-export([init/2]).
-export([process_signal/3]).
-export([process_call/3]).
-export([process_signal/2]).
-export([process_call/2]).
%% Event provider callbacks
@ -402,10 +402,10 @@ publish_event(InvoiceID, Changes) ->
namespace() ->
?NS.
-spec init(invoice_id(), [invoice_tpl_id() | invoice_params()]) ->
-spec init([invoice_tpl_id() | invoice_params()], hg_machine:machine()) ->
hg_machine:result().
init(ID, [InvoiceTplID, PartyRevision, InvoiceParams]) ->
init([InvoiceTplID, PartyRevision, InvoiceParams], #{id := ID}) ->
Invoice = create_invoice(ID, InvoiceTplID, PartyRevision, InvoiceParams),
% TODO ugly, better to roll state and events simultaneously, hg_party-like
handle_result(#{
@ -416,10 +416,10 @@ init(ID, [InvoiceTplID, PartyRevision, InvoiceParams]) ->
%%
-spec process_signal(hg_machine:signal(), hg_machine:history(), hg_machine:auxst()) ->
-spec process_signal(hg_machine:signal(), hg_machine:machine()) ->
hg_machine:result().
process_signal(Signal, History, _AuxSt) ->
process_signal(Signal, #{history := History}) ->
handle_result(handle_signal(Signal, collapse_history(unmarshal(History)))).
handle_signal(timeout, St = #st{activity = {payment, PaymentID}}) ->
@ -475,10 +475,10 @@ handle_expiration(St) ->
{rescind, binary()} |
{callback, callback()}.
-spec process_call(call(), hg_machine:history(), hg_machine:auxst()) ->
-spec process_call(call(), hg_machine:machine()) ->
{hg_machine:response(), hg_machine:result()}.
process_call(Call, History, _AuxSt) ->
process_call(Call, #{history := History}) ->
St = collapse_history(unmarshal(History)),
try handle_result(handle_call(Call, St)) catch
throw:Exception ->

View File

@ -17,8 +17,8 @@
-export([namespace/0]).
-export([init/2]).
-export([process_signal/3]).
-export([process_call/3]).
-export([process_signal/2]).
-export([process_call/2]).
%% Event provider callbacks
@ -216,10 +216,10 @@ assert_invoice_template_not_deleted(_) ->
namespace() ->
?NS.
-spec init(tpl_id(), create_params()) ->
-spec init(create_params(), hg_machine:machine()) ->
hg_machine:result().
init(ID, Params) ->
init(Params, #{id := ID}) ->
Tpl = create_invoice_template(ID, Params),
#{events => [marshal([?tpl_created(Tpl)])]}.
@ -235,19 +235,19 @@ create_invoice_template(ID, P) ->
context = P#payproc_InvoiceTemplateCreateParams.context
}.
-spec process_signal(hg_machine:signal(), hg_machine:history(), hg_machine:auxst()) ->
-spec process_signal(hg_machine:signal(), hg_machine:machine()) ->
hg_machine:result().
process_signal(timeout, _History, _AuxSt) ->
process_signal(timeout, _Machine) ->
#{};
process_signal({repair, _}, _History, _AuxSt) ->
process_signal({repair, _}, _Machine) ->
#{}.
-spec process_call(call(), hg_machine:history(), hg_machine:auxst()) ->
-spec process_call(call(), hg_machine:machine()) ->
{hg_machine:response(), hg_machine:result()}.
process_call(Call, History, _AuxSt) ->
process_call(Call, #{history := History}) ->
Tpl = collapse_history(unmarshal(History)),
{Response, Changes} = handle_call(Call, Tpl),
{{ok, Response}, #{events => [marshal(Changes)]}}.

View File

@ -1,6 +1,6 @@
-module(hg_machine).
-type msgp() :: hg_msgpack_marshalling:value().
-type msgp() :: hg_msgpack_marshalling:msgpack_value().
-type id() :: mg_proto_base_thrift:'ID'().
-type tag() :: {tag, mg_proto_base_thrift:'Tag'()}.
@ -8,8 +8,6 @@
-type ns() :: mg_proto_base_thrift:'Namespace'().
-type args() :: _.
-type machine() :: mg_proto_state_processing_thrift:'Machine'().
-type event() :: {event_id(), timestamp(), event_payload()}.
-type event_id() :: mg_proto_base_thrift:'EventID'().
-type event_payload() :: msgp().
@ -21,6 +19,12 @@
-type direction() :: mg_proto_state_processing_thrift:'Direction'().
-type descriptor() :: mg_proto_state_processing_thrift:'MachineDescriptor'().
-type machine() :: #{
id := id(),
history := history(),
aux_state := auxst()
}.
-type result() :: #{
events => [event_payload()],
action => hg_machine_action:t(),
@ -30,19 +34,19 @@
-callback namespace() ->
ns().
-callback init(id(), args()) ->
-callback init(args(), machine()) ->
result().
-type signal() ::
timeout | {repair, args()}.
-callback process_signal(signal(), history(), auxst()) ->
-callback process_signal(signal(), machine()) ->
result().
-type call() :: _.
-type response() :: ok | {ok, term()} | {exception, term()}.
-callback process_call(call(), history(), auxst()) ->
-callback process_call(call(), machine()) ->
{response(), result()}.
-type context() :: #{
@ -62,13 +66,16 @@
-export_type([result/0]).
-export_type([context/0]).
-export_type([response/0]).
-export_type([machine/0]).
-export([start/3]).
-export([call/3]).
-export([call/6]).
-export([repair/3]).
-export([get_history/2]).
-export([get_history/4]).
-export([get_history/5]).
-export([get_machine/5]).
%% Dispatch
@ -98,11 +105,29 @@
start(Ns, ID, Args) ->
call_automaton('Start', [Ns, ID, wrap_args(Args)]).
-spec call(ns(), ref(), term()) ->
-spec call(ns(), ref(), Args :: term()) ->
{ok, term()} | {error, notfound | failed} | no_return().
call(Ns, Ref, Args) ->
Descriptor = prepare_descriptor(Ns, Ref, #'HistoryRange'{}),
call(Ns, Ref, Args, undefined, undefined, forward).
-spec call(
ns(),
ref(),
Args :: term(),
After :: event_id() | undefined,
Limit :: integer() | undefined,
Direction :: forward | backward
) ->
{ok, term()} | {error, notfound | failed} | no_return().
call(Ns, Ref, Args, After, Limit, Direction) ->
HistoryRange = #'HistoryRange'{
'after' = After,
'limit' = Limit,
'direction' = Direction
},
Descriptor = prepare_descriptor(Ns, Ref, HistoryRange),
case call_automaton('Call', [Descriptor, wrap_args(Args)]) of
{ok, Response} ->
% should be specific to a processing interface already
@ -122,25 +147,34 @@ repair(Ns, Ref, Args) ->
{ok, history()} | {error, notfound} | no_return().
get_history(Ns, Ref) ->
get_history(Ns, Ref, #'HistoryRange'{}).
get_history(Ns, Ref, undefined, undefined, forward).
-spec get_history(ns(), ref(), undefined | event_id(), undefined | non_neg_integer()) ->
{ok, history()} | {error, notfound} | no_return().
get_history(Ns, Ref, AfterID, Limit) ->
get_history(Ns, Ref, #'HistoryRange'{'after' = AfterID, limit = Limit}).
get_history(Ns, Ref, AfterID, Limit, forward).
-spec get_history(ns(), ref(), undefined | event_id(), undefined | non_neg_integer(), undefined | direction()) ->
-spec get_history(ns(), ref(), undefined | event_id(), undefined | non_neg_integer(), direction()) ->
{ok, history()} | {error, notfound} | no_return().
get_history(Ns, Ref, AfterID, Limit, Direction) ->
get_history(Ns, Ref, #'HistoryRange'{'after' = AfterID, limit = Limit, direction = Direction}).
case get_machine(Ns, Ref, AfterID, Limit, Direction) of
{ok, #{history := History}} ->
{ok, History};
Error ->
Error
end.
get_history(Ns, Ref, Range) ->
-spec get_machine(ns(), ref(), undefined | event_id(), undefined | non_neg_integer(), direction()) ->
{ok, machine()} | {error, notfound} | no_return().
get_machine(Ns, Ref, AfterID, Limit, Direction) ->
Range = #'HistoryRange'{'after' = AfterID, limit = Limit, direction = Direction},
Descriptor = prepare_descriptor(Ns, Ref, Range),
case call_automaton('GetMachine', [Descriptor]) of
{ok, #'Machine'{history = History}} when is_list(History) ->
{ok, unmarshal_events(History)};
{ok, #'Machine'{} = Machine} ->
{ok, unmarshal_machine(Machine)};
Error ->
Error
end.
@ -183,7 +217,7 @@ handle_function_('ProcessSignal', [Args], #{ns := Ns} = _Opts) ->
activity => signal,
signal => Type
}),
dispatch_signal(Ns, Signal, Machine);
dispatch_signal(Ns, Signal, unmarshal_machine(Machine));
handle_function_('ProcessCall', [Args], #{ns := Ns} = _Opts) ->
#'CallArgs'{arg = Payload, machine = #'Machine'{id = ID} = Machine} = Args,
@ -192,7 +226,7 @@ handle_function_('ProcessCall', [Args], #{ns := Ns} = _Opts) ->
id => ID,
activity => call
}),
dispatch_call(Ns, Payload, Machine).
dispatch_call(Ns, Payload, unmarshal_machine(Machine)).
%%
@ -205,31 +239,27 @@ handle_function_('ProcessCall', [Args], #{ns := Ns} = _Opts) ->
Result ::
mg_proto_state_processing_thrift:'SignalResult'().
dispatch_signal(Ns, #'InitSignal'{arg = Payload}, #'Machine'{id = ID}) ->
dispatch_signal(Ns, #'InitSignal'{arg = Payload}, Machine) ->
Args = unwrap_args(Payload),
_ = lager:debug("dispatch init with id = ~s and args = ~p", [ID, Args]),
_ = log_dispatch(init, Args, Machine),
Module = get_handler_module(Ns),
Result = Module:init(ID, Args),
marshal_signal_result(Result, undefined);
Result = Module:init(Args, Machine),
marshal_signal_result(Result, Machine);
dispatch_signal(Ns, #'TimeoutSignal'{}, #'Machine'{history = History0, aux_state = AuxSt0}) ->
History = unmarshal_events(History0),
AuxSt = unmarshal_aux_st(AuxSt0),
_ = lager:debug("dispatch timeout with history = ~p, aux state = ~p", [History, AuxSt]),
dispatch_signal(Ns, #'TimeoutSignal'{}, Machine) ->
_ = log_dispatch(timeout, Machine),
Module = get_handler_module(Ns),
Result = Module:process_signal(timeout, History, AuxSt),
marshal_signal_result(Result, AuxSt);
Result = Module:process_signal(timeout, Machine),
marshal_signal_result(Result, Machine);
dispatch_signal(Ns, #'RepairSignal'{arg = Payload}, #'Machine'{history = History0, aux_state = AuxSt0}) ->
dispatch_signal(Ns, #'RepairSignal'{arg = Payload}, Machine) ->
Args = unwrap_args(Payload),
History = unmarshal_events(History0),
AuxSt = unmarshal_aux_st(AuxSt0),
_ = lager:debug("dispatch repair with args = ~p, history = ~p, aux state = ~p", [Args, History, AuxSt]),
_ = log_dispatch(repair, Args, Machine),
Module = get_handler_module(Ns),
Result = Module:process_signal({repair, Args}, History, AuxSt),
marshal_signal_result(Result, AuxSt).
Result = Module:process_signal({repair, Args}, Machine),
marshal_signal_result(Result, Machine).
marshal_signal_result(Result = #{}, AuxStWas) ->
marshal_signal_result(Result = #{}, #{aux_state := AuxStWas}) ->
_ = lager:debug("signal result = ~p", [Result]),
Change = #'MachineStateChange'{
events = marshal_events(maps:get(events, Result, [])),
@ -245,16 +275,14 @@ marshal_signal_result(Result = #{}, AuxStWas) ->
Call :: mg_proto_state_processing_thrift:'Args'(),
Result :: mg_proto_state_processing_thrift:'CallResult'().
dispatch_call(Ns, Payload, #'Machine'{history = History0, aux_state = AuxSt0}) ->
dispatch_call(Ns, Payload, Machine) ->
Args = unwrap_args(Payload),
History = unmarshal_events(History0),
AuxSt = unmarshal_aux_st(AuxSt0),
_ = lager:debug("dispatch call with args = ~p, history = ~p, aux state = ~p", [Args, History, AuxSt]),
_ = log_dispatch(call, Args, Machine),
Module = get_handler_module(Ns),
Result = Module:process_call(Args, History, AuxSt),
marshal_call_result(Result, AuxSt).
Result = Module:process_call(Args, Machine),
marshal_call_result(Result, Machine).
marshal_call_result({Response, Result}, AuxStWas) ->
marshal_call_result({Response, Result}, #{aux_state := AuxStWas}) ->
_ = lager:debug("call response = ~p with result = ~p", [Response, Result]),
Change = #'MachineStateChange'{
events = marshal_events(maps:get(events, Result, [])),
@ -318,6 +346,25 @@ init(MachineHandlers) ->
get_handler_module(Ns) ->
ets:lookup_element(?TABLE, Ns, 2).
log_dispatch(Operation, #{id := ID, history := History, aux_state := AuxSt}) ->
lager:debug(
"dispatch ~p with id = ~p, history = ~p, aux state = ~p",
[Operation, ID, History, AuxSt]
).
log_dispatch(Operation, Args, #{id := ID, history := History, aux_state := AuxSt}) ->
lager:debug(
"dispatch ~p with id = ~p, args = ~p, history = ~p, aux state = ~p",
[Operation, ID, Args, History, AuxSt]
).
unmarshal_machine(#'Machine'{id = ID, history = History, aux_state = AuxSt}) ->
#{
id => ID,
history => unmarshal_events(History),
aux_state => unmarshal_aux_st(AuxSt)
}.
marshal_events(Events) when is_list(Events) ->
[hg_msgpack_marshalling:marshal(Event) || Event <- Events].

View File

@ -10,8 +10,8 @@
-export([namespace/0]).
-export([init/2]).
-export([process_signal/3]).
-export([process_call/3]).
-export([process_signal/2]).
-export([process_call/2]).
%% Event provider callbacks
@ -36,14 +36,16 @@
-define(NS, <<"party">>).
-define(STEP, 5).
-define(SNAPSHOT_STEP, 10).
-define(CT_ERLANG_BINARY, <<"application/x-erlang-binary">>).
-record(st, {
party :: undefined | party(),
timestamp :: undefined | timestamp(),
revision :: hg_domain:revision(),
claims = #{} :: #{claim_id() => claim()},
meta = #{} :: meta(),
migration_data = #{} :: #{any() => any()}
migration_data = #{} :: #{any() => any()},
last_event = 0 :: event_id()
}).
-type st() :: #st{}.
@ -77,6 +79,21 @@
-type meta_data() :: dmsl_domain_thrift:'PartyMetaData'().
-type party_revision_param() :: dmsl_payment_processing_thrift:'PartyRevisionParam'().
-type party_revision() :: dmsl_domain_thrift:'PartyRevision'().
-type event_id() :: non_neg_integer().
-type content_type() :: binary().
-type party_aux_st() :: #{
snapshot_index := snapshot_index(),
party_revision_index := party_revision_index()
}.
-type snapshot_index() :: [event_id()].
-type party_revision_index() :: #{
party_revision() => event_range()
}.
-type event_range() :: {
FromEventID :: event_id() | undefined,
ToEventID :: event_id() | undefined
}.
-export_type([party_revision/0]).
@ -86,10 +103,10 @@
namespace() ->
?NS.
-spec init(party_id(), dmsl_payment_processing_thrift:'PartyParams'()) ->
-spec init(dmsl_payment_processing_thrift:'PartyParams'(), hg_machine:machine()) ->
hg_machine:result().
init(ID, PartyParams) ->
init(PartyParams, #{id := ID}) ->
scoper:scope(
party,
#{
@ -101,31 +118,40 @@ init(ID, PartyParams) ->
process_init(PartyID, #payproc_PartyParams{contact_info = ContactInfo}) ->
Timestamp = hg_datetime:format_now(),
ok([?party_created(PartyID, ContactInfo, Timestamp), ?revision_changed(Timestamp, 0)]).
Changes = [?party_created(PartyID, ContactInfo, Timestamp), ?revision_changed(Timestamp, 0)],
#{
events => [wrap_event_payload(?party_ev(Changes))],
auxst => wrap_aux_state(#{
snapshot_index => [],
party_revision_index => #{}
})
}.
-spec process_signal(hg_machine:signal(), hg_machine:history(), hg_machine:auxst()) ->
-spec process_signal(hg_machine:signal(), hg_machine:machine()) ->
hg_machine:result().
process_signal(timeout, _History, _AuxSt) ->
process_signal(timeout, _Machine) ->
#{};
process_signal({repair, _}, _History, _AuxSt) ->
process_signal({repair, _}, _Machine) ->
#{}.
-spec process_call(call(), hg_machine:history(), hg_machine:auxst()) ->
-spec process_call(call(), hg_machine:machine()) ->
{hg_machine:response(), hg_machine:result()}.
process_call(Call, History, _AuxSt) ->
St = collapse_history(unwrap_events(History)),
process_call(Call, #{id := PartyID, history := History, aux_state := WrappedAuxSt}) ->
try
Party = get_st_party(St),
scoper:scope(
party,
#{
id => Party#domain_Party.id,
id => PartyID,
activity => get_call_name(Call)
},
fun() -> handle_call(Call, {St, []}) end
fun() ->
AuxSt0 = unwrap_aux_state(WrappedAuxSt),
{St, AuxSt1} = get_state_for_call(PartyID, History, AuxSt0),
handle_call(Call, AuxSt1, St)
end
)
catch
throw:Exception ->
@ -137,83 +163,128 @@ get_call_name(Call) when is_tuple(Call) ->
get_call_name(Call) when is_atom(Call) ->
Call.
handle_call({block, Target, Reason}, {St, _}) ->
handle_call({block, Target, Reason}, AuxSt, St) ->
ok = assert_unblocked(Target, St),
Timestamp = hg_datetime:format_now(),
Revision = get_next_party_revision(St),
respond(ok, [
block(Target, Reason, Timestamp),
?revision_changed(Timestamp, Revision)
]);
respond(
ok,
[block(Target, Reason, Timestamp), ?revision_changed(Timestamp, Revision)],
AuxSt,
St
);
handle_call({unblock, Target, Reason}, {St, _}) ->
handle_call({unblock, Target, Reason}, AuxSt, St) ->
ok = assert_blocked(Target, St),
Timestamp = hg_datetime:format_now(),
Revision = get_next_party_revision(St),
respond(ok, [
unblock(Target, Reason, Timestamp),
?revision_changed(Timestamp, Revision)
]);
respond(
ok,
[unblock(Target, Reason, Timestamp), ?revision_changed(Timestamp, Revision)],
AuxSt,
St
);
handle_call({suspend, Target}, {St, _}) ->
handle_call({suspend, Target}, AuxSt, St) ->
ok = assert_unblocked(Target, St),
ok = assert_active(Target, St),
Timestamp = hg_datetime:format_now(),
Revision = get_next_party_revision(St),
respond(ok, [
suspend(Target, Timestamp),
?revision_changed(Timestamp, Revision)
]);
respond(
ok,
[suspend(Target, Timestamp), ?revision_changed(Timestamp, Revision)],
AuxSt,
St
);
handle_call({activate, Target}, {St, _}) ->
handle_call({activate, Target}, AuxSt, St) ->
ok = assert_unblocked(Target, St),
ok = assert_suspended(Target, St),
Timestamp = hg_datetime:format_now(),
Revision = get_next_party_revision(St),
respond(ok, [
activate(Target, Timestamp),
?revision_changed(Timestamp, Revision)
]);
respond(
ok,
[activate(Target, Timestamp), ?revision_changed(Timestamp, Revision)],
AuxSt,
St
);
handle_call({set_metadata, NS, Data}, _) ->
respond(ok, [?party_meta_set(NS, Data)]);
handle_call({set_metadata, NS, Data}, AuxSt, St) ->
respond(
ok,
[?party_meta_set(NS, Data)],
AuxSt,
St
);
handle_call({remove_metadata, NS}, {St, _}) ->
handle_call({remove_metadata, NS}, AuxSt, St) ->
_ = get_st_metadata(NS, St),
respond(ok, [?party_meta_removed(NS)]);
respond(
ok,
[?party_meta_removed(NS)],
AuxSt,
St
);
handle_call({create_claim, Changeset}, {St, _}) ->
handle_call({create_claim, Changeset}, AuxSt, St) ->
ok = assert_party_operable(St),
{Claim, Changes} = create_claim(Changeset, St),
respond(Claim, Changes);
respond(
Claim,
Changes,
AuxSt,
St
);
handle_call({update_claim, ID, ClaimRevision, Changeset}, {St, _}) ->
handle_call({update_claim, ID, ClaimRevision, Changeset}, AuxSt, St) ->
ok = assert_party_operable(St),
ok = assert_claim_modification_allowed(ID, ClaimRevision, St),
respond(ok, update_claim(ID, Changeset, St));
respond(
ok,
update_claim(ID, Changeset, St),
AuxSt,
St
);
handle_call({accept_claim, ID, ClaimRevision}, {St, _}) ->
handle_call({accept_claim, ID, ClaimRevision}, AuxSt, St) ->
ok = assert_claim_modification_allowed(ID, ClaimRevision, St),
Timestamp = hg_datetime:format_now(),
Revision = get_next_party_revision(St),
Claim = hg_claim:accept(
Timestamp,
get_st_revision(St),
hg_domain:head(),
get_st_party(St),
get_st_claim(ID, St)
),
respond(ok, [finalize_claim(Claim, Timestamp), ?revision_changed(Timestamp, Revision)]);
respond(
ok,
[finalize_claim(Claim, Timestamp), ?revision_changed(Timestamp, Revision)],
AuxSt,
St
);
handle_call({deny_claim, ID, ClaimRevision, Reason}, {St, _}) ->
handle_call({deny_claim, ID, ClaimRevision, Reason}, AuxSt, St) ->
ok = assert_claim_modification_allowed(ID, ClaimRevision, St),
Claim = hg_claim:deny(Reason, get_st_timestamp(St), get_st_claim(ID, St)),
respond(ok, [finalize_claim(Claim, get_st_timestamp(St))]);
Timestamp = hg_datetime:format_now(),
Claim = hg_claim:deny(Reason, Timestamp, get_st_claim(ID, St)),
respond(
ok,
[finalize_claim(Claim, Timestamp)],
AuxSt,
St
);
handle_call({revoke_claim, ID, ClaimRevision, Reason}, {St, _}) ->
handle_call({revoke_claim, ID, ClaimRevision, Reason}, AuxSt, St) ->
ok = assert_party_operable(St),
ok = assert_claim_modification_allowed(ID, ClaimRevision, St),
Claim = hg_claim:revoke(Reason, get_st_timestamp(St), get_st_claim(ID, St)),
respond(ok, [finalize_claim(Claim, get_st_timestamp(St))]).
Timestamp = hg_datetime:format_now(),
Claim = hg_claim:revoke(Reason, Timestamp, get_st_claim(ID, St)),
respond(
ok,
[finalize_claim(Claim, Timestamp)],
AuxSt,
St
).
publish_party_event(Source, {ID, Dt, Ev = ?party_ev(_)}) ->
#payproc_Event{id = ID, source = Source, created_at = Dt, payload = Ev}.
@ -224,7 +295,7 @@ publish_party_event(Source, {ID, Dt, Ev = ?party_ev(_)}) ->
hg_event_provider:public_event().
publish_event(PartyID, Ev) ->
{{party_id, PartyID}, unwrap_event(Ev)}.
{{party_id, PartyID}, unwrap_event_payload(Ev)}.
%%
-spec start(party_id(), Args :: term()) ->
@ -244,16 +315,66 @@ start(PartyID, Args) ->
get_party(PartyID) ->
get_st_party(get_state(PartyID)).
get_state(PartyID) ->
AuxSt = get_aux_state(PartyID),
get_state(PartyID, get_snapshot_index(AuxSt)).
get_state(PartyID, []) ->
%% No snapshots, so we need entire history
Events = lists:map(fun unwrap_event/1, get_history(PartyID, undefined, undefined, forward)),
merge_events(Events, #st{});
get_state(PartyID, [FirstID | _]) ->
History = get_history(PartyID, FirstID - 1, undefined, forward),
Events = lists:map(fun unwrap_event/1, History),
[FirstEvent | _] = History,
St = unwrap_state(FirstEvent),
merge_events(Events, St).
get_state_for_call(PartyID, ReversedHistoryPart, AuxSt) ->
{St, History} = parse_history(ReversedHistoryPart),
get_state_for_call(PartyID, {St, History}, [], AuxSt).
get_state_for_call(PartyID, {undefined, [{FirstID, _, _} | _] = Events}, EventsAcc, AuxSt)
when FirstID > 1
->
Limit = get_limit(FirstID, get_snapshot_index(AuxSt)),
NewHistoryPart = parse_history(get_history(PartyID, FirstID, Limit, backward)),
get_state_for_call(PartyID, NewHistoryPart, Events ++ EventsAcc, AuxSt);
get_state_for_call(_, {St0, Events}, EventsAcc, AuxSt0) ->
%% here we can get entire history.
%% we can use it to create revision index for AuxSt
PartyRevisionIndex0 = get_party_revision_index(AuxSt0),
{St1, PartyRevisionIndex1} = build_revision_index(
Events ++ EventsAcc,
PartyRevisionIndex0,
hg_utils:select_defined(St0, #st{})
),
AuxSt1 = set_party_revision_index(PartyRevisionIndex1, AuxSt0),
{St1, AuxSt1}.
parse_history(ReversedHistoryPart) ->
parse_history(ReversedHistoryPart, []).
parse_history([WrappedEvent | Others], EventsAcc) ->
Event = unwrap_event(WrappedEvent),
case unwrap_state(WrappedEvent) of
undefined ->
parse_history(Others, [Event | EventsAcc]);
#st{} = St ->
{St, [Event | EventsAcc]}
end;
parse_history([], EventsAcc) ->
{undefined, EventsAcc}.
-spec checkout(party_id(), party_revision_param()) ->
dmsl_domain_thrift:'Party'() | no_return().
checkout(PartyID, RevisionParam) ->
case checkout_history(get_history(PartyID), RevisionParam) of
{ok, St} ->
get_st_party(St);
{error, Reason} ->
error(Reason)
end.
get_st_party(
hg_utils:unwrap_result(
checkout_party(PartyID, RevisionParam)
)
).
-spec get_last_revision(party_id()) ->
party_revision() | no_return().
@ -266,7 +387,14 @@ get_last_revision(PartyID) ->
term() | no_return().
call(PartyID, Call) ->
map_error(hg_machine:call(?NS, PartyID, Call)).
map_error(hg_machine:call(
?NS,
PartyID,
Call,
undefined,
?SNAPSHOT_STEP,
backward
)).
map_error({ok, CallResult}) ->
case CallResult of
@ -310,16 +438,24 @@ get_metadata(NS, PartyID) ->
[dmsl_payment_processing_thrift:'Event'()].
get_public_history(PartyID, AfterID, Limit) ->
[publish_party_event({party_id, PartyID}, Ev) || Ev <- get_history(PartyID, AfterID, Limit)].
get_state(PartyID) ->
collapse_history(get_history(PartyID)).
get_history(PartyID) ->
map_history_error(hg_machine:get_history(?NS, PartyID)).
Events = unwrap_events(get_history(PartyID, AfterID, Limit)),
[publish_party_event({party_id, PartyID}, Ev) || Ev <- Events].
get_history(PartyID, AfterID, Limit) ->
map_history_error(hg_machine:get_history(?NS, PartyID, AfterID, Limit)).
get_history(PartyID, AfterID, Limit, forward).
get_history(PartyID, AfterID, Limit, Direction) ->
map_history_error(hg_machine:get_history(?NS, PartyID, AfterID, Limit, Direction)).
get_aux_state(PartyID) ->
#{aux_state := AuxSt} = map_history_error(hg_machine:get_machine(
?NS,
PartyID,
undefined,
0,
backward
)),
unwrap_aux_state(AuxSt).
get_revision_of_part(PartyID, History, Last, Step) ->
case find_revision_in_history(History) of
@ -333,7 +469,7 @@ get_revision_of_part(PartyID, History, Last, Step) ->
end.
get_history_part(PartyID, Last, Step) ->
case map_history_error(hg_machine:get_history(?NS, PartyID, Last, Step, backward)) of
case unwrap_events(get_history(PartyID, Last, Step, backward)) of
[] ->
{[], 0, 0};
History ->
@ -362,7 +498,7 @@ find_revision_in_changes([Event | Rest]) ->
end.
map_history_error({ok, Result}) ->
unwrap_events(Result);
Result;
map_history_error({error, notfound}) ->
throw(#payproc_PartyNotFound{}).
@ -388,18 +524,6 @@ get_st_pending_claims(#st{claims = Claims})->
Claims
)).
-spec get_st_timestamp(st()) ->
timestamp().
get_st_timestamp(#st{timestamp = Timestamp}) ->
Timestamp.
-spec get_st_revision(st()) ->
hg_domain:revision().
get_st_revision(#st{revision = Revision}) ->
Revision.
-spec get_st_metadata(meta_ns(), st()) ->
meta_data().
@ -427,10 +551,7 @@ assert_claim_modification_allowed(ID, Revision, St) ->
ok = hg_claim:assert_revision(Claim, Revision),
ok = hg_claim:assert_pending(Claim).
assert_claims_not_conflict(Claim, ClaimsPending, St) ->
Timestamp = get_st_timestamp(St),
Revision = get_st_revision(St),
Party = get_st_party(St),
assert_claims_not_conflict(Claim, ClaimsPending, Timestamp, Revision, Party) ->
ConflictedClaims = lists:dropwhile(
fun(PendingClaim) ->
hg_claim:get_id(Claim) =:= hg_claim:get_id(PendingClaim) orelse
@ -448,13 +569,13 @@ assert_claims_not_conflict(Claim, ClaimsPending, St) ->
%%
create_claim(Changeset, St) ->
Timestamp = get_st_timestamp(St),
Revision = get_st_revision(St),
Timestamp = hg_datetime:format_now(),
Revision = hg_domain:head(),
Party = get_st_party(St),
Claim = hg_claim:create(get_next_claim_id(St), Changeset, Party, Timestamp, Revision),
ClaimsPending = get_st_pending_claims(St),
% Check for conflicts with other pending claims
ok = assert_claims_not_conflict(Claim, ClaimsPending, St),
ok = assert_claims_not_conflict(Claim, ClaimsPending, Timestamp, Revision, Party),
% Test if we can safely accept proposed changes.
case hg_claim:is_need_acceptance(Claim, Party, Revision) of
false ->
@ -480,16 +601,18 @@ create_claim(Changeset, St) ->
end.
update_claim(ID, Changeset, St) ->
Timestamp = get_st_timestamp(St),
Timestamp = hg_datetime:format_now(),
Revision = hg_domain:head(),
Party = get_st_party(St),
Claim = hg_claim:update(
Changeset,
get_st_claim(ID, St),
get_st_party(St),
Party,
Timestamp,
get_st_revision(St)
Revision
),
ClaimsPending = get_st_pending_claims(St),
ok = assert_claims_not_conflict(Claim, ClaimsPending, St),
ok = assert_claims_not_conflict(Claim, ClaimsPending, Timestamp, Revision, Party),
[?claim_updated(ID, Changeset, hg_claim:get_revision(Claim), Timestamp)].
finalize_claim(Claim, Timestamp) ->
@ -507,38 +630,90 @@ get_next_claim_id(#st{claims = Claims}) ->
apply_accepted_claim(Claim, St) ->
case hg_claim:is_accepted(Claim) of
true ->
Party = hg_claim:apply(Claim, get_st_timestamp(St), get_st_party(St)),
Party = hg_claim:apply(Claim, hg_datetime:format_now(), get_st_party(St)),
St#st{party = Party};
false ->
St
end.
ok(Changes) ->
#{events => wrap_events([?party_ev(Changes)])}.
respond(Response, Changes) ->
{{ok, Response}, #{events => wrap_events([?party_ev(Changes)])}}.
respond(Response, Changes, AuxSt0, St) ->
AuxSt1 = append_party_revision_index(St, AuxSt0),
{Events, AuxSt2} = try_attach_snapshot(Changes, AuxSt1, St),
{
{ok, Response},
#{
events => Events,
auxst => AuxSt2
}
}.
respond_w_exception(Exception) ->
{{exception, Exception}, #{}}.
append_party_revision_index(St, AuxSt) ->
PartyRevisionIndex0 = get_party_revision_index(AuxSt),
PartyRevisionIndex1 = update_party_revision_index(St, PartyRevisionIndex0),
set_party_revision_index(PartyRevisionIndex1, AuxSt).
update_party_revision_index(St, PartyRevisionIndex) ->
#domain_Party{revision = PartyRevision} = get_st_party(St),
EventID = St#st.last_event,
{FromEventID, ToEventID} = get_party_revision_range(PartyRevision, PartyRevisionIndex),
PartyRevisionIndex#{
PartyRevision => {
hg_utils:select_defined(FromEventID, EventID),
max(hg_utils:select_defined(ToEventID, EventID), EventID)
}
}.
get_party_revision_index(AuxSt) ->
maps:get(party_revision_index, AuxSt, #{}).
set_party_revision_index(PartyRevisionIndex, AuxSt) ->
AuxSt#{party_revision_index => PartyRevisionIndex}.
get_party_revision_range(PartyRevision, PartyRevisionIndex) ->
maps:get(PartyRevision, PartyRevisionIndex, {undefined, undefined}).
%% TODO crunch func, will be removed after a short (or not so short) time
build_revision_index([Event | History], PartyRevisionIndex0, St0) ->
St1 = merge_event(Event, St0),
PartyRevisionIndex1 = update_party_revision_index(St1, PartyRevisionIndex0),
build_revision_index(History, PartyRevisionIndex1, St1);
build_revision_index([], PartyRevisionIndex, St) ->
{St, PartyRevisionIndex}.
append_snapshot_index(EventID, AuxSt) ->
SnapshotIndex = get_snapshot_index(AuxSt),
set_snapshot_index([EventID | SnapshotIndex], AuxSt).
get_snapshot_index(AuxSt) ->
maps:get(snapshot_index, AuxSt, []).
set_snapshot_index(SnapshotIndex, AuxSt) ->
AuxSt#{snapshot_index => SnapshotIndex}.
get_limit(undefined, _) ->
%% we can't get any reasonable limit in this case
undefined;
get_limit(ToEventID, [SnapshotEventID | _]) when SnapshotEventID < ToEventID ->
ToEventID - SnapshotEventID;
get_limit(ToEventID, [_ | SnapshotIndex]) ->
get_limit(ToEventID, SnapshotIndex);
get_limit(_ToEventID, []) ->
undefined.
%%
-spec collapse_history(hg_machine:history()) -> st().
-spec checkout_party(party_id(), party_revision_param()) -> {ok, st()} | {error, revision_not_found}.
collapse_history(History) ->
{ok, St} = checkout_history(History, {timestamp, hg_datetime:format_now()}),
St.
checkout_party(PartyID, {timestamp, Timestamp}) ->
Events = unwrap_events(get_history(PartyID, undefined, undefined)),
checkout_history_by_timestamp(Events, Timestamp, #st{});
checkout_party(PartyID, {revision, Revision}) ->
checkout_party_by_revision(PartyID, Revision).
-spec checkout_history(hg_machine:history(), party_revision_param()) -> {ok, st()} | {error, revision_not_found}.
checkout_history(History, {timestamp, Timestamp}) ->
% FIXME hg_domain:head() looks strange here
checkout_history_by_timestamp(History, Timestamp, #st{revision = hg_domain:head()});
checkout_history(History, {revision, Revision}) ->
checkout_history_by_revision(History, Revision, #st{revision = hg_domain:head()}).
checkout_history_by_timestamp([{_, _, Ev} | Rest], Timestamp, #st{timestamp = PrevTimestamp} = St) ->
checkout_history_by_timestamp([Ev | Rest], Timestamp, #st{timestamp = PrevTimestamp} = St) ->
St1 = merge_event(Ev, St),
EventTimestamp = St1#st.timestamp,
case hg_datetime:compare(EventTimestamp, Timestamp) of
@ -552,7 +727,24 @@ checkout_history_by_timestamp([{_, _, Ev} | Rest], Timestamp, #st{timestamp = Pr
checkout_history_by_timestamp([], Timestamp, St) ->
{ok, St#st{timestamp = Timestamp}}.
checkout_history_by_revision([{_, _, Ev} | Rest], Revision, St) ->
checkout_party_by_revision(PartyID, Revision) ->
AuxSt = get_aux_state(PartyID),
FromEventID = case get_party_revision_range(Revision, get_party_revision_index(AuxSt)) of
{_, undefined} ->
undefined;
{_, EventID} ->
EventID + 1
end,
Limit = get_limit(FromEventID, get_snapshot_index(AuxSt)),
ReversedHistory = get_history(PartyID, FromEventID, Limit, backward),
case parse_history(ReversedHistory) of
{undefined, Events} ->
checkout_history_by_revision(Events, Revision, #st{});
{St, Events} ->
checkout_history_by_revision(Events, Revision, St)
end.
checkout_history_by_revision([Ev | Rest], Revision, St) ->
St1 = merge_event(Ev, St),
case get_st_party(St1) of
#domain_Party{revision = Revision1} when Revision1 > Revision ->
@ -568,8 +760,13 @@ checkout_history_by_revision([], Revision, St) ->
{error, revision_not_found}
end.
merge_event(?party_ev(PartyChanges), St) when is_list(PartyChanges) ->
lists:foldl(fun merge_party_change/2, St, PartyChanges).
merge_events(Events, St) ->
lists:foldl(fun merge_event/2, St, Events).
merge_event({ID, _Dt, ?party_ev(PartyChanges)}, #st{last_event = LastEventID} = St)
when is_list(PartyChanges) andalso ID =:= LastEventID + 1
->
lists:foldl(fun merge_party_change/2, St#st{last_event = ID}, PartyChanges).
merge_party_change(?party_created(PartyID, ContactInfo, Timestamp), St) ->
St#st{
@ -856,26 +1053,113 @@ get_template(TemplateRef, Revision) ->
%%
try_attach_snapshot(Changes, AuxSt0, #st{last_event = LastEventID} = St)
when
LastEventID > 0 andalso
LastEventID rem ?SNAPSHOT_STEP =:= 0
->
AuxSt1 = append_snapshot_index(LastEventID + 1, AuxSt0),
{
[wrap_event_payload_w_snapshot(?party_ev(Changes), St)],
wrap_aux_state(AuxSt1)
};
try_attach_snapshot(Changes, AuxSt, _) ->
{
[wrap_event_payload(?party_ev(Changes))],
wrap_aux_state(AuxSt)
}.
%% TODO add transmutations for new international legal entities and bank accounts
-define(TOP_VERSION, 6).
wrap_events(Events) ->
[hg_party_marshalling:marshal([?TOP_VERSION, E]) || E <- Events].
wrap_event_payload(Event) ->
ContentType = ?CT_ERLANG_BINARY,
Meta = #{
<<"vsn">> => ?TOP_VERSION,
<<"ct">> => ContentType
},
Data = encode_event(ContentType, Event),
[Meta, Data].
wrap_event_payload_w_snapshot(Event, St) ->
ContentType = ?CT_ERLANG_BINARY,
Meta = #{
<<"vsn">> => ?TOP_VERSION,
<<"ct">> => ContentType,
<<"state_snapshot">> => encode_state(ContentType, St)
},
Data = encode_event(ContentType, Event),
[Meta, Data].
unwrap_events(History) ->
lists:map(
fun({ID, Dt, Event}) ->
{ID, Dt, unwrap_event(Event)}
end,
History
).
[unwrap_event(E) || E <- History].
unwrap_event(Event) when is_list(Event) ->
unwrap_event({ID, Dt, Event}) ->
{ID, Dt, unwrap_event_payload(Event)}.
unwrap_event_payload([
#{
<<"vsn">> := Version,
<<"ct">> := ContentType
},
EncodedEvent
]) ->
transmute([Version, decode_event(ContentType, EncodedEvent)]);
%% TODO legacy support, will be removed after migration
unwrap_event_payload(Event) when is_list(Event) ->
transmute(hg_party_marshalling:unmarshal(Event));
unwrap_event({bin, Bin}) when is_binary(Bin) ->
unwrap_event_payload({bin, Bin}) when is_binary(Bin) ->
transmute([1, binary_to_term(Bin)]).
unwrap_state({
_ID,
_Dt,
[
#{<<"ct">> := ContentType, <<"state_snapshot">> := EncodedSt},
_EncodedEvent
]
}) ->
decode_state(ContentType, EncodedSt);
unwrap_state(_) ->
undefined.
encode_state(?CT_ERLANG_BINARY, St) ->
{bin, term_to_binary(St)}.
decode_state(?CT_ERLANG_BINARY, {bin, EncodedSt}) ->
binary_to_term(EncodedSt).
encode_event(?CT_ERLANG_BINARY, Event) ->
{bin, term_to_binary(Event)}.
decode_event(?CT_ERLANG_BINARY, {bin, EncodedEvent}) ->
binary_to_term(EncodedEvent).
-spec wrap_aux_state(party_aux_st()) -> hg_msgpack_marshalling:msgpack_value().
wrap_aux_state(AuxSt) ->
ContentType = ?CT_ERLANG_BINARY,
#{<<"ct">> => ContentType, <<"aux_state">> => encode_aux_state(ContentType, AuxSt)}.
-spec unwrap_aux_state(hg_msgpack_marshalling:msgpack_value()) -> party_aux_st().
unwrap_aux_state(#{<<"ct">> := ContentType, <<"aux_state">> := AuxSt}) ->
decode_aux_state(ContentType, AuxSt);
%% backward compatibility
unwrap_aux_state(undefined) ->
#{}.
-spec encode_aux_state(content_type(), party_aux_st()) -> dmsl_msgpack_thrift:'Value'().
encode_aux_state(?CT_ERLANG_BINARY, AuxSt) ->
{bin, term_to_binary(AuxSt)}.
-spec decode_aux_state(content_type(), dmsl_msgpack_thrift:'Value'()) -> party_aux_st().
decode_aux_state(?CT_ERLANG_BINARY, {bin, AuxSt}) ->
binary_to_term(AuxSt).
transmute([Version, Event]) ->
transmute_event(Version, ?TOP_VERSION, Event).

View File

@ -25,8 +25,8 @@
-behaviour(hg_machine).
-export([namespace /0]).
-export([init /2]).
-export([process_signal/3]).
-export([process_call /3]).
-export([process_signal/2]).
-export([process_call /2]).
%% Types
-record(st, {
@ -39,7 +39,6 @@
-type st() :: #st{}.
-export_type([st/0]).
-type rec_payment_tool_id() :: dmsl_payment_processing_thrift:'RecurrentPaymentToolID'().
-type rec_payment_tool() :: dmsl_payment_processing_thrift:'RecurrentPaymentTool'().
-type rec_payment_tool_params() :: dmsl_payment_processing_thrift:'RecurrentPaymentToolParams'().
@ -192,9 +191,9 @@ map_start_error({error, Reason}) ->
namespace() ->
?NS.
-spec init(rec_payment_tool_id(), [payment_tool() | rec_payment_tool_params()]) ->
-spec init([payment_tool() | rec_payment_tool_params()], hg_machine:machine()) ->
hg_machine:result().
init(RecPaymentToolID, [PaymentTool, Params]) ->
init([PaymentTool, Params], #{id := RecPaymentToolID}) ->
Revision = hg_domain:head(),
CreatedAt = hg_datetime:format_now(),
{Party, Shop} = get_party_shop(Params),
@ -280,9 +279,9 @@ start_session() ->
Action = hg_machine_action:instant(),
{ok, {Events, Action}}.
-spec process_signal(hg_machine:signal(), hg_machine:history(), hg_machine:auxst()) ->
-spec process_signal(hg_machine:signal(), hg_machine:machine()) ->
hg_machine:result().
process_signal(Signal, History, _AuxSt) ->
process_signal(Signal, #{history := History}) ->
handle_result(handle_signal(Signal, collapse_history(unmarshal(History)))).
handle_signal(timeout, St) ->
@ -500,9 +499,9 @@ create_session() ->
-type call() :: abandon.
-spec process_call(call(), hg_machine:history(), hg_machine:auxst()) ->
-spec process_call(call(), hg_machine:machine()) ->
{hg_machine:response(), hg_machine:result()}.
process_call(Call, History, _AuxSt) ->
process_call(Call, #{history := History}) ->
St = collapse_history(unmarshal(History)),
try handle_result(handle_call(Call, St)) catch
throw:Exception ->

View File

@ -254,8 +254,6 @@ end_per_suite(C) ->
init_per_group(shop_blocking_suspension, C) ->
C;
init_per_group(wallet_blocking_suspension, C) ->
C;
init_per_group(Group, C) ->
PartyID = list_to_binary(lists:concat([Group, ".", erlang:system_time()])),
ApiClient = hg_ct_helper:create_client(cfg(root_url, C), PartyID),