mirror of
https://github.com/valitydev/machinery-erlang.git
synced 2024-11-06 00:35:19 +00:00
Support range / handle timeout for timeout signals (#15)
This commit is contained in:
parent
ec7968f1f4
commit
6e82ff5ca3
@ -91,11 +91,14 @@
|
||||
}.
|
||||
|
||||
-type action() ::
|
||||
{set_timer, timer()} |
|
||||
unset_timer |
|
||||
continue |
|
||||
{set_timer, timer()} |
|
||||
{set_timer, timer(), range()} |
|
||||
{set_timer, timer(), range(), seconds()} |
|
||||
unset_timer |
|
||||
continue |
|
||||
remove.
|
||||
|
||||
-export_type([timer/0]).
|
||||
-export_type([timestamp/0]).
|
||||
-export_type([seconds/0]).
|
||||
-export_type([result/2]).
|
||||
|
@ -7,11 +7,13 @@
|
||||
|
||||
-type id() :: machinery:id().
|
||||
-type namespace() :: machinery:namespace().
|
||||
-type timer() :: machinery:timer().
|
||||
-type tag() :: binary().
|
||||
|
||||
-export_type([tag/0]).
|
||||
|
||||
-export([tag/4]).
|
||||
-export([tag_until/5]).
|
||||
-export([untag/4]).
|
||||
-export([get/3]).
|
||||
|
||||
@ -43,6 +45,23 @@ tag(NS, Tag, ID, Backend) ->
|
||||
end
|
||||
end.
|
||||
|
||||
-spec tag_until(namespace(), tag(), id(), timer(), machinery:backend(_)) ->
|
||||
ok | {error, {set, id()}}.
|
||||
tag_until(NS, Tag, ID, Timer, Backend) ->
|
||||
case machinery:start(construct_namespace(NS), Tag, {tag, ID, Timer}, Backend) of
|
||||
ok ->
|
||||
ok;
|
||||
{error, exists} ->
|
||||
case get(NS, Tag, Backend) of
|
||||
{ok, ID} ->
|
||||
ok;
|
||||
{ok, IDWas} ->
|
||||
{error, {set, IDWas}};
|
||||
{error, unset} ->
|
||||
tag_until(NS, Tag, ID, Timer, Backend)
|
||||
end
|
||||
end.
|
||||
|
||||
-spec untag(namespace(), tag(), id(), machinery:backend(_)) ->
|
||||
ok | {error, {set, id()}}.
|
||||
untag(NS, Tag, ID, Backend) ->
|
||||
@ -78,21 +97,26 @@ construct_namespace(NS) ->
|
||||
ok | {error, id()}
|
||||
).
|
||||
|
||||
-type ev() ::
|
||||
{tag_set, id()} |
|
||||
tag_unset.
|
||||
-type ev() :: id().
|
||||
|
||||
-spec init({tag, id()}, machine(), undefined, handler_opts()) ->
|
||||
result().
|
||||
init({tag, ID}, _Machine, _, _Opts) ->
|
||||
#{
|
||||
events => [ID]
|
||||
};
|
||||
init({tag, ID, Timer}, _Machine, _, _Opts) ->
|
||||
#{
|
||||
events => [ID],
|
||||
action => [{set_timer, Timer, {undefined, 0, forward}}]
|
||||
}.
|
||||
|
||||
-spec process_timeout(machine(), undefined, handler_opts()) ->
|
||||
result().
|
||||
process_timeout(#{}, _, _Opts) ->
|
||||
#{}.
|
||||
#{
|
||||
action => [remove]
|
||||
}.
|
||||
|
||||
-spec process_call({untag, id()}, machine(), undefined, handler_opts()) ->
|
||||
{response(), result()}.
|
||||
|
@ -4,12 +4,14 @@
|
||||
|
||||
-export([child_spec/1]).
|
||||
-export([tag/4]).
|
||||
-export([tag_until/5]).
|
||||
-export([untag/4]).
|
||||
-export([get/3]).
|
||||
|
||||
-type id() :: machinery:id().
|
||||
-type namespace() :: machinery:namespace().
|
||||
-type tag() :: machinery_machine_unique_tag:tag().
|
||||
-type timer() :: machinery:timer().
|
||||
|
||||
-type opts() :: #{
|
||||
woody_ctx := woody_context:ctx()
|
||||
@ -29,6 +31,11 @@ child_spec(Id) ->
|
||||
tag(NS, Tag, ID, Opts) ->
|
||||
machinery_machine_unique_tag:tag(NS, Tag, ID, get_backend(Opts)).
|
||||
|
||||
-spec tag_until(namespace(), tag(), id(), timer(), opts()) ->
|
||||
ok | {error, {set, id()}}.
|
||||
tag_until(NS, Tag, ID, Timer, Opts) ->
|
||||
machinery_machine_unique_tag:tag_until(NS, Tag, ID, Timer, get_backend(Opts)).
|
||||
|
||||
-spec untag(namespace(), tag(), id(), opts()) ->
|
||||
ok | {error, {set, id()}}.
|
||||
untag(NS, Tag, ID, Opts) ->
|
||||
|
@ -387,6 +387,23 @@ apply_action({set_timer, V}, CA) ->
|
||||
timer = {set_timer, #mg_stateproc_SetTimerAction{timer = marshal(timer, V)}}
|
||||
};
|
||||
|
||||
apply_action({set_timer, T, Range}, CA) ->
|
||||
CA#mg_stateproc_ComplexAction{
|
||||
timer = {set_timer, #mg_stateproc_SetTimerAction{
|
||||
timer = marshal(timer, T),
|
||||
range = marshal(range, Range)
|
||||
}}
|
||||
};
|
||||
|
||||
apply_action({set_timer, T, Range, HandlingTimeout}, CA) ->
|
||||
CA#mg_stateproc_ComplexAction{
|
||||
timer = {set_timer, #mg_stateproc_SetTimerAction{
|
||||
timer = marshal(timer, T),
|
||||
range = marshal(range, Range),
|
||||
timeout = marshal(integer, HandlingTimeout)
|
||||
}}
|
||||
};
|
||||
|
||||
apply_action(unset_timer, CA) ->
|
||||
CA#mg_stateproc_ComplexAction{
|
||||
timer = {unset_timer, #mg_stateproc_UnsetTimerAction{}}
|
||||
|
@ -16,6 +16,7 @@
|
||||
-export([untag_success/1]).
|
||||
-export([conflict_untag_failure/1]).
|
||||
-export([reset_tag_success/1]).
|
||||
-export([tag_unset_timely/1]).
|
||||
|
||||
-import(ct_helper, [
|
||||
cfg/2,
|
||||
@ -39,7 +40,8 @@ all() ->
|
||||
single_tag_set_only ,
|
||||
untag_success ,
|
||||
conflict_untag_failure ,
|
||||
reset_tag_success
|
||||
reset_tag_success ,
|
||||
tag_unset_timely
|
||||
].
|
||||
|
||||
-spec init_per_suite(config()) -> config().
|
||||
@ -145,6 +147,17 @@ reset_tag_success(C) ->
|
||||
ok = machinery_machine_unique_tag_mg_example:tag(payproc, Tag, ID, Opts),
|
||||
{ok, ID} = machinery_machine_unique_tag_mg_example:get(payproc, Tag, Opts).
|
||||
|
||||
-spec tag_unset_timely(config()) -> test_return().
|
||||
|
||||
tag_unset_timely(C) ->
|
||||
Tag = genlib:unique(),
|
||||
ID = pid_to_binary(self()),
|
||||
Opts = #{woody_ctx => get_woody_ctx(C)},
|
||||
ok = machinery_machine_unique_tag_mg_example:tag_until(payproc, Tag, ID, {timeout, 1}, Opts),
|
||||
{ok, ID} = machinery_machine_unique_tag_mg_example:get(payproc, Tag, Opts),
|
||||
ok = timer:sleep(2 * 1000), % twice as much as needed
|
||||
{error, unset} = machinery_machine_unique_tag_mg_example:get(payproc, Tag, Opts).
|
||||
|
||||
%%
|
||||
|
||||
pid_to_binary(PID) ->
|
||||
|
Loading…
Reference in New Issue
Block a user