progressor/test/prg_base_SUITE.erl
ttt161 79f16f9cc3
epic/TD-927/progressor-prototype (#1)
* TD-934: add prototype implementation

* TD-934: fix

* TD-934: fix spec

* TD-934: fix spec

* TD-927: add context

* TD-927: fix json saving

* TD-927: fix

* TD-927: fix calls processing

* TD-927: fix call and repair processing, add remove action impl.

* TD-934: add kafka client

* XP-934: bump epg_connector

* TD-927: fix for table names

* XP-927: add common tests

* TD-927: reworks

* TD-927: cleanup

* TD-927: fix pg_backend

* TD-927: fix calls processing

* TD-927: add benchmark

* TD-927: bump epg_connector

* TD-927: fixes

* TD-927: fix timers saving

* TD-927: bump epg_connector

* XP-927: fix bench config

* TD-927: fix worker

* TD-927: cleanup

* TD-927: add logs

* TD-927: add logs

* TD-927: add logs

* TD-927: add metrics

* TD-927: fix scheduler pop task

* TD-927: rework pools

* TD-927: some optimizations

* TD-927: rework pg backend

* TD-927: update test config

* TD-927: bump epg_connector, cleanup

---------

Co-authored-by: ttt161 <losto@nix>
2024-10-09 11:24:02 +03:00

800 lines
26 KiB
Erlang

-module(prg_base_SUITE).
-include_lib("stdlib/include/assert.hrl").
%% API
-export([
init_per_suite/1,
end_per_suite/1,
all/0
]).
%% Tests
-export([simple_timers_test/1]).
-export([simple_call_test/1]).
-export([call_replace_timer_test/1]).
-export([call_unset_timer_test/1]).
-export([postponed_call_test/1]).
-export([postponed_call_to_suspended_process_test/1]).
-export([multiple_calls_test/1]).
-export([repair_after_non_retriable_error_test/1]).
-export([error_after_max_retries_test/1]).
-export([repair_after_call_error_test/1]).
-export([remove_by_timer_test/1]).
-export([remove_without_timer_test/1]).
-define(NS, 'default/default').
%% Common Test suite setup: nothing to prepare, the configuration is
%% handed back untouched.
-spec init_per_suite(_) -> _.
init_per_suite(SuiteConfig) ->
    SuiteConfig.
%% Common Test suite teardown: nothing to release, always returns `ok`.
-spec end_per_suite(_) -> _.
end_per_suite(_SuiteConfig) ->
    ok.
%% Test cases of this suite, in the order they are listed.
-spec all() -> _.
all() ->
    [
        %% timers and calls
        simple_timers_test,
        simple_call_test,
        call_replace_timer_test,
        call_unset_timer_test,
        postponed_call_test,
        postponed_call_to_suspended_process_test,
        multiple_calls_test,
        %% errors and repair
        repair_after_non_retriable_error_test,
        error_after_max_retries_test,
        repair_after_call_error_test,
        %% removal
        remove_by_timer_test,
        remove_without_timer_test
    ].
%% A process driven only by timers.
%%
%% steps:
%%    step       aux_state    events    action
%% 1. init    -> aux_state1, [event1], timer 2s
%% 2. timeout -> aux_state2, [event2], timer 0s
%% 3. timeout -> undefined,  [],       undefined
%%
%% After the third step the stored process must carry the aux_state of
%% step 2, the metadata of step 1 and both events.
-spec simple_timers_test(_) -> _.
simple_timers_test(_C) ->
    _ = mock_processor(simple_timers_test),
    try
        Id = gen_id(),
        {ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}),
        3 = expect_steps_counter(3),
        ExpectedAux = erlang:term_to_binary(<<"aux_state2">>),
        {ok, #{
            process_id := Id,
            status := <<"running">>,
            aux_state := ExpectedAux,
            metadata := #{<<"k">> := <<"v">>},
            history := [
                #{
                    event_id := 1,
                    metadata := #{<<"format_version">> := 1},
                    payload := _Pl1,
                    timestamp := _Ts1
                },
                #{
                    event_id := 2,
                    metadata := #{<<"format_version">> := 1},
                    payload := _Pl2,
                    timestamp := _Ts2
                }
            ]
        }} = progressor:get(#{ns => ?NS, id => Id}),
        ok
    after
        %% unload the mock even when one of the matches above fails
        unmock_processor()
    end.
%%
%% A call issued while the process waits for its timer.
%%
%% steps:
%% 1. init    -> [event1], timer 2s
%% 2. call    -> [event2], undefined (duration 3s)
%% 3. timeout -> [event3], undefined
-spec simple_call_test(_) -> _.
simple_call_test(_C) ->
    _ = mock_processor(simple_call_test),
    try
        Id = gen_id(),
        {ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}),
        {ok, <<"response">>} = progressor:call(#{ns => ?NS, id => Id, args => <<"call_args">>}),
        3 = expect_steps_counter(3),
        {ok, #{
            process_id := Id,
            status := <<"running">>,
            history := [
                #{
                    event_id := 1,
                    metadata := #{<<"format_version">> := 1},
                    payload := _Pl1,
                    timestamp := _Ts1
                },
                #{
                    event_id := 2,
                    metadata := #{<<"format_version">> := 1},
                    payload := _Pl2,
                    timestamp := _Ts2
                },
                #{
                    event_id := 3,
                    metadata := #{<<"format_version">> := 1},
                    payload := _Pl3,
                    timestamp := _Ts3
                }
            ]
        }} = progressor:get(#{ns => ?NS, id => Id}),
        ok
    after
        %% unload the mock explicitly, as simple_timers_test does
        unmock_processor()
    end.
%%
%% A timer set by a call replaces the pending "timer + remove" action.
%%
%% steps:
%% 1. init    -> [event1], timer 2s + remove
%% 2. call    -> [], timer 0s (new timer cancel remove)
%% 3. timeout -> [event2], undefined
-spec call_replace_timer_test(_) -> _.
call_replace_timer_test(_C) ->
    _ = mock_processor(call_replace_timer_test),
    try
        Id = gen_id(),
        {ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}),
        {ok, <<"response">>} = progressor:call(#{ns => ?NS, id => Id, args => <<"call_args">>}),
        3 = expect_steps_counter(3),
        %% wait task_scan_timeout, maybe remove works
        timer:sleep(4000),
        %% the process must still exist: the remove was cancelled
        {ok, #{
            process_id := Id,
            status := <<"running">>,
            history := [
                #{
                    event_id := 1,
                    metadata := #{<<"format_version">> := 1},
                    payload := _Pl1,
                    timestamp := _Ts1
                },
                #{
                    event_id := 2,
                    metadata := #{<<"format_version">> := 1},
                    payload := _Pl2,
                    timestamp := _Ts2
                }
            ]
        }} = progressor:get(#{ns => ?NS, id => Id}),
        ok
    after
        %% unload the mock explicitly, as simple_timers_test does
        unmock_processor()
    end.
%%
%% A call answering with `unset_timer` cancels the timer set by init.
%%
%% steps:
%% 1. init -> [event1], timer 2s
%% 2. call -> [], unset_timer
-spec call_unset_timer_test(_) -> _.
call_unset_timer_test(_C) ->
    _ = mock_processor(call_unset_timer_test),
    try
        Id = gen_id(),
        {ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}),
        {ok, <<"response">>} = progressor:call(#{ns => ?NS, id => Id, args => <<"call_args">>}),
        %% wait 3 steps but got 2 - good! (the timeout step must never run,
        %% so the counter gives up after its receive timeout)
        2 = expect_steps_counter(3),
        {ok, #{
            process_id := Id,
            status := <<"running">>,
            history := [
                #{
                    event_id := 1,
                    metadata := #{<<"format_version">> := 1},
                    payload := _Pl1,
                    timestamp := _Ts1
                }
            ]
        }} = progressor:get(#{ns => ?NS, id => Id}),
        ok
    after
        %% unload the mock explicitly, as simple_timers_test does
        unmock_processor()
    end.
%%
%% call between 0 sec timers
%%
%% steps:
%% 1. init    -> [], timer 0s
%% 2. timeout -> [event1], timer 0s (process duration 3000)
%% 3. call    -> [event2], undefined
%% 4. timeout -> [event3], undefined
-spec postponed_call_test(_) -> _.
postponed_call_test(_C) ->
    _ = mock_processor(postponed_call_test),
    try
        Id = gen_id(),
        {ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}),
        {ok, <<"response">>} = progressor:call(#{ns => ?NS, id => Id, args => <<"call_args">>}),
        4 = expect_steps_counter(4),
        {ok, #{
            process_id := Id,
            status := <<"running">>,
            history := [
                #{
                    event_id := 1,
                    metadata := #{<<"format_version">> := 1},
                    payload := _Pl1,
                    timestamp := _Ts1
                },
                #{
                    event_id := 2,
                    metadata := #{<<"format_version">> := 1},
                    payload := _Pl2,
                    timestamp := _Ts2
                },
                #{
                    event_id := 3,
                    metadata := #{<<"format_version">> := 1},
                    payload := _Pl3,
                    timestamp := _Ts3
                }
            ]
        }} = progressor:get(#{ns => ?NS, id => Id}),
        ok
    after
        %% unload the mock explicitly, as simple_timers_test does
        unmock_processor()
    end.
%%
%% The call is issued right after init; the mocked call handler asserts that
%% it only runs once the 3 s timeout step has stored event 1. That timeout
%% step sets no further timer.
%%
%% steps:
%% 1. init    -> [], timer 0s
%% 2. timeout -> [event1], undefined (process duration 3000)
%% 3. call    -> [event2], undefined
-spec postponed_call_to_suspended_process_test(_) -> _.
postponed_call_to_suspended_process_test(_C) ->
    _ = mock_processor(postponed_call_to_suspended_process_test),
    try
        Id = gen_id(),
        {ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}),
        {ok, <<"response">>} = progressor:call(#{ns => ?NS, id => Id, args => <<"call_args">>}),
        3 = expect_steps_counter(3),
        {ok, #{
            process_id := Id,
            status := <<"running">>,
            history := [
                #{
                    event_id := 1,
                    metadata := #{<<"format_version">> := 1},
                    payload := _Pl1,
                    timestamp := _Ts1
                },
                #{
                    event_id := 2,
                    metadata := #{<<"format_version">> := 1},
                    payload := _Pl2,
                    timestamp := _Ts2
                }
            ]
        }} = progressor:get(#{ns => ?NS, id => Id}),
        ok
    after
        %% unload the mock explicitly, as simple_timers_test does
        unmock_processor()
    end.
%%
%% Ten calls fired from separate processes at a process whose init set no
%% timer.
%%
%% steps:
%% 1.  init -> [], undefined
%% 2.  call -> [event1], undefined
%% ...
%% 11. call -> [event10], undefined
-spec multiple_calls_test(_) -> _.
multiple_calls_test(_C) ->
    _ = mock_processor(multiple_calls_test),
    try
        Id = gen_id(),
        {ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}),
        lists:foreach(
            fun(N) ->
                %% the call result is ignored; only the step notifications
                %% sent by the mock are counted
                spawn(progressor, call, [#{ns => ?NS, id => Id, args => <<N>>}])
            end,
            lists:seq(1, 10)
        ),
        %% 33 is deliberately out of reach: the counter returns the 11 steps it
        %% has seen once no further notification arrives, which also shows that
        %% no call was processed more than once
        11 = expect_steps_counter(33),
        {ok, #{
            process_id := Id,
            status := <<"running">>,
            history := [
                #{event_id := 1},
                #{event_id := 2},
                #{event_id := 3},
                #{event_id := 4},
                #{event_id := 5},
                #{event_id := 6},
                #{event_id := 7},
                #{event_id := 8},
                #{event_id := 9},
                #{event_id := 10}
            ]
        }} = progressor:get(#{ns => ?NS, id => Id}),
        ok
    after
        %% unload the mock explicitly, as simple_timers_test does
        unmock_processor()
    end.
%% A timeout step fails with a non-retriable error; repair brings the
%% process back and the next timeout step runs.
%%
%% steps:
%% 1. init    -> [], timer 0s
%% 2. timeout -> {error, do_not_retry}
%% 3. repair  -> [event1], undefined
%% 4. timeout -> [event2], undefined
-spec repair_after_non_retriable_error_test(_) -> _.
repair_after_non_retriable_error_test(_C) ->
    _ = mock_processor(repair_after_non_retriable_error_test),
    try
        Id = gen_id(),
        {ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}),
        2 = expect_steps_counter(2),
        {ok, #{
            detail := <<"do_not_retry">>,
            history := [],
            process_id := Id,
            status := <<"error">>
        }} = progressor:get(#{ns => ?NS, id => Id}),
        {ok, ok} = progressor:repair(#{ns => ?NS, id => Id, args => <<"repair_args">>}),
        4 = expect_steps_counter(4),
        {ok,
            #{
                process_id := Id,
                status := <<"running">>,
                history := [
                    #{
                        event_id := 1,
                        metadata := #{<<"format_version">> := 1},
                        payload := _Pl1,
                        timestamp := _Ts1
                    },
                    #{
                        event_id := 2,
                        metadata := #{<<"format_version">> := 1},
                        payload := _Pl2,
                        timestamp := _Ts2
                    }
                ]
            } = Process} = progressor:get(#{ns => ?NS, id => Id}),
        %% the error detail must be gone after a successful repair
        false = erlang:is_map_key(detail, Process),
        ok
    after
        %% unload the mock explicitly, as simple_timers_test does
        unmock_processor()
    end.
%%
%% A timeout step that keeps failing with a retriable error ends in the
%% error status after three attempts.
%%
%% steps:
%% 1. init    -> [], timer 0s
%% 2. timeout -> {error, retry_this}
%% 3. timeout -> {error, retry_this}
%% 4. timeout -> {error, retry_this}
-spec error_after_max_retries_test(_) -> _.
error_after_max_retries_test(_C) ->
    _ = mock_processor(error_after_max_retries_test),
    try
        Id = gen_id(),
        {ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}),
        4 = expect_steps_counter(4),
        {ok, #{
            detail := <<"retry_this">>,
            history := [],
            process_id := Id,
            status := <<"error">>
        }} = progressor:get(#{ns => ?NS, id => Id}),
        ok
    after
        %% unload the mock explicitly, as simple_timers_test does
        unmock_processor()
    end.
%%
%% A failed call puts the process into the error status; a failed repair
%% leaves the stored detail and metadata as they were; a successful repair
%% restores the running status.
%%
%% steps:
%% 1. init   -> [], undefined
%% 2. call   -> {error, retry_this}
%% 3. repair -> {error, repair_error}
%% 4. repair -> [event1], undefined
-spec repair_after_call_error_test(_) -> _.
repair_after_call_error_test(_C) ->
    _ = mock_processor(repair_after_call_error_test),
    try
        Id = gen_id(),
        {ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}),
        {error, retry_this} = progressor:call(#{ns => ?NS, id => Id, args => <<"call_args">>}),
        2 = expect_steps_counter(2),
        {ok, #{
            detail := <<"retry_this">>,
            metadata := #{<<"k">> := <<"v">>},
            history := [],
            process_id := Id,
            status := <<"error">>
        }} = progressor:get(#{ns => ?NS, id => Id}),
        {error, <<"repair_error">>} = progressor:repair(#{ns => ?NS, id => Id, args => <<"bad_repair_args">>}),
        3 = expect_steps_counter(3),
        %% should not rewrite detail
        {ok, #{
            detail := <<"retry_this">>,
            metadata := #{<<"k">> := <<"v">>},
            history := [],
            process_id := Id,
            status := <<"error">>
        }} = progressor:get(#{ns => ?NS, id => Id}),
        {ok, ok} = progressor:repair(#{ns => ?NS, id => Id, args => <<"repair_args">>}),
        4 = expect_steps_counter(4),
        {ok, #{
            process_id := Id,
            status := <<"running">>,
            metadata := #{<<"k2">> := <<"v2">>},
            history := [
                #{
                    event_id := 1,
                    metadata := #{<<"format_version">> := 1},
                    payload := _Pl1,
                    timestamp := _Ts1
                }
            ]
        }} = progressor:get(#{ns => ?NS, id => Id}),
        ok
    after
        %% unload the mock explicitly, as simple_timers_test does
        unmock_processor()
    end.
%%
%% Init schedules a removal 2 seconds ahead; the process is readable right
%% after init and gone after the wait.
%%
%% steps:
%% 1. init -> [event1, event2], timer 2s + remove
-spec remove_by_timer_test(_) -> _.
remove_by_timer_test(_C) ->
    _ = mock_processor(remove_by_timer_test),
    try
        Id = gen_id(),
        {ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}),
        {ok, #{
            process_id := Id,
            status := <<"running">>,
            history := [
                #{
                    event_id := 1,
                    metadata := #{<<"format_version">> := 1},
                    payload := _Pl1,
                    timestamp := _Ts1
                },
                #{
                    event_id := 2,
                    metadata := #{<<"format_version">> := 1},
                    payload := _Pl2,
                    timestamp := _Ts2
                }
            ]
        }} = progressor:get(#{ns => ?NS, id => Id}),
        %% wait task_scan_timeout
        timer:sleep(4000),
        {error, <<"process not found">>} = progressor:get(#{ns => ?NS, id => Id}),
        ok
    after
        %% unload the mock explicitly, as simple_timers_test does
        unmock_processor()
    end.
%%
%% The timeout step answers with a bare remove action (no timer); the
%% process must be gone once that step has been reported.
%%
%% steps:
%% 1. init    -> [event1], timer 2s
%% 2. timeout -> [], remove
-spec remove_without_timer_test(_) -> _.
remove_without_timer_test(_C) ->
    _ = mock_processor(remove_without_timer_test),
    try
        Id = gen_id(),
        {ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}),
        {ok, #{
            process_id := Id,
            status := <<"running">>,
            history := [
                #{
                    event_id := 1,
                    metadata := #{<<"format_version">> := 1},
                    payload := _Pl1,
                    timestamp := _Ts1
                }
            ]
        }} = progressor:get(#{ns => ?NS, id => Id}),
        2 = expect_steps_counter(2),
        {error, <<"process not found">>} = progressor:get(#{ns => ?NS, id => Id}),
        ok
    after
        %% unload the mock explicitly, as simple_timers_test does
        unmock_processor()
    end.
%%%%%%%%%%%%%%%%%%%%%
%% Internal functions
%%%%%%%%%%%%%%%%%%%%%
%% Builds the processor fun for the named test case and installs it through
%% mock_processor/2.
%%
%% Every fun receives `{Type, Args, Process}`, options and a context, and
%% returns `{ok, Result}` or `{error, Reason}`. Except for
%% remove_by_timer_test, each step first notifies the process that called
%% mock_processor/1 (the test case) with its step number, or with `iterate`
%% when the same clause serves several steps; expect_steps_counter/1
%% consumes these messages.
mock_processor(simple_timers_test = TestCase) ->
    TestPid = self(),
    Processor = fun({_Type, _Args, #{history := History}}, _Opts, _Ctx) ->
        %% the step is chosen by the number of events stored so far
        case length(History) of
            0 ->
                report_step(TestPid, 1, {ok, #{
                    events => [event(1)],
                    metadata => #{<<"k">> => <<"v">>},
                    %% postponed timer
                    action => #{set_timer => erlang:system_time(second) + 2},
                    aux_state => erlang:term_to_binary(<<"aux_state1">>)
                }});
            1 ->
                report_step(TestPid, 2, {ok, #{
                    events => [event(2)],
                    %% continuation timer
                    action => #{set_timer => erlang:system_time(second)},
                    aux_state => erlang:term_to_binary(<<"aux_state2">>)
                }});
            _ ->
                report_step(TestPid, 3, {ok, #{events => []}})
        end
    end,
    mock_processor(TestCase, Processor);
%%
mock_processor(simple_call_test = TestCase) ->
    TestPid = self(),
    Processor = fun
        ({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
            report_step(TestPid, 1, {ok, #{
                events => [event(1)],
                action => #{set_timer => erlang:system_time(second) + 2}
            }});
        ({call, <<"call_args">>, _Process}, _Opts, _Ctx) ->
            %% call when process suspended (wait timeout)
            timer:sleep(3000),
            report_step(TestPid, 2, {ok, #{
                response => <<"response">>,
                events => [event(2)]
            }});
        ({timeout, <<>>, #{history := History}}, _Opts, _Ctx) ->
            %% timeout after call processing
            ?assertEqual(2, erlang:length(History)),
            report_step(TestPid, 3, {ok, #{events => [event(3)]}})
    end,
    mock_processor(TestCase, Processor);
%%
mock_processor(call_replace_timer_test = TestCase) ->
    TestPid = self(),
    Processor = fun
        ({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
            report_step(TestPid, 1, {ok, #{
                events => [event(1)],
                action => #{set_timer => erlang:system_time(second) + 2, remove => true}
            }});
        ({call, <<"call_args">>, _Process}, _Opts, _Ctx) ->
            %% call when process suspended (wait timeout)
            report_step(TestPid, 2, {ok, #{
                response => <<"response">>,
                events => [],
                action => #{set_timer => erlang:system_time(second)}
            }});
        ({timeout, <<>>, #{history := History}}, _Opts, _Ctx) ->
            %% timeout after call processing (remove action was cancelled by call action)
            ?assertEqual(1, erlang:length(History)),
            report_step(TestPid, 3, {ok, #{events => [event(2)]}})
    end,
    mock_processor(TestCase, Processor);
%%
mock_processor(call_unset_timer_test = TestCase) ->
    TestPid = self(),
    Processor = fun
        ({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
            report_step(TestPid, 1, {ok, #{
                events => [event(1)],
                action => #{set_timer => erlang:system_time(second) + 2}
            }});
        ({call, <<"call_args">>, _Process}, _Opts, _Ctx) ->
            %% call when process suspended (wait timeout)
            report_step(TestPid, 2, {ok, #{
                response => <<"response">>,
                events => [],
                action => unset_timer
            }});
        ({timeout, <<>>, #{history := History}}, _Opts, _Ctx) ->
            %% timeout after call processing (should not work!)
            ?assertEqual(2, erlang:length(History)),
            report_step(TestPid, 3, {ok, #{events => [event(3)]}})
    end,
    mock_processor(TestCase, Processor);
%%
mock_processor(postponed_call_test = TestCase) ->
    TestPid = self(),
    Processor = fun
        ({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
            report_step(TestPid, 1, {ok, #{
                events => [],
                action => #{set_timer => erlang:system_time(second)}
            }});
        ({timeout, <<>>, #{history := []}}, _Opts, _Ctx) ->
            %% slow step: the timer value is taken after the sleep
            timer:sleep(3000),
            report_step(TestPid, 2, {ok, #{
                events => [event(1)],
                action => #{set_timer => erlang:system_time(second)}
            }});
        ({call, <<"call_args">>, #{history := History}}, _Opts, _Ctx) ->
            ?assertEqual(1, erlang:length(History)),
            report_step(TestPid, 3, {ok, #{
                response => <<"response">>,
                events => [event(2)]
            }});
        ({timeout, <<>>, #{history := History}}, _Opts, _Ctx) ->
            ?assertEqual(2, erlang:length(History)),
            report_step(TestPid, 4, {ok, #{events => [event(3)]}})
    end,
    mock_processor(TestCase, Processor);
%%
mock_processor(postponed_call_to_suspended_process_test = TestCase) ->
    TestPid = self(),
    Processor = fun
        ({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
            report_step(TestPid, 1, {ok, #{
                events => [],
                action => #{set_timer => erlang:system_time(second)}
            }});
        ({timeout, <<>>, #{history := []}}, _Opts, _Ctx) ->
            timer:sleep(3000),
            report_step(TestPid, 2, {ok, #{events => [event(1)]}});
        ({call, <<"call_args">>, #{history := History}}, _Opts, _Ctx) ->
            ?assertEqual(1, erlang:length(History)),
            report_step(TestPid, 3, {ok, #{
                response => <<"response">>,
                events => [event(2)]
            }})
    end,
    mock_processor(TestCase, Processor);
%%
mock_processor(multiple_calls_test = TestCase) ->
    TestPid = self(),
    Processor = fun
        ({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
            report_step(TestPid, 1, {ok, #{events => []}});
        ({call, <<N>>, _Process}, _Opts, _Ctx) ->
            %% the single argument byte becomes the event id
            timer:sleep(100),
            report_step(TestPid, iterate, {ok, #{
                response => <<"response">>,
                events => [event(N)]
            }})
    end,
    mock_processor(TestCase, Processor);
%%
mock_processor(repair_after_non_retriable_error_test = TestCase) ->
    TestPid = self(),
    Processor = fun
        ({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
            report_step(TestPid, 1, {ok, #{
                events => [],
                action => #{set_timer => erlang:system_time(second)}
            }});
        ({timeout, <<>>, #{history := []}}, _Opts, _Ctx) ->
            report_step(TestPid, 2, {error, do_not_retry});
        ({repair, <<"repair_args">>, #{history := []}}, _Opts, _Ctx) ->
            report_step(TestPid, 3, {ok, #{events => [event(1)]}});
        ({timeout, <<>>, #{history := History}}, _Opts, _Ctx) ->
            ?assertEqual(1, erlang:length(History)),
            report_step(TestPid, 4, {ok, #{events => [event(2)]}})
    end,
    mock_processor(TestCase, Processor);
%%
mock_processor(error_after_max_retries_test = TestCase) ->
    TestPid = self(),
    Processor = fun
        ({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
            report_step(TestPid, 1, {ok, #{
                events => [],
                action => #{set_timer => erlang:system_time(second)}
            }});
        ({timeout, <<>>, #{history := []}}, _Opts, _Ctx) ->
            %% must be 3 attempts
            report_step(TestPid, iterate, {error, retry_this})
    end,
    mock_processor(TestCase, Processor);
%%
mock_processor(repair_after_call_error_test = TestCase) ->
    TestPid = self(),
    Processor = fun
        ({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
            report_step(TestPid, 1, {ok, #{
                metadata => #{<<"k">> => <<"v">>},
                events => []
            }});
        ({call, <<"call_args">>, #{history := []}}, _Opts, _Ctx) ->
            %% a retriable error from a call must not be retried: the process
            %% gets the error status
            report_step(TestPid, 2, {error, retry_this});
        ({repair, <<"bad_repair_args">>, #{history := []}}, _Opts, _Ctx) ->
            %% repair error should not rewrite process detail
            report_step(TestPid, 3, {error, <<"repair_error">>});
        ({repair, <<"repair_args">>, #{history := []}}, _Opts, _Ctx) ->
            report_step(TestPid, 4, {ok, #{
                metadata => #{<<"k2">> => <<"v2">>},
                events => [event(1)]
            }})
    end,
    mock_processor(TestCase, Processor);
%%
mock_processor(remove_by_timer_test = TestCase) ->
    %% the only mock that sends no step notification
    Processor = fun({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
        {ok, #{
            events => [event(1), event(2)],
            action => #{set_timer => erlang:system_time(second) + 2, remove => true}
        }}
    end,
    mock_processor(TestCase, Processor);
%%
mock_processor(remove_without_timer_test = TestCase) ->
    TestPid = self(),
    Processor = fun
        ({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
            report_step(TestPid, 1, {ok, #{
                events => [event(1)],
                action => #{set_timer => erlang:system_time(second) + 2}
            }});
        ({timeout, <<>>, _Process}, _Opts, _Ctx) ->
            report_step(TestPid, 2, {ok, #{
                events => [],
                action => #{remove => true}
            }})
    end,
    mock_processor(TestCase, Processor).

%% Sends the step marker to the test process and hands back the processor
%% reply. The reply is already built when this is called, so the
%% notification goes out after the result has been constructed.
report_step(TestPid, StepMarker, Reply) ->
    TestPid ! StepMarker,
    Reply.
%% Mocks `prg_ct_processor` with meck and makes its `process` function
%% evaluate ProcessFun. The test case name is accepted but not used.
mock_processor(_TestCase, ProcessFun) ->
    Mod = prg_ct_processor,
    meck:new(Mod),
    meck:expect(Mod, process, ProcessFun).
%% Unloads the meck mock of `prg_ct_processor` installed by mock_processor/2.
unmock_processor() ->
    Mod = prg_ct_processor,
    meck:unload(Mod).
%% Waits for step notifications in the caller's mailbox until step
%% ExpectedSteps is reached, and returns the step reached.
%%
%% A notification is either `iterate` (advance the current step by one) or
%% any other term, which is taken as the new current step. When the expected
%% step is reached the function sleeps 50 ms before returning it. If no
%% message arrives within 5000 ms of the previous one, the step reached so
%% far is returned instead.
expect_steps_counter(ExpectedSteps) ->
    expect_steps_counter(ExpectedSteps, 0).

expect_steps_counter(ExpectedSteps, CurrentStep) ->
    receive
        Notification ->
            Reached =
                case Notification of
                    iterate -> CurrentStep + 1;
                    Counter -> Counter
                end,
            case Reached of
                ExpectedSteps ->
                    %% wait storage
                    timer:sleep(50),
                    Reached;
                _ ->
                    expect_steps_counter(ExpectedSteps, Reached)
            end
    after 5000 ->
        %% after process_step_timeout/2
        CurrentStep
    end.
%% Builds a test event with the given id, the current time in seconds,
%% format version 1 and a payload of 8 random bytes wrapped in a
%% `{bin, Bytes}` tuple and serialised with term_to_binary/1.
event(Id) ->
    Now = erlang:system_time(second),
    %% msg_pack compatibility for kafka
    Payload = erlang:term_to_binary({bin, crypto:strong_rand_bytes(8)}),
    #{
        event_id => Id,
        timestamp => Now,
        metadata => #{<<"format_version">> => 1},
        payload => Payload
    }.
%% Generates a random process id: 8 random bytes, base64-encoded.
gen_id() ->
    RandomBytes = crypto:strong_rand_bytes(8),
    base64:encode(RandomBytes).