Changes to loaded_upgrade test to improve reliability

* Wait for services after upgrading a node
* Remove hardcoded sleep call
* Reduce time between upgrades from 300 to 120 seconds
* Refactor worker functions to accept a report pid parameter instead
  of assuming a loaded_upgrade named process.
* Remove the mapreduce load. The results of the mapreduce queries
  during node shutdown are not predictable enough at this time to rely
  on for this test.
This commit is contained in:
Kelly McLaughlin 2014-01-07 15:56:20 -07:00
parent e880a6785e
commit dc0eb72b65
3 changed files with 132 additions and 136 deletions

View File

@ -23,11 +23,11 @@
-behavior(supervisor).
%% Helper macro for declaring children of supervisor
-define(CHILD(Id, Mod, Node, Backend, Vsn), {
list_to_atom(atom_to_list(Node) ++ "_loader_" ++ integer_to_list(Id)),
{ Mod,
start_link,
[list_to_atom(atom_to_list(Node) ++ "_loader_" ++ integer_to_list(Id)), Node, Backend, Vsn]},
-define(CHILD(Id, Mod, Node, Backend, Vsn, ReportPid), {
list_to_atom(atom_to_list(Node) ++ "_loader_" ++ integer_to_list(Id)),
{ Mod,
start_link,
[list_to_atom(atom_to_list(Node) ++ "_loader_" ++ integer_to_list(Id)), Node, Backend, Vsn, ReportPid]},
permanent, 5000, worker, [Mod]}).
-export([init/1]).
@ -41,9 +41,10 @@ init(Props) ->
Node = proplists:get_value(node, Props),
Backend = proplists:get_value(backend, Props),
Vsn = proplists:get_value(version, Props),
ReportPid = proplists:get_value(report_pid, Props),
ChildSpecs = [
?CHILD(Num, loaded_upgrade_worker_sup, Node, Backend, Vsn)
?CHILD(Num, loaded_upgrade_worker_sup, Node, Backend, Vsn, ReportPid)
|| Num <- lists:seq(1, WorkersPerNode)],
lager:info("Starting ~p workers to ~p", [WorkersPerNode, Node]),

View File

@ -25,14 +25,14 @@
-export([kv_valgen/1, bucket/1, erlang_mr/0, int_to_key/1]).
-define(TIME_BETWEEN_UPGRADES, 300). %% Seconds!
-define(TIME_BETWEEN_UPGRADES, 120). %% Seconds!
confirm() ->
case whereis(loaded_upgrade) of
undefined -> meh;
_ -> unregister(loaded_upgrade)
end,
end,
register(loaded_upgrade, self()),
%% Build Cluster
TestMetaData = riak_test_runner:metadata(),
@ -46,50 +46,43 @@ confirm() ->
Nodes = rt:build_cluster(Vsns),
seed_cluster(Nodes),
%% Now we have a cluster!
%% Let's spawn workers against it.
timer:sleep(10000),
Concurrent = rt_config:get(load_workers, 10),
Sups = [
{rt_worker_sup:start_link([
{concurrent, Concurrent},
{node, Node},
{backend, Backend},
{version, OldVsn}
]), Node}
|| Node <- Nodes],
Sups = [{rt_worker_sup:start_link([{concurrent, Concurrent},
{node, Node},
{backend, Backend},
{version, OldVsn},
{report_pid, self()}]), Node} || Node <- Nodes],
upgrade_recv_loop(),
[begin
exit(Sup, normal),
lager:info("Upgrading ~p", [Node]),
rt:upgrade(Node, current),
{ok, NewSup} = rt_worker_sup:start_link([
{concurrent, Concurrent},
{node, Node},
{backend, Backend},
{version, current}
]),
_NodeMon = init_node_monitor(Node, NewSup, self()),
upgrade_recv_loop()
end || {{ok, Sup}, Node} <- Sups],
exit(Sup, normal),
lager:info("Upgrading ~p", [Node]),
rt:upgrade(Node, current),
rt:wait_for_service(Node, [riak_search,riak_kv,riak_pipe]),
{ok, NewSup} = rt_worker_sup:start_link([{concurrent, Concurrent},
{node, Node},
{backend, Backend},
{version, current},
{report_pid, self()}]),
_NodeMon = init_node_monitor(Node, NewSup, self()),
upgrade_recv_loop()
end || {{ok, Sup}, Node} <- Sups],
pass.
upgrade_recv_loop() ->
{SMega, SSec, SMicro} = os:timestamp(),
EndSecs = SSec + ?TIME_BETWEEN_UPGRADES,
EndTime = case EndSecs > 1000000 of
true ->
{SMega + 1, EndSecs - 1000000, SMicro};
_ ->
{SMega, EndSecs, SMicro}
end,
true ->
{SMega + 1, EndSecs - 1000000, SMicro};
_ ->
{SMega, EndSecs, SMicro}
end,
upgrade_recv_loop(EndTime).
%% TODO: Collect error message counts in ets table
@ -99,23 +92,23 @@ upgrade_recv_loop(EndTime) ->
true ->
lager:info("Done waiting 'cause ~p > ~p", [Now, EndTime]);
_ ->
receive
{mapred, Node, bad_result} ->
?assertEqual(true, {mapred, Node, bad_result});
{kv, Node, not_equal} ->
?assertEqual(true, {kv, Node, bad_result});
{kv, Node, {notfound, Key}} ->
?assertEqual(true, {kv, Node, {notfound, Key}});
{listkeys, Node, not_equal} ->
?assertEqual(true, {listkeys, Node, not_equal});
{search, Node, bad_result} ->
?assertEqual(true, {search, Node, bad_result});
Msg ->
lager:debug("Received Mesg ~p", [Msg]),
upgrade_recv_loop(EndTime)
after timer:now_diff(EndTime, Now) div 1000 ->
lager:info("Done waiting 'cause ~p is up", [?TIME_BETWEEN_UPGRADES])
end
receive
{mapred, Node, bad_result} ->
?assertEqual(true, {mapred, Node, bad_result});
{kv, Node, not_equal} ->
?assertEqual(true, {kv, Node, bad_result});
{kv, Node, {notfound, Key}} ->
?assertEqual(true, {kv, Node, {notfound, Key}});
{listkeys, Node, not_equal} ->
?assertEqual(true, {listkeys, Node, not_equal});
{search, Node, bad_result} ->
?assertEqual(true, {search, Node, bad_result});
Msg ->
lager:debug("Received Mesg ~p", [Msg]),
upgrade_recv_loop(EndTime)
after timer:now_diff(EndTime, Now) div 1000 ->
lager:info("Done waiting 'cause ~p is up", [?TIME_BETWEEN_UPGRADES])
end
end.
seed_cluster(Nodes=[Node1|_]) ->
@ -127,9 +120,9 @@ seed_cluster(Nodes=[Node1|_]) ->
?assertEqual([], rt:systest_read(Node1, 100, 1)),
seed(Node1, 0, 100, fun(Key) ->
Bin = iolist_to_binary(io_lib:format("~p", [Key])),
riakc_obj:new(<<"objects">>, Bin, Bin)
end),
Bin = iolist_to_binary(io_lib:format("~p", [Key])),
riakc_obj:new(<<"objects">>, Bin, Bin)
end),
%% For KV
kv_seed(Node1),
@ -155,9 +148,9 @@ seed_search(Node) ->
Pid = rt:pbc(Node),
SpamDir = rt_config:get(spam_dir),
Files = case SpamDir of
undefined -> undefined;
_ -> filelib:wildcard(SpamDir ++ "/*")
end,
undefined -> undefined;
_ -> filelib:wildcard(SpamDir ++ "/*")
end,
seed_search(Pid, Files),
riakc_pb_socket:stop(Pid).
@ -169,8 +162,8 @@ seed_search(Pid, [File|Files]) ->
kv_seed(Node) ->
ValFun = fun(Key) ->
riakc_obj:new(bucket(kv), iolist_to_binary(io_lib:format("~p", [Key])), kv_valgen(Key))
end,
riakc_obj:new(bucket(kv), iolist_to_binary(io_lib:format("~p", [Key])), kv_valgen(Key))
end,
seed(Node, 0, 7999, ValFun).
kv_valgen(Key) ->
@ -184,36 +177,36 @@ int_to_key(KInt) ->
%% bin_plustwo -> [<<"Key + 2">>]
twoi_seed(Node) ->
ValFun = fun(Key) ->
Obj = riakc_obj:new(bucket(twoi), iolist_to_binary(io_lib:format("~p", [Key])), kv_valgen(Key)),
MD1 = riakc_obj:get_update_metadata(Obj),
MD2 = riakc_obj:set_secondary_index(MD1, [
{{integer_index, "plusone"}, [Key + 1, Key + 10000]},
{{binary_index, "plustwo"}, [int_to_key(Key + 2)]}
]),
riakc_obj:update_metadata(Obj, MD2)
end,
Obj = riakc_obj:new(bucket(twoi), iolist_to_binary(io_lib:format("~p", [Key])), kv_valgen(Key)),
MD1 = riakc_obj:get_update_metadata(Obj),
MD2 = riakc_obj:set_secondary_index(MD1, [
{{integer_index, "plusone"}, [Key + 1, Key + 10000]},
{{binary_index, "plustwo"}, [int_to_key(Key + 2)]}
]),
riakc_obj:update_metadata(Obj, MD2)
end,
seed(Node, 0, 7999, ValFun).
erlang_mr() ->
[{map, {modfun, riak_kv_mapreduce, map_object_value}, none, false},
{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs}, none, true}].
{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs}, none, true}].
mr_seed(Node) ->
%% to be used along with sequential_int keygen to populate known
%% mapreduce set
%% to be used along with sequential_int keygen to populate known
%% mapreduce set
ValFun = fun(Key) ->
Value = iolist_to_binary(io_lib:format("~p", [Key])),
riakc_obj:new(bucket(mapred), Value, Value)
end,
Value = iolist_to_binary(io_lib:format("~p", [Key])),
riakc_obj:new(bucket(mapred), Value, Value)
end,
seed(Node, 0, 9999, ValFun).
seed(Node, Start, End, ValFun) ->
PBC = rt:pbc(Node),
[ begin
Obj = ValFun(Key),
riakc_pb_socket:put(PBC, Obj, [{w,3}])
end || Key <- lists:seq(Start, End)],
Obj = ValFun(Key),
riakc_pb_socket:put(PBC, Obj, [{w,3}])
end || Key <- lists:seq(Start, End)],
riakc_pb_socket:stop(PBC).

