mirror of
https://github.com/valitydev/progressor.git
synced 2024-11-06 00:15:21 +00:00
79f16f9cc3
* TD-934: add prototype implementation * TD-934: fix * TD-934: fix spec * TD-934: fix spec * TD-927: add context * TD-927: fix json saving * TD-927: fix * TD-927: fix calls processing * TD-927: fix call and repair processing, add remove action impl. * TD-934: add kafka client * XP-934: bump epg_connector * TD-927: fix for table names * XP-927: add common tests * TD-927: reworks * TD-927: cleanup * TD-927: fix pg_backend * TD-927: fix calls procesiing * TD-927: add benchmark * TD-927: bump epg_connector * TD-927: fixes * TD-927: fix timers saving * TD-927: bump epg_connector * XP-927: fix bench config * TD-927: fix worker * TD-927: cleanup * TD-927: add logs * TD-927: add logs * TD-927: add logs * TD-927: add metrics * TD-927: fix scheduler pop task * TD-927: rework pools * TD-927: some optimizations * TD-927: rework pg backend * TD-927: update test config * TD-927: bump epg_connector, cleanup --------- Co-authored-by: ttt161 <losto@nix>
800 lines
26 KiB
Erlang
800 lines
26 KiB
Erlang
-module(prg_base_SUITE).
|
|
|
|
-include_lib("stdlib/include/assert.hrl").
|
|
|
|
%% API
|
|
-export([
|
|
init_per_suite/1,
|
|
end_per_suite/1,
|
|
all/0
|
|
]).
|
|
|
|
%% Tests
|
|
-export([simple_timers_test/1]).
|
|
-export([simple_call_test/1]).
|
|
-export([call_replace_timer_test/1]).
|
|
-export([call_unset_timer_test/1]).
|
|
-export([postponed_call_test/1]).
|
|
-export([postponed_call_to_suspended_process_test/1]).
|
|
-export([multiple_calls_test/1]).
|
|
-export([repair_after_non_retriable_error_test/1]).
|
|
-export([error_after_max_retries_test/1]).
|
|
-export([repair_after_call_error_test/1]).
|
|
-export([remove_by_timer_test/1]).
|
|
-export([remove_without_timer_test/1]).
|
|
|
|
-define(NS, 'default/default').
|
|
|
|
init_per_suite(Config) ->
|
|
Config.
|
|
|
|
end_per_suite(_Config) ->
|
|
ok.
|
|
|
|
all() -> [
|
|
simple_timers_test,
|
|
simple_call_test,
|
|
call_replace_timer_test,
|
|
call_unset_timer_test,
|
|
postponed_call_test,
|
|
postponed_call_to_suspended_process_test,
|
|
multiple_calls_test,
|
|
repair_after_non_retriable_error_test,
|
|
error_after_max_retries_test,
|
|
repair_after_call_error_test,
|
|
remove_by_timer_test,
|
|
remove_without_timer_test
|
|
].
|
|
|
|
-spec simple_timers_test(_) -> _.
|
|
simple_timers_test(_C) ->
|
|
%% steps:
|
|
%% step aux_state events action
|
|
%% 1. init -> aux_state1, [event1], timer 2s
|
|
%% 2. timeout -> aux_state2, [event2], timer 0s
|
|
%% 3. timeout -> undefined, [], undefined
|
|
_ = mock_processor(simple_timers_test),
|
|
Id = gen_id(),
|
|
{ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}),
|
|
3 = expect_steps_counter(3),
|
|
ExpectedAux = erlang:term_to_binary(<<"aux_state2">>),
|
|
{ok, #{
|
|
process_id := Id,
|
|
status := <<"running">>,
|
|
aux_state := ExpectedAux,
|
|
metadata := #{<<"k">> := <<"v">>},
|
|
history := [
|
|
#{
|
|
event_id := 1,
|
|
metadata := #{<<"format_version">> := 1},
|
|
payload := _Pl1,
|
|
timestamp := _Ts1
|
|
},
|
|
#{
|
|
event_id := 2,
|
|
metadata := #{<<"format_version">> := 1},
|
|
payload := _Pl2,
|
|
timestamp := _Ts2
|
|
}
|
|
]
|
|
}} = progressor:get(#{ns => ?NS, id => Id}),
|
|
unmock_processor(),
|
|
ok.
|
|
%%
|
|
-spec simple_call_test(_) -> _.
|
|
simple_call_test(_C) ->
|
|
%% steps:
|
|
%% 1. init -> [event1], timer 2s
|
|
%% 2. call -> [event2], undefined (duration 3s)
|
|
%% 3. timeout -> [event3], undefined
|
|
_ = mock_processor(simple_call_test),
|
|
Id = gen_id(),
|
|
{ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}),
|
|
{ok, <<"response">>} = progressor:call(#{ns => ?NS, id => Id, args => <<"call_args">>}),
|
|
3 = expect_steps_counter(3),
|
|
{ok, #{
|
|
process_id := Id,
|
|
status := <<"running">>,
|
|
history := [
|
|
#{
|
|
event_id := 1,
|
|
metadata := #{<<"format_version">> := 1},
|
|
payload := _Pl1,
|
|
timestamp := _Ts1
|
|
},
|
|
#{
|
|
event_id := 2,
|
|
metadata := #{<<"format_version">> := 1},
|
|
payload := _Pl2,
|
|
timestamp := _Ts2
|
|
},
|
|
#{
|
|
event_id := 3,
|
|
metadata := #{<<"format_version">> := 1},
|
|
payload := _Pl3,
|
|
timestamp := _Ts3
|
|
}
|
|
]
|
|
}} = progressor:get(#{ns => ?NS, id => Id}),
|
|
ok.
|
|
%%
|
|
-spec call_replace_timer_test(_) -> _.
|
|
call_replace_timer_test(_C) ->
|
|
%% steps:
|
|
%% 1. init -> [event1], timer 2s + remove
|
|
%% 2. call -> [], timer 0s (new timer cancel remove)
|
|
%% 3. timeout -> [event2], undefined
|
|
_ = mock_processor(call_replace_timer_test),
|
|
Id = gen_id(),
|
|
{ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}),
|
|
{ok, <<"response">>} = progressor:call(#{ns => ?NS, id => Id, args => <<"call_args">>}),
|
|
3 = expect_steps_counter(3),
|
|
%% wait task_scan_timeout, maybe remove works
|
|
timer:sleep(4000),
|
|
{ok, #{
|
|
process_id := Id,
|
|
status := <<"running">>,
|
|
history := [
|
|
#{
|
|
event_id := 1,
|
|
metadata := #{<<"format_version">> := 1},
|
|
payload := _Pl1,
|
|
timestamp := _Ts1
|
|
},
|
|
#{
|
|
event_id := 2,
|
|
metadata := #{<<"format_version">> := 1},
|
|
payload := _Pl2,
|
|
timestamp := _Ts2
|
|
}
|
|
]
|
|
}} = progressor:get(#{ns => ?NS, id => Id}),
|
|
ok.
|
|
%%
|
|
-spec call_unset_timer_test(_) -> _.
|
|
call_unset_timer_test(_C) ->
|
|
%% steps:
|
|
%% 1. init -> [event1], timer 2s
|
|
%% 2. call -> [], unset_timer
|
|
_ = mock_processor(call_unset_timer_test),
|
|
Id = gen_id(),
|
|
{ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}),
|
|
{ok, <<"response">>} = progressor:call(#{ns => ?NS, id => Id, args => <<"call_args">>}),
|
|
%% wait 3 steps but got 2 - good!
|
|
2 = expect_steps_counter(3),
|
|
{ok, #{
|
|
process_id := Id,
|
|
status := <<"running">>,
|
|
history := [
|
|
#{
|
|
event_id := 1,
|
|
metadata := #{<<"format_version">> := 1},
|
|
payload := _Pl1,
|
|
timestamp := _Ts1
|
|
}
|
|
]
|
|
}} = progressor:get(#{ns => ?NS, id => Id}),
|
|
ok.
|
|
%%
|
|
-spec postponed_call_test(_) -> _.
|
|
postponed_call_test(_C) ->
|
|
%% call between 0 sec timers
|
|
%% steps:
|
|
%% 1. init -> [], timer 0s
|
|
%% 2. timeout -> [event1], timer 0s (process duration 3000)
|
|
%% 3. call -> [event2], undefined
|
|
%% 4. timeout -> [event3], undefined
|
|
_ = mock_processor(postponed_call_test),
|
|
Id = gen_id(),
|
|
{ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}),
|
|
{ok, <<"response">>} = progressor:call(#{ns => ?NS, id => Id, args => <<"call_args">>}),
|
|
4 = expect_steps_counter(4),
|
|
{ok, #{
|
|
process_id := Id,
|
|
status := <<"running">>,
|
|
history := [
|
|
#{
|
|
event_id := 1,
|
|
metadata := #{<<"format_version">> := 1},
|
|
payload := _Pl1,
|
|
timestamp := _Ts1
|
|
},
|
|
#{
|
|
event_id := 2,
|
|
metadata := #{<<"format_version">> := 1},
|
|
payload := _Pl2,
|
|
timestamp := _Ts2
|
|
},
|
|
#{
|
|
event_id := 3,
|
|
metadata := #{<<"format_version">> := 1},
|
|
payload := _Pl3,
|
|
timestamp := _Ts3
|
|
}
|
|
]
|
|
}} = progressor:get(#{ns => ?NS, id => Id}),
|
|
ok.
|
|
%%
|
|
-spec postponed_call_to_suspended_process_test(_) -> _.
|
|
postponed_call_to_suspended_process_test(_C) ->
|
|
%% call between 0 sec timers
|
|
%% steps:
|
|
%% 1. init -> [], timer 0s
|
|
%% 2. timeout -> [event1], undefined (process duration 3000)
|
|
%% 3. call -> [event2], undefined
|
|
_ = mock_processor(postponed_call_to_suspended_process_test),
|
|
Id = gen_id(),
|
|
{ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}),
|
|
{ok, <<"response">>} = progressor:call(#{ns => ?NS, id => Id, args => <<"call_args">>}),
|
|
3 = expect_steps_counter(3),
|
|
{ok, #{
|
|
process_id := Id,
|
|
status := <<"running">>,
|
|
history := [
|
|
#{
|
|
event_id := 1,
|
|
metadata := #{<<"format_version">> := 1},
|
|
payload := _Pl1,
|
|
timestamp := _Ts1
|
|
},
|
|
#{
|
|
event_id := 2,
|
|
metadata := #{<<"format_version">> := 1},
|
|
payload := _Pl2,
|
|
timestamp := _Ts2
|
|
}
|
|
]
|
|
}} = progressor:get(#{ns => ?NS, id => Id}),
|
|
ok.
|
|
%%
|
|
-spec multiple_calls_test(_) -> _.
|
|
multiple_calls_test(_C) ->
|
|
%% call between 0 sec timers
|
|
%% steps:
|
|
%% 1. init -> [], undefined
|
|
%% 2. call -> [event1], undefined
|
|
%% ...
|
|
%% 11. call -> [event10], undefined
|
|
_ = mock_processor(multiple_calls_test),
|
|
Id = gen_id(),
|
|
{ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}),
|
|
lists:foreach(
|
|
fun(N) ->
|
|
spawn(progressor, call, [#{ns => ?NS, id => Id, args => <<N>>}])
|
|
end,
|
|
lists:seq(1, 10)
|
|
),
|
|
11 = expect_steps_counter(33),
|
|
{ok, #{
|
|
process_id := Id,
|
|
status := <<"running">>,
|
|
history := [
|
|
#{event_id := 1},
|
|
#{event_id := 2},
|
|
#{event_id := 3},
|
|
#{event_id := 4},
|
|
#{event_id := 5},
|
|
#{event_id := 6},
|
|
#{event_id := 7},
|
|
#{event_id := 8},
|
|
#{event_id := 9},
|
|
#{event_id := 10}
|
|
]
|
|
}} = progressor:get(#{ns => ?NS, id => Id}),
|
|
ok.
|
|
|
|
-spec repair_after_non_retriable_error_test(_) -> _.
|
|
repair_after_non_retriable_error_test(_C) ->
|
|
%% steps:
|
|
%% 1. init -> [], timer 0s
|
|
%% 2. timeout -> {error, do_not_retry}
|
|
%% 3. repair -> [event1], undefined
|
|
%% 4. timeout -> [event2], undefined
|
|
_ = mock_processor(repair_after_non_retriable_error_test),
|
|
Id = gen_id(),
|
|
{ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}),
|
|
2 = expect_steps_counter(2),
|
|
{ok, #{
|
|
detail := <<"do_not_retry">>,
|
|
history := [],
|
|
process_id := Id,
|
|
status := <<"error">>
|
|
}} = progressor:get(#{ns => ?NS, id => Id}),
|
|
{ok, ok} = progressor:repair(#{ns => ?NS, id => Id, args => <<"repair_args">>}),
|
|
4 = expect_steps_counter(4),
|
|
{ok, #{
|
|
process_id := Id,
|
|
status := <<"running">>,
|
|
history := [
|
|
#{
|
|
event_id := 1,
|
|
metadata := #{<<"format_version">> := 1},
|
|
payload := _Pl1,
|
|
timestamp := _Ts1
|
|
},
|
|
#{
|
|
event_id := 2,
|
|
metadata := #{<<"format_version">> := 1},
|
|
payload := _Pl2,
|
|
timestamp := _Ts2
|
|
}
|
|
]
|
|
} = Process} = progressor:get(#{ns => ?NS, id => Id}),
|
|
false = erlang:is_map_key(detail, Process),
|
|
ok.
|
|
%%
|
|
-spec error_after_max_retries_test(_) -> _.
|
|
error_after_max_retries_test(_C) ->
|
|
%% steps:
|
|
%% 1. init -> [], timer 0s
|
|
%% 2. timeout -> {error, retry_this}
|
|
%% 3. timeout -> {error, retry_this}
|
|
%% 4. timeout -> {error, retry_this}
|
|
_ = mock_processor(error_after_max_retries_test),
|
|
Id = gen_id(),
|
|
{ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}),
|
|
4 = expect_steps_counter(4),
|
|
{ok, #{
|
|
detail := <<"retry_this">>,
|
|
history := [],
|
|
process_id := Id,
|
|
status := <<"error">>
|
|
}} = progressor:get(#{ns => ?NS, id => Id}),
|
|
ok.
|
|
%%
|
|
-spec repair_after_call_error_test(_) -> _.
|
|
repair_after_call_error_test(_C) ->
|
|
%% steps:
|
|
%% 1. init -> [], undefined
|
|
%% 2. call -> {error, retry_this}
|
|
%% 3. repair -> {error, repair_error}
|
|
%% 4. repair -> [event1], undefined
|
|
_ = mock_processor(repair_after_call_error_test),
|
|
Id = gen_id(),
|
|
{ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}),
|
|
{error, retry_this} = progressor:call(#{ns => ?NS, id => Id, args => <<"call_args">>}),
|
|
2 = expect_steps_counter(2),
|
|
{ok, #{
|
|
detail := <<"retry_this">>,
|
|
metadata := #{<<"k">> := <<"v">>},
|
|
history := [],
|
|
process_id := Id,
|
|
status := <<"error">>
|
|
}} = progressor:get(#{ns => ?NS, id => Id}),
|
|
{error, <<"repair_error">>} = progressor:repair(#{ns => ?NS, id => Id, args => <<"bad_repair_args">>}),
|
|
3 = expect_steps_counter(3),
|
|
%% shoul not rewrite detail
|
|
{ok, #{
|
|
detail := <<"retry_this">>,
|
|
metadata := #{<<"k">> := <<"v">>},
|
|
history := [],
|
|
process_id := Id,
|
|
status := <<"error">>
|
|
}} = progressor:get(#{ns => ?NS, id => Id}),
|
|
{ok, ok} = progressor:repair(#{ns => ?NS, id => Id, args => <<"repair_args">>}),
|
|
4 = expect_steps_counter(4),
|
|
{ok, #{
|
|
process_id := Id,
|
|
status := <<"running">>,
|
|
metadata := #{<<"k2">> := <<"v2">>},
|
|
history := [
|
|
#{
|
|
event_id := 1,
|
|
metadata := #{<<"format_version">> := 1},
|
|
payload := _Pl1,
|
|
timestamp := _Ts1
|
|
}
|
|
]
|
|
}} = progressor:get(#{ns => ?NS, id => Id}),
|
|
ok.
|
|
%%
|
|
-spec remove_by_timer_test(_) -> _.
|
|
remove_by_timer_test(_C) ->
|
|
%% steps:
|
|
%% 1. init -> [event1, event2], timer 2s + remove
|
|
_ = mock_processor(remove_by_timer_test),
|
|
Id = gen_id(),
|
|
{ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}),
|
|
{ok, #{
|
|
process_id := Id,
|
|
status := <<"running">>,
|
|
history := [
|
|
#{
|
|
event_id := 1,
|
|
metadata := #{<<"format_version">> := 1},
|
|
payload := _Pl1,
|
|
timestamp := _Ts1
|
|
},
|
|
#{
|
|
event_id := 2,
|
|
metadata := #{<<"format_version">> := 1},
|
|
payload := _Pl2,
|
|
timestamp := _Ts2
|
|
}
|
|
]
|
|
}} = progressor:get(#{ns => ?NS, id => Id}),
|
|
%% wait tsk_scan_timeout
|
|
timer:sleep(4000),
|
|
{error, <<"process not found">>} = progressor:get(#{ns => ?NS, id => Id}),
|
|
ok.
|
|
%%
|
|
-spec remove_without_timer_test(_) -> _.
|
|
remove_without_timer_test(_C) ->
|
|
%% steps:
|
|
%% 1. init -> [event1], timer 2s
|
|
%% 2. timeout -> [], remove
|
|
_ = mock_processor(remove_without_timer_test),
|
|
Id = gen_id(),
|
|
{ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}),
|
|
{ok, #{
|
|
process_id := Id,
|
|
status := <<"running">>,
|
|
history := [
|
|
#{
|
|
event_id := 1,
|
|
metadata := #{<<"format_version">> := 1},
|
|
payload := _Pl1,
|
|
timestamp := _Ts1
|
|
}
|
|
]
|
|
}} = progressor:get(#{ns => ?NS, id => Id}),
|
|
2 = expect_steps_counter(2),
|
|
{error, <<"process not found">>} = progressor:get(#{ns => ?NS, id => Id}),
|
|
ok.
|
|
|
|
%%%%%%%%%%%%%%%%%%%%%
|
|
%% Internal functions
|
|
%%%%%%%%%%%%%%%%%%%%%
|
|
|
|
mock_processor(simple_timers_test = TestCase) ->
|
|
Self = self(),
|
|
MockProcessor = fun({_Type, _Args, #{history := History} = _Process}, _Opts, _Ctx) ->
|
|
case erlang:length(History) of
|
|
0 ->
|
|
Result = #{
|
|
events => [event(1)],
|
|
metadata => #{<<"k">> => <<"v">>},
|
|
%% postponed timer
|
|
action => #{set_timer => erlang:system_time(second) + 2},
|
|
aux_state => erlang:term_to_binary(<<"aux_state1">>)
|
|
},
|
|
Self ! 1,
|
|
{ok, Result};
|
|
1 ->
|
|
Result = #{
|
|
events => [event(2)],
|
|
%% continuation timer
|
|
action => #{set_timer => erlang:system_time(second)},
|
|
aux_state => erlang:term_to_binary(<<"aux_state2">>)
|
|
},
|
|
Self ! 2,
|
|
{ok, Result};
|
|
_ ->
|
|
Result = #{
|
|
events => []
|
|
},
|
|
Self ! 3,
|
|
{ok, Result}
|
|
end
|
|
end,
|
|
mock_processor(TestCase, MockProcessor);
|
|
%%
|
|
mock_processor(simple_call_test = TestCase) ->
|
|
Self = self(),
|
|
MockProcessor = fun
|
|
({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
|
|
Result = #{
|
|
events => [event(1)],
|
|
action => #{set_timer => erlang:system_time(second) + 2}
|
|
},
|
|
Self ! 1,
|
|
{ok, Result};
|
|
({call, <<"call_args">>, _Process}, _Opts, _Ctx) ->
|
|
%% call when process suspended (wait timeout)
|
|
timer:sleep(3000),
|
|
Result = #{
|
|
response => <<"response">>,
|
|
events => [event(2)]
|
|
},
|
|
Self ! 2,
|
|
{ok, Result};
|
|
({timeout, <<>>, #{history := History} = _Process}, _Opts, _Ctx) ->
|
|
%% timeout after call processing
|
|
?assertEqual(2, erlang:length(History)),
|
|
Result = #{
|
|
events => [event(3)]
|
|
},
|
|
Self ! 3,
|
|
{ok, Result}
|
|
end,
|
|
mock_processor(TestCase, MockProcessor);
|
|
%%
|
|
mock_processor(call_replace_timer_test = TestCase) ->
|
|
Self = self(),
|
|
MockProcessor = fun
|
|
({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
|
|
Result = #{
|
|
events => [event(1)],
|
|
action => #{set_timer => erlang:system_time(second) + 2, remove => true}
|
|
},
|
|
Self ! 1,
|
|
{ok, Result};
|
|
({call, <<"call_args">>, _Process}, _Opts, _Ctx) ->
|
|
%% call when process suspended (wait timeout)
|
|
Result = #{
|
|
response => <<"response">>,
|
|
events => [],
|
|
action => #{set_timer => erlang:system_time(second)}
|
|
},
|
|
Self ! 2,
|
|
{ok, Result};
|
|
({timeout, <<>>, #{history := History} = _Process}, _Opts, _Ctx) ->
|
|
%% timeout after call processing (remove action was cancelled by call action)
|
|
?assertEqual(1, erlang:length(History)),
|
|
Result = #{
|
|
events => [event(2)]
|
|
},
|
|
Self ! 3,
|
|
{ok, Result}
|
|
end,
|
|
mock_processor(TestCase, MockProcessor);
|
|
%%
|
|
mock_processor(call_unset_timer_test = TestCase) ->
|
|
Self = self(),
|
|
MockProcessor = fun
|
|
({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
|
|
Result = #{
|
|
events => [event(1)],
|
|
action => #{set_timer => erlang:system_time(second) + 2}
|
|
},
|
|
Self ! 1,
|
|
{ok, Result};
|
|
({call, <<"call_args">>, _Process}, _Opts, _Ctx) ->
|
|
%% call when process suspended (wait timeout)
|
|
Result = #{
|
|
response => <<"response">>,
|
|
events => [],
|
|
action => unset_timer
|
|
},
|
|
Self ! 2,
|
|
{ok, Result};
|
|
({timeout, <<>>, #{history := History} = _Process}, _Opts, _Ctx) ->
|
|
%% timeout after call processing (should not work!)
|
|
?assertEqual(2, erlang:length(History)),
|
|
Result = #{
|
|
events => [event(3)]
|
|
},
|
|
Self ! 3,
|
|
{ok, Result}
|
|
end,
|
|
mock_processor(TestCase, MockProcessor);
|
|
%%
|
|
mock_processor(postponed_call_test = TestCase) ->
|
|
Self = self(),
|
|
MockProcessor = fun
|
|
({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
|
|
Result = #{
|
|
events => [],
|
|
action => #{set_timer => erlang:system_time(second)}
|
|
},
|
|
Self ! 1,
|
|
{ok, Result};
|
|
({timeout, <<>>, #{history := []} = _Process}, _Opts, _Ctx) ->
|
|
timer:sleep(3000),
|
|
Result = #{
|
|
events => [event(1)],
|
|
action => #{set_timer => erlang:system_time(second)}
|
|
},
|
|
Self ! 2,
|
|
{ok, Result};
|
|
({call, <<"call_args">>, #{history := History} = _Process}, _Opts, _Ctx) ->
|
|
?assertEqual(1, erlang:length(History)),
|
|
Result = #{
|
|
response => <<"response">>,
|
|
events => [event(2)]
|
|
},
|
|
Self ! 3,
|
|
{ok, Result};
|
|
({timeout, <<>>, #{history := History} = _Process}, _Opts, _Ctx) ->
|
|
?assertEqual(2, erlang:length(History)),
|
|
Result = #{
|
|
events => [event(3)]
|
|
},
|
|
Self ! 4,
|
|
{ok, Result}
|
|
end,
|
|
mock_processor(TestCase, MockProcessor);
|
|
%%
|
|
mock_processor(postponed_call_to_suspended_process_test = TestCase) ->
|
|
Self = self(),
|
|
MockProcessor = fun
|
|
({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
|
|
Result = #{
|
|
events => [],
|
|
action => #{set_timer => erlang:system_time(second)}
|
|
},
|
|
Self ! 1,
|
|
{ok, Result};
|
|
({timeout, <<>>, #{history := []} = _Process}, _Opts, _Ctx) ->
|
|
timer:sleep(3000),
|
|
Result = #{
|
|
events => [event(1)]
|
|
},
|
|
Self ! 2,
|
|
{ok, Result};
|
|
({call, <<"call_args">>, #{history := History} = _Process}, _Opts, _Ctx) ->
|
|
?assertEqual(1, erlang:length(History)),
|
|
Result = #{
|
|
response => <<"response">>,
|
|
events => [event(2)]
|
|
},
|
|
Self ! 3,
|
|
{ok, Result}
|
|
end,
|
|
mock_processor(TestCase, MockProcessor);
|
|
%%
|
|
mock_processor(multiple_calls_test = TestCase) ->
|
|
Self = self(),
|
|
MockProcessor = fun
|
|
({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
|
|
Result = #{
|
|
events => []
|
|
},
|
|
Self ! 1,
|
|
{ok, Result};
|
|
({call, <<N>>, _Process}, _Opts, _Ctx) ->
|
|
timer:sleep(100),
|
|
Result = #{
|
|
response => <<"response">>,
|
|
events => [event(N)]
|
|
},
|
|
Self ! iterate,
|
|
{ok, Result}
|
|
end,
|
|
mock_processor(TestCase, MockProcessor);
|
|
%%
|
|
mock_processor(repair_after_non_retriable_error_test = TestCase) ->
|
|
Self = self(),
|
|
MockProcessor = fun
|
|
({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
|
|
Result = #{
|
|
events => [],
|
|
action => #{set_timer => erlang:system_time(second)}
|
|
},
|
|
Self ! 1,
|
|
{ok, Result};
|
|
({timeout, <<>>, #{history := []} = _Process}, _Opts, _Ctx) ->
|
|
Self ! 2,
|
|
{error, do_not_retry};
|
|
({repair, <<"repair_args">>, #{history := []} = _Process}, _Opts, _Ctx) ->
|
|
Result = #{
|
|
events => [event(1)]
|
|
},
|
|
Self ! 3,
|
|
{ok, Result};
|
|
({timeout, <<>>, #{history := History} = _Process}, _Opts, _Ctx) ->
|
|
?assertEqual(1, erlang:length(History)),
|
|
Result = #{
|
|
events => [event(2)]
|
|
},
|
|
Self ! 4,
|
|
{ok, Result}
|
|
end,
|
|
mock_processor(TestCase, MockProcessor);
|
|
%%
|
|
mock_processor(error_after_max_retries_test = TestCase) ->
|
|
Self = self(),
|
|
MockProcessor = fun
|
|
({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
|
|
Result = #{
|
|
events => [],
|
|
action => #{set_timer => erlang:system_time(second)}
|
|
},
|
|
Self ! 1,
|
|
{ok, Result};
|
|
({timeout, <<>>, #{history := []} = _Process}, _Opts, _Ctx) ->
|
|
%% must be 3 attempts
|
|
Self ! iterate,
|
|
{error, retry_this}
|
|
end,
|
|
mock_processor(TestCase, MockProcessor);
|
|
%%
|
|
mock_processor(repair_after_call_error_test = TestCase) ->
|
|
Self = self(),
|
|
MockProcessor = fun
|
|
({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
|
|
Result = #{
|
|
metadata => #{<<"k">> => <<"v">>},
|
|
events => []
|
|
},
|
|
Self ! 1,
|
|
{ok, Result};
|
|
({call, <<"call_args">>, #{history := []} = _Process}, _Opts, _Ctx) ->
|
|
Self ! 2,
|
|
%% retriable error for call must be ignore and process set error status
|
|
{error, retry_this};
|
|
({repair, <<"bad_repair_args">>, #{history := []} = _Process}, _Opts, _Ctx) ->
|
|
%% repair error should not rewrite process detail
|
|
Self ! 3,
|
|
{error, <<"repair_error">>};
|
|
({repair, <<"repair_args">>, #{history := []} = _Process}, _Opts, _Ctx) ->
|
|
Result = #{
|
|
metadata => #{<<"k2">> => <<"v2">>},
|
|
events => [event(1)]
|
|
},
|
|
Self ! 4,
|
|
{ok, Result}
|
|
end,
|
|
mock_processor(TestCase, MockProcessor);
|
|
%%
|
|
mock_processor(remove_by_timer_test = TestCase) ->
|
|
MockProcessor = fun
|
|
({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
|
|
Result = #{
|
|
events => [event(1), event(2)],
|
|
action => #{set_timer => erlang:system_time(second) + 2, remove => true}
|
|
},
|
|
{ok, Result}
|
|
end,
|
|
mock_processor(TestCase, MockProcessor);
|
|
%%
|
|
mock_processor(remove_without_timer_test = TestCase) ->
|
|
Self = self(),
|
|
MockProcessor = fun
|
|
({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
|
|
Result = #{
|
|
events => [event(1)],
|
|
action => #{set_timer => erlang:system_time(second) + 2}
|
|
},
|
|
Self ! 1,
|
|
{ok, Result};
|
|
({timeout, <<>>, _Process}, _Opts, _Ctx) ->
|
|
Result = #{
|
|
events => [],
|
|
action => #{remove => true}
|
|
},
|
|
Self ! 2,
|
|
{ok, Result}
|
|
end,
|
|
mock_processor(TestCase, MockProcessor).
|
|
|
|
mock_processor(_TestCase, MockFun) ->
|
|
meck:new(prg_ct_processor),
|
|
meck:expect(prg_ct_processor, process, MockFun).
|
|
|
|
unmock_processor() ->
|
|
meck:unload(prg_ct_processor).
|
|
|
|
expect_steps_counter(ExpectedSteps) ->
|
|
expect_steps_counter(ExpectedSteps, 0).
|
|
|
|
expect_steps_counter(ExpectedSteps, CurrentStep) ->
|
|
receive
|
|
iterate when CurrentStep + 1 =:= ExpectedSteps ->
|
|
%% wait storage
|
|
timer:sleep(50),
|
|
ExpectedSteps;
|
|
iterate ->
|
|
expect_steps_counter(ExpectedSteps, CurrentStep + 1);
|
|
Counter when Counter =:= ExpectedSteps ->
|
|
%% wait storage
|
|
timer:sleep(50),
|
|
Counter;
|
|
Counter ->
|
|
expect_steps_counter(ExpectedSteps, Counter)
|
|
after 5000 ->
|
|
%% after process_step_timeout/2
|
|
CurrentStep
|
|
end.
|
|
|
|
event(Id) ->
|
|
#{
|
|
event_id => Id,
|
|
timestamp => erlang:system_time(second),
|
|
metadata => #{<<"format_version">> => 1},
|
|
payload => erlang:term_to_binary({bin, crypto:strong_rand_bytes(8)}) %% msg_pack compatibility for kafka
|
|
}.
|
|
|
|
gen_id() ->
|
|
base64:encode(crypto:strong_rand_bytes(8)).
|