Merge pull request #196 from basho/jd-loaded-upgrayedd-no-mix

Here's the new loaded upgrade test
This commit is contained in:
Joe DeVivo 2013-02-08 06:30:06 -08:00
commit 1d24e36a38
6 changed files with 477 additions and 644 deletions

View File

@ -12,13 +12,13 @@
{getopt, ".*", {git, "git://github.com/jcomellas/getopt", {tag, "v0.4"}}},
{meck, ".*", {git, "git://github.com/eproxus/meck"}},
{mapred_verify, ".*", {git, "git://github.com/basho/mapred_verify"}},
{riakc, "1.3.1", {git, "git://github.com/basho/riak-erlang-client", {tag, "1.3.1"}}},
{riakc, ".*", {git, "git://github.com/basho/riak-erlang-client"}},
{riakhttpc, "1.3.1", {git, "git://github.com/basho/riak-erlang-http-client", {tag, "1.3.1"}}},
{kvc, "1.3.0", {git, "https://github.com/etrepum/kvc", {tag, "v1.3.0"}}}
]}.
{escript_incl_apps, [lager, getopt, riakhttpc, riakc, ibrowse, mochiweb, kvc]}.
{escript_emu_args, "%%! +K true +P 10000 -env ERL_MAX_PORTS 10000\n"}.
{plugin_dir, "src"}.
{plugins, [rebar_riak_test_plugin]}.
{riak_test, [

View File

@ -20,7 +20,7 @@
%% @doc riak_test_runner runs a riak_test module's run/0 function.
-module(riak_test_runner).
-export([confirm/3, metadata/0]).
-export([confirm/3, metadata/0, metadata/1]).
-include_lib("eunit/include/eunit.hrl").
-spec(metadata() -> [{atom(), term()}]).
@ -31,6 +31,12 @@ metadata() ->
{metadata, TestMeta} -> TestMeta
end.
metadata(Pid) ->
riak_test ! {metadata, Pid},
receive
{metadata, TestMeta} -> TestMeta
end.
-spec(confirm(integer(), atom(), [{atom(), term()}]) -> [tuple()]).
%% @doc Runs a module's run/0 function after setting up a log capturing backend for lager.
%% It then cleans up that backend and returns the logs as part of the return proplist.
@ -98,6 +104,9 @@ rec_loop(Pid, TestModule, TestMetaData) ->
metadata ->
Pid ! {metadata, TestMetaData},
rec_loop(Pid, TestModule, TestMetaData);
{metadata, P} ->
P ! {metadata, TestMetaData},
rec_loop(Pid, TestModule, TestMetaData);
{'EXIT', Pid, normal} -> {pass, undefined};
{'EXIT', Pid, Error} ->
lager:warning("~s failed: ~p", [TestModule, Error]),

View File

@ -475,10 +475,13 @@ load_modules_on_nodes([Module | MoreModules], Nodes)
is_pingable(Node) ->
net_adm:ping(Node) =:= pong.
is_mixed_cluster(Nodes) ->
is_mixed_cluster(Nodes) when is_list(Nodes) ->
%% If the nodes are bad, we don't care what version they are
{Versions, _BadNodes} = rpc:multicall(Nodes, init, script_id, [], rt:config(rt_max_wait_time)),
length(lists:usort(Versions)) > 1.
length(lists:usort(Versions)) > 1;
is_mixed_cluster(Node) ->
Nodes = rpc:call(Node, erlang, nodes, []),
is_mixed_cluster(Nodes).
%% @private
is_ready(Node) ->
@ -573,14 +576,15 @@ wait_until_transfers_complete([Node0|_]) ->
?assertEqual(ok, wait_until(Node0, F)),
ok.
wait_for_service(Node, Service) ->
lager:info("Wait for service ~p on ~p", [Service, Node]),
wait_for_service(Node, Services) when is_list(Services) ->
F = fun(N) ->
Services = rpc:call(N, riak_core_node_watcher, services, [N]),
lists:member(Service, Services)
CurrServices = rpc:call(N, riak_core_node_watcher, services, [N]),
lists:all(fun(Service) -> lists:member(Service, CurrServices) end, Services)
end,
?assertEqual(ok, wait_until(Node, F)),
ok.
ok;
wait_for_service(Node, Service) ->
wait_for_service(Node, [Service]).
wait_for_cluster_service(Nodes, Service) ->
lager:info("Wait for cluster service ~p in ~p", [Service, Nodes]),

50
src/rt_worker_sup.erl Normal file
View File

@ -0,0 +1,50 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2012 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(rt_worker_sup).
-behavior(supervisor).
%% Helper macro for declaring children of supervisor
-define(CHILD(Id, Mod, Node, Backend), {
list_to_atom(atom_to_list(Node) ++ "_loader_" ++ integer_to_list(Id)),
{ Mod,
start_link,
[list_to_atom(atom_to_list(Node) ++ "_loader_" ++ integer_to_list(Id)), Node, Backend]},
permanent, 5000, worker, [Mod]}).
-export([init/1]).
-export([start_link/1]).
start_link(Props) ->
supervisor:start_link(?MODULE, Props).
init(Props) ->
WorkersPerNode = proplists:get_value(concurrent, Props),
Node = proplists:get_value(node, Props),
Backend = proplists:get_value(backend, Props),
ChildSpecs = [
?CHILD(Num, loaded_upgrade_worker_sup, Node, Backend)
|| Num <- lists:seq(1, WorkersPerNode)],
lager:debug("Starting ~p workers to ~p", [WorkersPerNode, Node]),
{ok, {{one_for_one, 1000, 60}, ChildSpecs}}.

View File

@ -18,687 +18,222 @@
%%
%% -------------------------------------------------------------------
-module(loaded_upgrade).
-behavior(riak_test).
-export([confirm/0]).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-define(SPAM_BUCKET, <<"scotts_spam">>).
-define(MAX_LIST_KEYS_ATTEMPTS, 4).
-define(MAX_CLIENT_RECONNECT_ATTEMPTS, 100).
-define(CLIENT_RECONNECT_INTERVAL, 100).
%% @doc This test requires additional setup, here's how to do it.
%% 1. Clone and build basho_bench
%% 2. Set an environment variable "BASHO_BENCH" to the path you cloned to.
%% 3. Get this file: search-corpus/spam.0-1.tar.gz
%% 4. Unzip it somewhere.
%% 5. Set an environment variable "SPAM_DIR" to the path you unzipped, including the "spam.0" dir
-include_lib("eunit/include/eunit.hrl").
-export([confirm/0]).
-export([kv_valgen/1, bucket/1, erlang_mr/0, int_to_key/1]).
-define(TIME_BETWEEN_UPGRADES, 300). %% Seconds!
confirm() ->
rt:config_or_os_env(basho_bench),
rt:config_or_os_env(spam_dir),
verify_upgrade(),
lager:info("Test ~p passed", [?MODULE]),
pass.
verify_upgrade() ->
case whereis(loaded_upgrade) of
undefined -> meh;
_ -> unregister(loaded_upgrade)
end,
register(loaded_upgrade, self()),
%% Build Cluster
TestMetaData = riak_test_runner:metadata(),
%% Only run 2i for level
Backend = proplists:get_value(backend, TestMetaData),
OldVsn = proplists:get_value(upgrade_version, TestMetaData, previous),
Config = [{riak_search, [{enabled, true}]}],
%% Uncomment to use settings more prone to cause races
%% Config = [{riak_core, [{handoff_concurrency, 1024},
%% {vnode_inactivity_timeout, 1000},
%% {vnode_rolling_start, 128},
%% {vnode_management_timer, 1000},
%% {gossip_limit, {10000, 1000}}]},
%% {riak_search, [{enabled, true}]}],
Config = [{riak_search, [{enabled, true}]}, {riak_pipe, [{worker_limit, 200}]}],
NumNodes = 4,
Vsns = [{OldVsn, Config} || _ <- lists:seq(2,NumNodes)],
Nodes = rt:build_cluster([{current, Config} | Vsns]),
[Node1|OldNodes] = Nodes,
_HeadMon = init_node_monitor(Node1, self()),
Vsns = [{OldVsn, Config} || _ <- lists:seq(1,NumNodes)],
Nodes = rt:build_cluster(Vsns),
seed_cluster(Nodes),
%% Now we have a cluster!
%% Let's spawn workers against it.
timer:sleep(10000),
Concurrent = rt:config(load_workers, 10),
Sups = [
{rt_worker_sup:start_link([
{concurrent, Concurrent},
{node, Node},
{backend, Backend}
]), Node}
|| Node <- Nodes],
upgrade_recv_loop(),
[begin
exit(Sup, normal),
lager:info("Upgrading ~p", [Node]),
rt:upgrade(Node, current),
NewSup = rt_worker_sup:start_link([
{concurrent, Concurrent},
{node, Node},
{backend, Backend}
]),
_NodeMon = init_node_monitor(Node, NewSup, self()),
upgrade_recv_loop()
end || {{ok, Sup}, Node} <- Sups],
pass.
upgrade_recv_loop() ->
{SMega, SSec, SMicro} = os:timestamp(),
EndSecs = SSec + ?TIME_BETWEEN_UPGRADES,
EndTime = case EndSecs > 1000000 of
true ->
{SMega + 1, EndSecs - 1000000, SMicro};
_ ->
{SMega, EndSecs, SMicro}
end,
upgrade_recv_loop(EndTime).
%% TODO: Collect error message counts in ets table
upgrade_recv_loop(EndTime) ->
Now = os:timestamp(),
case Now > EndTime of
true ->
lager:info("Done waiting 'cause ~p > ~p", [Now, EndTime]);
_ ->
receive
{mapred, _Node, bad_result} ->
?assert(false);
{kv, _Node, not_equal} ->
?assert(false);
{listkeys, _Node, not_equal} ->
?assert(false);
Msg ->
lager:info("Received Mesg ~p", [Msg]),
upgrade_recv_loop(EndTime)
after timer:now_diff(EndTime, Now) div 1000 ->
lager:info("Done waiting 'cause ~p is up", [?TIME_BETWEEN_UPGRADES])
end
end.
seed_cluster(Nodes=[Node1|_]) ->
lager:info("Seeding Cluster"),
%% For List Keys
lager:info("Writing 100 keys to ~p", [Node1]),
rt:systest_write(Node1, 100, 3),
?assertEqual([], rt:systest_read(Node1, 100, 1)),
lager:info("Checking list_keys count periodically throughout this test."),
spawn_link(?MODULE, check_list_keys, [Node1]),
seed(Node1, 0, 100, fun(Key) ->
%%Bin = list_to_binary(["", integer_to_list(Key)]),
Bin = iolist_to_binary(io_lib:format("~p", [Key])),
riakc_obj:new(<<"objects">>, Bin, Bin)
end),
Conns = rt:connection_info(Nodes),
NodeConn = proplists:get_value(Node1, Conns),
lager:info("NodeConn: ~p", [NodeConn]),
%% For KV
kv_seed(Node1),
KV1 = init_kv_tester(NodeConn),
MR1 = init_mapred_tester(NodeConn),
Search1 = init_search_tester(Nodes, Conns),
%% for 2i
twoi_seed(Node1),
TwoI1 = case Backend of
eleveldb -> init_2i_tester(NodeConn);
_ -> undefined
end,
[begin
KV2 = spawn_kv_tester(KV1),
MR2 = spawn_mapred_tester(MR1),
TwoI2 = case TwoI1 of
undefined -> undefined;
_ -> spawn_2i_tester(TwoI1)
end,
Search2 = spawn_search_tester(Search1),
lager:info("Upgrading ~p", [Node]),
rt:upgrade(Node, current),
_NodeMon = init_node_monitor(Node, self()),
%% rt:slow_upgrade(Node, current, Nodes),
_KV3 = check_kv_tester(KV2),
_MR3 = check_mapred_tester(MR2),
_TwoI3 = case TwoI1 of
undefined -> undefined;
_ -> check_2i_tester(TwoI2)
end,
_Search3 = check_search_tester(Search2, false),
lager:info("Ensuring keys still exist"),
rt:wait_for_cluster_service(Nodes, riak_kv),
?assertEqual([], rt:systest_read(Node1, 100, 1))
end || Node <- OldNodes],
lager:info("Upgrade complete, ensure search now passes"),
check_search_tester(spawn_search_tester(Search1), true),
ok.
%% for mapred
mr_seed(Node1),
%% ===================================================================
%% List Keys Tester
%% ===================================================================
%% For MC Serch
rt:enable_search_hook(Node1, bucket(search)),
rt:wait_until_ring_converged(Nodes),
seed_search(Node1).
check_list_keys(Node) ->
check_list_keys(rt:pbc(Node), 0).
%% Buckets
bucket(kv) -> <<"utest">>;
bucket(twoi) -> <<"2ibuquot">>;
bucket(mapred) -> <<"bryanitbs">>;
bucket(search) -> <<"scotts_spam">>.
check_list_keys(Pid, Attempt) ->
case Attempt rem 20 of
0 -> lager:debug("Performing list_keys check #~p", [Attempt]);
_ -> nothing
end,
{ok, Keys} = list_keys(Pid, <<"systest">>),
?assertEqual(100, length(Keys)),
timer:sleep(3000),
check_list_keys(Pid, Attempt + 1).
%% List keys with time out recovery.
list_keys(Pid, Bucket) ->
list_keys(Pid, Bucket, ?MAX_LIST_KEYS_ATTEMPTS,
riakc_pb_socket:default_timeout(list_keys_timeout)).
list_keys(_, _, 0, _) ->
{error, "list_keys timed out too many times"};
list_keys(Pid, Bucket, Attempts, TimeOut) ->
Res = riakc_pb_socket:list_keys(Pid, Bucket, TimeOut),
case Res of
{error, Err} when
Err =:= timeout;
Err =:= <<"timeout">>;
is_tuple(Err), element(1, Err) =:= timeout ->
?assertMatch(ok, wait_for_reconnect(Pid)),
NewAttempts = Attempts - 1,
NewTimeOut = TimeOut * 2,
lager:info("List keys timed out, trying ~p more times, new time out = ~p",
[NewAttempts, NewTimeOut]),
list_keys(Pid, Bucket, NewAttempts, NewTimeOut);
_ -> Res
end.
wait_for_reconnect(Pid) ->
wait_for_reconnect(Pid, ?MAX_CLIENT_RECONNECT_ATTEMPTS, ?CLIENT_RECONNECT_INTERVAL).
wait_for_reconnect(Pid, 0, _) ->
lager:error("Could not reconnect client ~p to Riak after timed out list keys", [Pid]),
{error, pbc_client_reconnect_timed_out};
wait_for_reconnect(Pid, Attempts, Delay) ->
timer:sleep(Delay),
case riakc_pb_socket:is_connected(Pid) of
true -> ok;
_ -> wait_for_reconnect(Pid, Attempts-1, Delay)
end.
%% ===================================================================
%% K/V Tester
%% ===================================================================
-record(kv, {buckets, runs}).
init_kv_tester(Conn) ->
{Host, Port} = proplists:get_value(http, Conn),
Buckets = [{<<"utest">>, []}],
{BucketNames, _} = lists:unzip(Buckets),
generate_kv_scripts(BucketNames, Host, Port),
[kv_populate(Bucket) || Bucket <- BucketNames],
#kv{buckets=BucketNames, runs=[]}.
spawn_kv_tester(KV=#kv{buckets=Buckets}) ->
Count = 3,
Runs = [{Bucket, kv_spawn_verify(Bucket)} || Bucket <- Buckets,
_ <- lists:seq(1,Count)],
KV#kv{runs=Runs}.
check_kv_tester(KV=#kv{runs=Runs}) ->
Failed = [Bucket || {Bucket, Run} <- Runs,
ok /= kv_check_verify(Bucket, Run, [])],
[begin
lager:info("Failed k/v test for: ~p", [Bucket]),
lager:info("Re-running until test passes to check for data loss"),
Result =
rt:wait_until(node(),
fun(_) ->
Rerun = kv_spawn_verify(Bucket),
ok == kv_check_verify(Bucket, Rerun, [])
end),
?assertEqual(ok, Result),
lager:info("k/v test finally passed"),
ok
end || Bucket <- Failed],
KV#kv{runs=[]}.
kv_populate(Bucket) when is_binary(Bucket) ->
kv_populate(binary_to_list(Bucket));
kv_populate(Bucket) ->
Config = "bb-populate-" ++ Bucket ++ ".config",
lager:info("Populating bucket ~s", [Bucket]),
Cmd = "$BASHO_BENCH/basho_bench " ++ Config,
?assertMatch({0,_}, rt:cmd(Cmd, [{cd, rt:config(rt_scratch_dir)}, {env, [
{"BASHO_BENCH", rt:config(basho_bench)},
{"SPAM_DIR", rt:config(spam_dir)}
]}])),
ok.
kv_spawn_verify(Bucket) when is_binary(Bucket) ->
kv_spawn_verify(binary_to_list(Bucket));
kv_spawn_verify(Bucket) ->
Config = "bb-verify-" ++ Bucket ++ ".config",
lager:info("Spawning k/v test against: ~s", [Bucket]),
Cmd = "$BASHO_BENCH/basho_bench " ++ Config,
rt:spawn_cmd(Cmd, [{cd, rt:config(rt_scratch_dir)}, {env, [
{"BASHO_BENCH", rt:config(basho_bench)},
{"SPAM_DIR", rt:config(spam_dir)}
]}]).
kv_check_verify(Bucket, Port, Opts) ->
lager:info("Checking k/v test against: ~p", [Bucket]),
{Status,_} = rt:wait_for_cmd(Port),
Repair = ordsets:is_element(repair, Opts),
case {Repair, Status} of
%% {true, 1} ->
%% lager:info("Allowing repair: ~p", [Bucket]),
%% kv_verify_repair(Bucket);
{_, 0} ->
ok;
{_, _} ->
fail
end.
kv_spawn_repair(Bucket) when is_binary(Bucket) ->
kv_spawn_repair(binary_to_list(Bucket));
kv_spawn_repair(Bucket) ->
Config = "bb-repair-" ++ Bucket ++ ".config",
lager:info("Read-repairing bucket ~s", [Bucket]),
Cmd = "$BASHO_BENCH/basho_bench " ++ Config,
rt:spawn_cmd(Cmd, [{cd, rt:config(rt_scratch_dir)}, {env, [
{"BASHO_BENCH", rt:config(basho_bench)},
{"SPAM_DIR", rt:config(spam_dir)}
]}]).
%% ===================================================================
%% map/reduce Tester
%% ===================================================================
-record(mr, {runs}).
init_mapred_tester(Conn) ->
{Host, Port} = proplists:get_value(pb, Conn),
generate_mapred_scripts(Host, Port),
mapred_populate(),
#mr{runs=[]}.
spawn_mapred_tester(MR) ->
Count = 3,
Runs = [mapred_spawn_verify() || _ <- lists:seq(1,Count)],
MR#mr{runs=Runs}.
check_mapred_tester(MR=#mr{runs=Runs}) ->
Failed = [failed || Run <- Runs,
ok /= mapred_check_verify(Run)],
[begin
lager:info("Failed mapred test"),
lager:info("Re-running until test passes to check for data loss"),
Result =
rt:wait_until(node(),
fun(_) ->
Rerun = mapred_spawn_verify(),
ok == mapred_check_verify(Rerun)
end),
?assertEqual(ok, Result),
lager:info("map/reduce test finally passed"),
ok
end || _ <- Failed],
MR#mr{runs=[]}.
mapred_populate() ->
Config = "bb-populate-mapred.config",
lager:info("Populating map-reduce bucket"),
Cmd = "$BASHO_BENCH/basho_bench " ++ Config,
?assertMatch({0,_}, rt:cmd(Cmd, [{cd, rt:config(rt_scratch_dir)}, {env, [
{"BASHO_BENCH", rt:config(basho_bench)},
{"SPAM_DIR", rt:config(spam_dir)}
]}])),
ok.
mapred_spawn_verify() ->
Config = "bb-verify-mapred.config",
lager:info("Spawning map/reduce test"),
rt:spawn_cmd("$BASHO_BENCH/basho_bench " ++ Config, [{cd, rt:config(rt_scratch_dir)}, {env, [
{"BASHO_BENCH", rt:config(basho_bench)},
{"SPAM_DIR", rt:config(spam_dir)}
]}]).
mapred_check_verify(Port) ->
lager:info("Checking map/reduce test"),
case rt:wait_for_cmd(Port) of
{0,_} ->
ok;
_ ->
fail
end.
%% ===================================================================
%% Search tester
%% ===================================================================
-record(search, {buckets, runs}).
init_search_tester(Nodes, Conns) ->
seed_search(Node) ->
Pid = rt:pbc(Node),
SpamDir = rt:config(spam_dir),
IPs = [proplists:get_value(http, I) || {_, I} <- Conns],
Buckets = [?SPAM_BUCKET],
rt:enable_search_hook(hd(Nodes), ?SPAM_BUCKET),
generate_search_scripts(Buckets, IPs, SpamDir),
[search_populate(Bucket) || Bucket <- Buckets],
%% Check search queries actually work as expected
[check_search(Bucket, Nodes) || Bucket <- Buckets],
#search{buckets=Buckets, runs=[]}.
Files = case SpamDir of
undefined -> undefined;
_ -> filelib:wildcard(SpamDir ++ "/*")
end,
seed_search(Pid, Files),
riakc_pb_socket:stop(Pid).
check_search(?SPAM_BUCKET, Nodes) ->
SearchResults = [{"mx.example.net", 187},
{"ZiaSun", 1},
{"headaches", 4},
{"YALSP", 3},
{"mister", 0},
{"prohibiting", 5}],
Results = [{Term,Count} || {Term, Count} <- SearchResults,
Node <- Nodes,
{Count2,_} <- [rpc:call(Node, search, search, [?SPAM_BUCKET, Term])],
Count2 == Count],
Expected = lists:usort(SearchResults),
Actual = lists:usort(Results),
case {rt:is_mixed_cluster(Nodes), Expected == Actual} of
{false, _} -> ?assertEqual(Expected, Actual);
{true, false} ->
lager:info(
"[KNOWN ISSUE] Search returned inaccurate results; however, the cluster is in a mixed state. Got: ~p",
[Actual]
);
_ -> ok %% this is the success case, no need to do anything else
seed_search(_Pid, []) -> ok;
seed_search(Pid, [File|Files]) ->
Key = list_to_binary(filename:basename(File)),
rt:pbc_put_file(Pid, bucket(search), Key, File),
seed_search(Pid, Files).
kv_seed(Node) ->
ValFun = fun(Key) ->
riakc_obj:new(bucket(kv), iolist_to_binary(io_lib:format("~p", [Key])), kv_valgen(Key))
end,
ok.
seed(Node, 0, 7999, ValFun).
spawn_search_tester(Search=#search{buckets=Buckets}) ->
Count = 3,
Runs = [{Bucket, search_spawn_verify(Bucket)} || Bucket <- Buckets,
_ <- lists:seq(1, Count)],
Search#search{runs=Runs}.
kv_valgen(Key) ->
term_to_binary(lists:seq(0, Key)).
check_search_tester(Search=#search{runs=Runs}, Retest) ->
Failed = [Bucket || {Bucket, Run} <- Runs,
ok /= search_check_verify(Bucket, Run, [])],
[begin
lager:info("Failed search test for: ~p", [Bucket]),
maybe_retest_search(Retest, Bucket),
ok
end || Bucket <- Failed],
Search#search{runs=[]}.
int_to_key(KInt) ->
list_to_binary(["", integer_to_list(KInt)]).
maybe_retest_search(false, _) ->
ok;
maybe_retest_search(true, Bucket) ->
lager:info("Re-running until test passes to check for data loss"),
Result =
rt:wait_until(node(),
fun(_) ->
Rerun = search_spawn_verify(Bucket),
ok == search_check_verify(Bucket, Rerun, [])
end),
?assertEqual(ok, Result),
lager:info("search test finally passed"),
ok.
%% Every 2i seeded object will have indexes
%% int_plusone -> [Key + 1, Key + 10000]
%% bin_plustwo -> [<<"Key + 2">>]
twoi_seed(Node) ->
ValFun = fun(Key) ->
Obj = riakc_obj:new(bucket(twoi), iolist_to_binary(io_lib:format("~p", [Key])), kv_valgen(Key)),
MD1 = riakc_obj:get_update_metadata(Obj),
MD2 = riakc_obj:set_secondary_index(MD1, [
{{integer_index, "plusone"}, [Key + 1, Key + 10000]},
{{binary_index, "plustwo"}, [int_to_key(Key + 2)]}
]),
riakc_obj:update_metadata(Obj, MD2)
end,
seed(Node, 0, 7999, ValFun).
search_populate(Bucket) when is_binary(Bucket) ->
search_populate(binary_to_list(Bucket));
search_populate(Bucket) ->
Config = "bb-populate-" ++ Bucket ++ ".config",
lager:info("Populating search bucket: ~s", [Bucket]),
Cmd = "$BASHO_BENCH/basho_bench " ++ Config,
rt:cmd(Cmd, [{cd, rt:config(rt_scratch_dir)}, {env, [
{"BASHO_BENCH", rt:config(basho_bench)},
{"SPAM_DIR", rt:config(spam_dir)}
]}]).
erlang_mr() ->
[{map, {modfun, riak_kv_mapreduce, map_object_value}, none, false},
{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs}, none, true}].
search_spawn_verify(Bucket) when is_binary(Bucket) ->
search_spawn_verify(binary_to_list(Bucket));
search_spawn_verify(Bucket) when is_list(Bucket) ->
Config = "bb-verify-" ++ Bucket ++ ".config",
lager:info("Spawning search test against: ~s", [Bucket]),
Cmd = "$BASHO_BENCH/basho_bench " ++ Config,
rt:spawn_cmd(Cmd, [{cd, rt:config(rt_scratch_dir)}, {env, [
{"BASHO_BENCH", rt:config(basho_bench)},
{"SPAM_DIR", rt:config(spam_dir)}
]}]).
mr_seed(Node) ->
%% to be used along with sequential_int keygen to populate known
%% mapreduce set
ValFun = fun(Key) ->
Value = iolist_to_binary(io_lib:format("~p", [Key])),
riakc_obj:new(bucket(mapred), Value, Value)
end,
seed(Node, 0, 9999, ValFun).
search_check_verify(Bucket, Port, Opts) ->
lager:info("Checking search test against: ~p", [Bucket]),
{Status,_} = rt:wait_for_cmd(Port),
Repair = ordsets:is_element(repair, Opts),
case {Repair, Status} of
%% {true, 1} ->
%% lager:info("Allowing repair: ~p", [Bucket]),
%% search_verify_repair(Bucket);
{_, 0} ->
ok;
{_, _} ->
fail
end.
seed(Node, Start, End, ValFun) ->
PBC = rt:pbc(Node),
%% ===================================================================
%% 2i Tester
%% ===================================================================
[ begin
Obj = ValFun(Key),
riakc_pb_socket:put(PBC, Obj)
end || Key <- lists:seq(Start, End)],
-record(twoi, {runs}).
init_2i_tester(Conn) ->
{PBHost, PBPort} = proplists:get_value(pb, Conn),
{HTTPHost, HTTPPort} = proplists:get_value(http, Conn),
generate_2i_scripts(<<"2ibuquot">>, [{PBHost, PBPort}], [{HTTPHost, HTTPPort}]),
twoi_populate(),
#twoi{runs=[]}.
spawn_2i_tester(TwoI) ->
Count = 3,
Runs = [twoi_spawn_verify() || _ <- lists:seq(1,Count)],
TwoI#twoi{runs=Runs}.
check_2i_tester(TwoI=#twoi{runs=Runs}) ->
Failed = [failed || Run <- Runs,
ok /= twoi_check_verify(Run)],
[begin
lager:info("Failed 2i test"),
lager:info("Re-running until test passes to check for data loss"),
Result =
rt:wait_until(node(),
fun(_) ->
Rerun = twoi_spawn_verify(),
ok == twoi_check_verify(Rerun)
end),
?assertEqual(ok, Result),
lager:info("2i test finally passed"),
ok
end || _ <- Failed],
TwoI#twoi{runs=[]}.
twoi_populate() ->
Config = "bb-populate-2i.config",
lager:info("Populating 2i bucket"),
Cmd = "$BASHO_BENCH/basho_bench " ++ Config,
?assertMatch({0,_}, rt:cmd(Cmd, [{cd, rt:config(rt_scratch_dir)}, {env, [
{"BASHO_BENCH", rt:config(basho_bench)}
]}])),
ok.
twoi_spawn_verify() ->
Config = "bb-verify-2i.config",
lager:info("Spawning 2i test"),
rt:spawn_cmd("$BASHO_BENCH/basho_bench " ++ Config, [{cd, rt:config(rt_scratch_dir)}, {env, [
{"BASHO_BENCH", rt:config(basho_bench)}
]}]).
twoi_check_verify(Port) ->
lager:info("Checking 2i test"),
case rt:wait_for_cmd(Port) of
{0,_} ->
ok;
_ ->
fail
end.
%% ===================================================================
%% basho_bench K/V scripts
%% ===================================================================
generate_kv_scripts(Buckets, Host, Port) ->
[begin
Bucket = binary_to_list(BucketBin),
kv_populate_script(Bucket, Host, Port),
kv_verify_script(Bucket, Host, Port),
kv_repair_script(Bucket, Host, Port)
end || BucketBin <- Buckets],
ok.
kv_populate_script(Bucket, Host, Port) ->
Cfg = [{mode, max},
{duration, infinity},
{concurrent, 16},
{driver, basho_bench_driver_http_raw},
{key_generator, {partitioned_sequential_int, 0, 8000}},
{value_generator, {uniform_bin,100,1000}},
{operations, [{update, 1}]},
{http_raw_ips, [Host]},
{http_raw_port, Port},
{http_raw_path, "/riak/" ++ Bucket}],
Config = filename:join([rt:config(rt_scratch_dir), io_lib:format("bb-populate-~s.config", [Bucket])]),
write_terms(Config, Cfg),
ok.
kv_verify_script(Bucket, Host, Port) ->
Cfg = [{mode, {rate, 50}},
%%{duration, infinity},
{duration, 1},
{concurrent, 10},
{driver, basho_bench_driver_http_raw},
{key_generator, {uniform_int, 7999}},
{value_generator, {uniform_bin,100,1000}},
{operations, [{update, 1},{get_existing, 1}]},
{http_raw_ips, [Host]},
{http_raw_port, Port},
{http_raw_path, "/riak/" ++ Bucket},
{shutdown_on_error, true}],
Config = filename:join([rt:config(rt_scratch_dir), io_lib:format("bb-verify-~s.config", [Bucket])]),
write_terms(Config, Cfg),
ok.
kv_repair_script(Bucket, Host, Port) ->
Cfg = [{mode, {rate, 50}},
{duration, infinity},
%%{duration, 1},
{concurrent, 10},
{driver, basho_bench_driver_http_raw},
%%{key_generator, {uniform_int, 8000}},
{key_generator, {partitioned_sequential_int, 0, 8000}},
{value_generator, {uniform_bin,100,1000}},
{operations, [{get, 1}]},
{http_raw_ips, [Host]},
{http_raw_port, Port},
{http_raw_path, "/riak/" ++ Bucket}],
Config = filename:join([rt:config(rt_scratch_dir), io_lib:format("bb-repair-~s.config", [Bucket])]),
write_terms(Config, Cfg),
ok.
%% ===================================================================
%% basho_bench map/reduce scripts
%% ===================================================================
generate_mapred_scripts(Host, Port) ->
mapred_populate_script(Host, Port),
mapred_verify_script(Host, Port),
ok.
mapred_populate_script(Host, Port) ->
Cfg = [{driver, basho_bench_driver_riakc_pb},
{riakc_pb_ips, [{Host, Port}]},
{riakc_pb_replies, 1},
{riakc_pb_bucket, <<"bryanitbs">>},
{mode, max},
{duration, 10000},
{concurrent, 1},
{operations, [{put, 1}]},
{key_generator, {int_to_str, {sequential_int, 10000}}},
{value_generator,
{function, basho_bench_driver_riakc_pb, mapred_ordered_valgen, []}}],
Config = filename:join([rt:config(rt_scratch_dir), "bb-populate-mapred.config"]),
write_terms(Config, Cfg),
ok.
mapred_verify_script(Host, Port) ->
Cfg = [{driver, basho_bench_driver_riakc_pb},
{riakc_pb_ips, [{Host, Port}]},
{riakc_pb_replies, 1},
{riakc_pb_bucket, <<"bryanitbs">>},
{riakc_pb_preloaded_keys, 10000},
{mode, max},
{duration, 1},
{concurrent, 1},
{operations, [{mr_bucket_erlang, 1}]},
{key_generator, {int_to_str, {uniform_int, 9999}}},
{value_generator, {fixed_bin, 1}},
{riakc_pb_keylist_length, 1000},
{shutdown_on_error, true}],
Config = filename:join([rt:config(rt_scratch_dir), "bb-verify-mapred.config"]),
write_terms(Config, Cfg),
ok.
%% ===================================================================
%% basho_bench Search scritps
%% ===================================================================
generate_search_scripts(Buckets, IPs, SpamDir) ->
[begin
Bucket = binary_to_list(BucketBin),
search_populate_script(Bucket, IPs, SpamDir),
search_verify_script(Bucket, IPs)
end || BucketBin <- Buckets],
ok.
search_populate_script(Bucket, IPs, SpamDir) ->
Cfg = [{mode, max},
{duration, 1},
{concurrent, 10},
{driver, basho_bench_driver_http_raw},
{file_dir, SpamDir},
{operations, [{put_file,1}]},
{http_raw_ips, IPs},
{http_raw_path, "/riak/" ++ Bucket},
{shutdown_on_error, true}],
Config = filename:join([rt:config(rt_scratch_dir), io_lib:format("bb-populate-~s.config", [Bucket])]),
write_terms(Config, Cfg).
search_verify_script(Bucket, IPs) ->
Expect = [{"mx.example.net", 187},
{"ZiaSun", 1},
{"headaches", 4},
{"YALSP", 3},
{"mister", 0},
{"prohibiting", 5}],
Operations = [{{search,E},1} || E <- Expect],
Cfg = [{mode, max},
{duration, 1},
{concurrent, 10},
{driver, basho_bench_driver_http_raw},
{operations, Operations},
{http_raw_ips, IPs},
{http_solr_path, "/solr/" ++ Bucket},
{http_raw_path, "/riak/" ++ Bucket},
{shutdown_on_error, true}],
Config = filename:join([rt:config(rt_scratch_dir), io_lib:format("bb-verify-~s.config", [Bucket])]),
write_terms(Config, Cfg).
%% ===================================================================
%% basho_bench 2i scritps
%% ===================================================================
generate_2i_scripts(Bucket, PBIPs, HTTPIPs) ->
twoi_populate_script(Bucket, PBIPs, HTTPIPs),
twoi_verify_script(Bucket, PBIPs, HTTPIPs),
ok.
twoi_populate_script(Bucket, PBIPs, HTTPIPs) ->
Cfg = [ {driver, basho_bench_driver_2i},
{operations, [{{put_pb, 5}, 1}]},
{mode, max},
{duration, 10000},
{concurrent, 1},
{key_generator, {sequential_int, 10000}},
{value_generator, {fixed_bin, 10000}},
{riakc_pb_bucket, Bucket},
{pb_ips, PBIPs},
{pb_replies, 1},
{http_ips, HTTPIPs}],
Config = filename:join([rt:config(rt_scratch_dir), "bb-populate-2i.config"]),
write_terms(Config, Cfg),
ok.
twoi_verify_script(Bucket, PBIPs, HTTPIPs) ->
Cfg = [ {driver, basho_bench_driver_2i},
{operations, [
{{query_http, 10}, 1},
{{query_mr, 10}, 1},
{{query_pb, 10}, 1}
]},
{mode, max},
{duration, 1},
{concurrent, 3},
{key_generator, {uniform_int, 10000}},
{value_generator, {fixed_bin, 10000}},
{riakc_pb_bucket, Bucket},
{pb_ips, PBIPs},
{pb_replies, 1},
{http_ips, HTTPIPs},
{enforce_keyrange, 10000}],
Config = filename:join([rt:config(rt_scratch_dir), "bb-verify-2i.config"]),
write_terms(Config, Cfg),
ok.
write_terms(File, Terms) ->
{ok, IO} = file:open(File, [write]),
[io:fwrite(IO, "~p.~n", [T]) || T <- Terms],
file:close(IO).
riakc_pb_socket:stop(PBC).
%% ===================================================================
%% Monitor nodes after they upgrade
%% ===================================================================
init_node_monitor(Node, TestProc) ->
spawn_link(fun() -> node_monitor(Node, TestProc) end).
init_node_monitor(Node, Sup, TestProc) ->
spawn_link(fun() -> node_monitor(Node, Sup, TestProc) end).
node_monitor(Node, TestProc) ->
node_monitor(Node, Sup, TestProc) ->
lager:info("Monitoring node ~p to make sure it stays up.", [Node]),
erlang:process_flag(trap_exit, true),
erlang:monitor_node(Node, true),
node_monitor_loop(Node, TestProc).
node_monitor_loop(Node, Sup, TestProc).
node_monitor_loop(Node, TestProc) ->
node_monitor_loop(Node, Sup, TestProc) ->
receive
{nodedown, Node} ->
lager:error("Node ~p exited after upgrade!", [Node]),
exit(Sup, normal),
?assertEqual(nodeup, {nodedown, Node});
{'EXIT', TestProc, _} ->
erlang:monitor_node(Node, false),
ok;
Other ->
lager:warn("Node monitor for ~p got unknown message ~p", [Node, Other]),
node_monitor_loop(Node, TestProc)
node_monitor_loop(Node, Sup, TestProc)
end.

View File

@ -0,0 +1,235 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2012 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(loaded_upgrade_worker_sup).
-include_lib("eunit/include/eunit.hrl").
-include_lib("riakc/include/riakc.hrl").
-behavior(supervisor).
%% API
-export([assert_equal/2]).
-export([list_keys_tester/3, kv_tester/3, mapred_tester/3,
twoi_tester/3, search_tester/3, tester_start_link/2]).
-export([init/1]).
-export([start_link/3]).
%% Helper macro for declaring children of supervisor
-define(CHILD(Name, FunName, Node), {
list_to_atom(atom_to_list(Name) ++ "_" ++ atom_to_list(FunName)),
{ ?MODULE,
tester_start_link,
[FunName, Node]},
permanent, 5000, worker, [?MODULE]}).
start_link(Name, Node, Backend) ->
supervisor:start_link(?MODULE, [Name, Node, Backend]).
init([Name, Node, Backend]) ->
rt:wait_for_service(Node, [riak_search,riak_kv,riak_pipe]),
ChildSpecs1 = [
?CHILD(Name, FunName, Node)
|| FunName <- [list_keys_tester, mapred_tester, kv_tester, search_tester]],
ChildSpecs = case Backend of
eleveldb ->
[?CHILD(Name, twoi_tester, Node) | ChildSpecs1];
_ -> ChildSpecs1
end,
{ok, {{one_for_one, 1000, 60}, ChildSpecs}}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
tester_start_link(Function, Node) ->
{ok, spawn_link(?MODULE, Function, [Node, 0, undefined])}.
list_keys_tester(Node, Count, Pid) ->
PBC = pb_pid_recycler(Pid, Node),
case riakc_pb_socket:list_keys(PBC, <<"objects">>) of
{ok, Keys} ->
ActualKeys = lists:usort(Keys),
ExpectedKeys = lists:usort([loaded_upgrade:int_to_key(K) || K <- lists:seq(0, 100)]),
case assert_equal(ExpectedKeys, ActualKeys) of
true -> cool;
_ -> loaded_upgrade ! {listkeys, Node, not_equal}
end;
{error, timeout} ->
loaded_upgrade ! {listkeys, Node, timeout};
{error, {timeout, _}} ->
loaded_upgrade ! {listkeys, Node, timeout};
Unexpected ->
loaded_upgrade ! {listkeys, Node, Unexpected}
end,
list_keys_tester(Node, Count + 1, PBC).
kv_tester(Node, Count, Pid) ->
PBC = pb_pid_recycler(Pid, Node),
Key = Count rem 8000,
case riakc_pb_socket:get(PBC, loaded_upgrade:bucket(kv), loaded_upgrade:int_to_key(Key)) of
{ok, Val} ->
case loaded_upgrade:kv_valgen(Key) == riakc_obj:get_value(Val) of
true -> cool;
_ -> loaded_upgrade ! {kv, Node, not_equal}
end;
{error, disconnected} ->
ok;
Unexpected ->
loaded_upgrade ! {kv, Node, Unexpected}
end,
kv_tester(Node, Count + 1, PBC).
mapred_tester(Node, Count, Pid) ->
PBC = pb_pid_recycler(Pid, Node),
case riakc_pb_socket:mapred(PBC, loaded_upgrade:bucket(mapred), loaded_upgrade:erlang_mr()) of
{ok, [{1, [10000]}]} ->
ok;
{ok, _R} ->
loaded_upgrade ! {mapred, Node, bad_result};
{error, disconnected} ->
ok;
%% Finkmaster Flex says timeouts are ok
{error, timeout} ->
ok;
{error, {timeout, _}} ->
ok;
{error, <<"{\"phase\":\"listkeys\",\"error\":\"{badmatch,{'EXIT',noproc}}", _/binary>>} ->
ok;
{error, <<"{\"phase\":\"listkeys\",\"error\":\"{badmatch,{'EXIT',timeout}}", _/binary>>} ->
ok;
{error, <<"{\"phase\":0,\"error\":\"badarg", _/binary>>} ->
ok;
{error, <<"{\"phase\":0,\"error\":\"[preflist_exhausted]", _/binary>>} ->
ok;
{error, <<"{\"phase\":0,\"error\":\"{badmatch,{'EXIT',timeout}}", _/binary>>} ->
ok;
{error, <<"{\"phase\":\"listkeys\",\"error\":\"function_clause\",\"input\":\"{cover,", _/binary>>} ->
ok;
{error, <<"{\"phase\":\"listkeys\",\"error\":\"badarg\",\"input\":\"{cover,", _/binary>>} ->
ok;
{error, <<"Error processing stream message: exit:{ucs,{bad_utf8_character_code}}:[{xmerl_ucs,", _/binary>>} ->
ok;
{error, <<"{\"phase\":0,\"error\":\"[{vnode_down,{shutdown,{gen_fsm,sync_send_event,", _/binary>>} ->
ok;
{error, <<"{\"phase\":0,\"error\":\"[{vnode_down,noproc}]", _/binary>>} ->
ok;
Unexpected ->
loaded_upgrade ! {mapred, Node, Unexpected}
end,
mapred_tester(Node, Count + 1, PBC).
twoi_tester(Node, Count, Pid) ->
PBC = pb_pid_recycler(Pid, Node),
Key = Count rem 8000,
ExpectedKeys = [loaded_upgrade:int_to_key(Key)],
case {
riakc_pb_socket:get_index(
PBC,
loaded_upgrade:bucket(twoi),
{binary_index, "plustwo"},
loaded_upgrade:int_to_key(Key + 2)),
riakc_pb_socket:get_index(
PBC,
loaded_upgrade:bucket(twoi),
{integer_index, "plusone"},
Key + 1)
} of
{{ok, BinKeys}, {ok, IntKeys}} ->
case {assert_equal(ExpectedKeys, BinKeys), assert_equal(ExpectedKeys, IntKeys)} of
{true, true} -> cool;
{false, false} ->
loaded_upgrade ! {twoi, Node, bolth_no_match};
{false, true} ->
loaded_upgrade ! {twoi, Node, bin_no_match};
{true, false} ->
loaded_upgrade ! {twoi, Node, int_no_match}
end;
{{error, Reason}, _} ->
loaded_upgrade ! {twoi, Node, {error, Reason}};
{_, {error, Reason}} ->
loaded_upgrade ! {twoi, Node, {error, Reason}};
Unexpected ->
loaded_upgrade ! {twoi, Node, Unexpected}
end,
twoi_tester(Node, Count + 1, PBC).
search_tester(Node, Count, Pid) ->
PBC = pb_pid_recycler(Pid, Node),
{Term, Size} = search_check(Count),
case riakc_pb_socket:search(PBC, loaded_upgrade:bucket(search), Term) of
{ok, Result} ->
?assertEqual(Size, Result#search_results.num_found);
{error, disconnected} ->
%% oh well, reconnect
ok;
{error, <<"Error processing incoming message: throw:{timeout,range_loop}:[{riak_search_backend", _/binary>>} ->
case rt:is_mixed_cluster(Node) of
true ->
ok;
_ ->
loaded_upgrade ! {search, Node, {timeout, range_loop}}
end;
{error,<<"Error processing incoming message: error:{case_clause,", _/binary>>} ->
%% although it doesn't say so, this is the infamous badfun
case rt:is_mixed_cluster(Node) of
true ->
ok;
_ ->
loaded_upgrade ! {search, Node, {error, badfun}}
end;
Unexpected ->
loaded_upgrade ! {search, Node, Unexpected}
end,
search_tester(Node, Count + 1, PBC).
search_check(Count) ->
case Count rem 6 of
0 -> { <<"mx.example.net">>, 187};
1 -> { <<"ZiaSun">>, 1};
2 -> { <<"headaches">>, 4};
3 -> { <<"YALSP">>, 3};
4 -> { <<"mister">>, 0};
5 -> { <<"prohibiting">>, 5}
end.
assert_equal(Expected, Actual) ->
case Expected -- Actual of
[] -> ok;
Diff -> lager:info("Expected -- Actual: ~p", [Diff])
end,
Actual == Expected.
pb_pid_recycler(undefined, Node) ->
rt:pbc(Node);
pb_pid_recycler(Pid, Node) ->
case riakc_pb_socket:is_connected(Pid) of
true ->
Pid;
_ ->
rt:pbc(Node)
end.