View File

@ -27,33 +27,33 @@
%% API
-export([assert_equal/2]).
-export([list_keys_tester/4, kv_tester/4, mapred_tester/4,
twoi_tester/4, search_tester/4, tester_start_link/3]).
-export([list_keys_tester/5, kv_tester/5, mapred_tester/5,
twoi_tester/5, search_tester/5, tester_start_link/4]).
-export([init/1]).
-export([start_link/4]).
-export([start_link/5]).
%% Helper macro for declaring children of supervisor
-define(CHILD(Name, FunName, Node, Vsn), {
-define(CHILD(Name, FunName, Node, Vsn, ReportPid), {
list_to_atom(atom_to_list(Name) ++ "_" ++ atom_to_list(FunName)),
{ ?MODULE,
tester_start_link,
[FunName, Node, Vsn]},
{ ?MODULE,
tester_start_link,
[FunName, Node, Vsn, ReportPid]},
permanent, 5000, worker, [?MODULE]}).
start_link(Name, Node, Backend, Vsn) ->
supervisor:start_link(?MODULE, [Name, Node, Backend, Vsn]).
start_link(Name, Node, Backend, Vsn, ReportPid) ->
supervisor:start_link(?MODULE, [Name, Node, Backend, Vsn, ReportPid]).
init([Name, Node, Backend, Vsn]) ->
init([Name, Node, Backend, Vsn, ReportPid]) ->
rt:wait_for_service(Node, [riak_search,riak_kv,riak_pipe]),
ChildSpecs1 = [
?CHILD(Name, FunName, Node, Vsn)
|| FunName <- [list_keys_tester, mapred_tester, kv_tester, search_tester]],
ChildSpecs1 = [
?CHILD(Name, FunName, Node, Vsn, ReportPid)
|| FunName <- [list_keys_tester, kv_tester, search_tester]],
ChildSpecs = case Backend of
eleveldb ->
[?CHILD(Name, twoi_tester, Node, Vsn) | ChildSpecs1];
[?CHILD(Name, twoi_tester, Node, Vsn, ReportPid) | ChildSpecs1];
_ -> ChildSpecs1
end,
{ok, {{one_for_one, 1000, 60}, ChildSpecs}}.
@ -63,10 +63,10 @@ init([Name, Node, Backend, Vsn]) ->
%%% Internal functions
%%%===================================================================
tester_start_link(Function, Node, Vsn) ->
{ok, spawn_link(?MODULE, Function, [Node, 0, undefined, Vsn])}.
tester_start_link(Function, Node, Vsn, ReportPid) ->
{ok, spawn_link(?MODULE, Function, [Node, 0, undefined, Vsn, ReportPid])}.
list_keys_tester(Node, Count, Pid, Vsn) ->
list_keys_tester(Node, Count, Pid, Vsn, ReportPid) ->
PBC = pb_pid_recycler(Pid, Node),
case riakc_pb_socket:list_keys(PBC, <<"objects">>) of
{ok, Keys} ->
@ -74,42 +74,43 @@ list_keys_tester(Node, Count, Pid, Vsn) ->
ExpectedKeys = lists:usort([loaded_upgrade:int_to_key(K) || K <- lists:seq(0, 100)]),
case assert_equal(ExpectedKeys, ActualKeys) of
true -> cool;
_ -> loaded_upgrade ! {listkeys, Node, not_equal}
_ -> ReportPid ! {listkeys, Node, not_equal}
end;
{error, timeout} ->
loaded_upgrade ! {listkeys, Node, timeout};
ReportPid ! {listkeys, Node, timeout};
{error, {timeout, _}} ->
loaded_upgrade ! {listkeys, Node, timeout};
ReportPid ! {listkeys, Node, timeout};
Unexpected ->
loaded_upgrade ! {listkeys, Node, Unexpected}
ReportPid ! {listkeys, Node, Unexpected}
end,
list_keys_tester(Node, Count + 1, PBC, Vsn).
list_keys_tester(Node, Count + 1, PBC, Vsn, ReportPid).
kv_tester(Node, Count, Pid, Vsn) ->
kv_tester(Node, Count, Pid, Vsn, ReportPid) ->
PBC = pb_pid_recycler(Pid, Node),
Key = Count rem 8000,
case riakc_pb_socket:get(PBC, loaded_upgrade:bucket(kv), loaded_upgrade:int_to_key(Key)) of
{ok, Val} ->
case loaded_upgrade:kv_valgen(Key) == riakc_obj:get_value(Val) of
true -> cool;
_ -> loaded_upgrade ! {kv, Node, not_equal}
_ -> ReportPid ! {kv, Node, not_equal}
end;
{error, disconnected} ->
ok;
{error, notfound} ->
loaded_upgrade ! {kv, Node, {notfound, Key}};
ReportPid ! {kv, Node, {notfound, Key}};
Unexpected ->
loaded_upgrade ! {kv, Node, Unexpected}
ReportPid ! {kv, Node, Unexpected}
end,
kv_tester(Node, Count + 1, PBC, Vsn).
kv_tester(Node, Count + 1, PBC, Vsn, ReportPid).
mapred_tester(Node, Count, Pid, Vsn) ->
mapred_tester(Node, Count, Pid, Vsn, ReportPid) ->
PBC = pb_pid_recycler(Pid, Node),
case riakc_pb_socket:mapred(PBC, loaded_upgrade:bucket(mapred), loaded_upgrade:erlang_mr()) of
{ok, [{1, [10000]}]} ->
ok;
{ok, _R} ->
loaded_upgrade ! {mapred, Node, bad_result};
{ok, R} ->
lager:warning("Bad MR result: ~p", [R]),
ReportPid ! {mapred, Node, bad_result};
{error, disconnected} ->
ok;
%% Finkmaster Flex says timeouts are ok
@ -138,16 +139,16 @@ mapred_tester(Node, Count, Pid, Vsn) ->
{error, <<"{\"phase\":0,\"error\":\"[{vnode_down,noproc}]", _/binary>>} ->
ok;
Unexpected ->
loaded_upgrade ! {mapred, Node, Unexpected}
ReportPid ! {mapred, Node, Unexpected}
end,
mapred_tester(Node, Count + 1, PBC, Vsn).
mapred_tester(Node, Count + 1, PBC, Vsn, ReportPid).
twoi_tester(Node, 0, undefined, legacy) ->
twoi_tester(Node, 0, undefined, legacy, ReportPid) ->
lager:warning("Legacy nodes do not have 2i load applied"),
twoi_tester(Node, 1, undefined, legacy);
twoi_tester(Node, Count, Pid, legacy) ->
twoi_tester(Node, Count + 1, Pid, legacy);
twoi_tester(Node, Count, Pid, Vsn) ->
twoi_tester(Node, 1, undefined, legacy, ReportPid);
twoi_tester(Node, Count, Pid, legacy, ReportPid) ->
twoi_tester(Node, Count + 1, Pid, legacy, ReportPid);
twoi_tester(Node, Count, Pid, Vsn, ReportPid) ->
PBC = pb_pid_recycler(Pid, Node),
Key = Count rem 8000,
ExpectedKeys = [loaded_upgrade:int_to_key(Key)],
@ -158,38 +159,40 @@ twoi_tester(Node, Count, Pid, Vsn) ->
{binary_index, "plustwo"},
loaded_upgrade:int_to_key(Key + 2)),
riakc_pb_socket:get_index(
PBC,
PBC,
loaded_upgrade:bucket(twoi),
{integer_index, "plusone"},
Key + 1)
} of
} of
{{ok, ?INDEX_RESULTS{keys=BinKeys}}, {ok, ?INDEX_RESULTS{keys=IntKeys}}} ->
case {assert_equal(ExpectedKeys, BinKeys), assert_equal(ExpectedKeys, IntKeys)} of
{true, true} -> cool;
{false, false} ->
loaded_upgrade ! {twoi, Node, bolth_no_match};
ReportPid ! {twoi, Node, bolth_no_match};
{false, true} ->
loaded_upgrade ! {twoi, Node, bin_no_match};
ReportPid ! {twoi, Node, bin_no_match};
{true, false} ->
loaded_upgrade ! {twoi, Node, int_no_match}
ReportPid ! {twoi, Node, int_no_match}
end;
{{error, Reason}, _} ->
loaded_upgrade ! {twoi, Node, {error, Reason}};
ReportPid ! {twoi, Node, {error, Reason}};
{_, {error, Reason}} ->
loaded_upgrade ! {twoi, Node, {error, Reason}};
ReportPid ! {twoi, Node, {error, Reason}};
Unexpected ->
loaded_upgrade ! {twoi, Node, Unexpected}
ReportPid ! {twoi, Node, Unexpected}
end,
twoi_tester(Node, Count + 1, PBC, Vsn).
twoi_tester(Node, Count + 1, PBC, Vsn, ReportPid).
search_tester(Node, Count, Pid, Vsn) ->
search_tester(Node, Count, Pid, Vsn, ReportPid) ->
PBC = pb_pid_recycler(Pid, Node),
{Term, Size} = search_check(Count),
case riakc_pb_socket:search(PBC, loaded_upgrade:bucket(search), Term) of
{ok, Result} ->
case Size == Result#search_results.num_found of
true -> ok;
_ -> loaded_upgrade ! {search, Node, bad_result}
_ ->
lager:warning("Bad search result: ~p Expected: ~p", [Result#search_results.num_found, Size]),
ReportPid ! {search, Node, bad_result}
end;
{error, disconnected} ->
%% oh well, reconnect
@ -197,24 +200,24 @@ search_tester(Node, Count, Pid, Vsn) ->
{error, <<"Error processing incoming message: throw:{timeout,range_loop}:[{riak_search_backend", _/binary>>} ->
case rt:is_mixed_cluster(Node) of
true ->
true ->
ok;
_ ->
loaded_upgrade ! {search, Node, {timeout, range_loop}}
ReportPid ! {search, Node, {timeout, range_loop}}
end;
{error,<<"Error processing incoming message: error:{case_clause,", _/binary>>} ->
%% although it doesn't say so, this is the infamous badfun
case rt:is_mixed_cluster(Node) of
true ->
true ->
ok;
_ ->
loaded_upgrade ! {search, Node, {error, badfun}}
ReportPid ! {search, Node, {error, badfun}}
end;
Unexpected ->
loaded_upgrade ! {search, Node, Unexpected}
ReportPid ! {search, Node, Unexpected}
end,
search_tester(Node, Count + 1, PBC, Vsn).
search_tester(Node, Count + 1, PBC, Vsn, ReportPid).
search_check(Count) ->
case Count rem 6 of
@ -227,7 +230,7 @@ search_check(Count) ->
end.
assert_equal(Expected, Actual) ->
case Expected -- Actual of
case Expected -- Actual of
[] -> ok;
Diff -> lager:info("Expected -- Actual: ~p", [Diff])
end,
@ -243,4 +246,3 @@ pb_pid_recycler(Pid, Node) ->
riakc_pb_socket:stop(Pid),
rt:pbc(Node)
end.