Fix search tester in loaded_upgrade

Use a synchronous command to populate the search buckets, rather than
an asynchronous command. Otherwise, there is a data race between data
being rewritten and search queries being performed.

Change the search verify script to have the proper solr path, otherwise
the script always fails because it queries the wrong index.

Modify the search tester to allow search failures while the cluster has
mixed versions. There is currently a known bug in Riak that causes search
queries to potentially fail against a mixed cluster. The test now only
requires the tester to succeed after all nodes have been upgraded.
This commit is contained in:
Joseph Blomstedt 2012-08-08 12:54:53 -07:00
parent f86020259c
commit 10456236b7
2 changed files with 46 additions and 18 deletions

View File

@ -4,7 +4,7 @@
{rt_harness, rtdev}.
{rtdev_path, [{root, "/tmp/rt"},
{current, "/tmp/rt/current"},
{"1.1.2", "/tmp/rt/riak-1.1.2"},
{"1.1.4", "/tmp/rt/riak-1.1.4"},
{"1.0.3", "/tmp/rt/riak-1.0.3"},
{"0.14.2", "/tmp/rt/riak-0.14.2"}]}.

View File

@ -5,16 +5,17 @@
loaded_upgrade() ->
_ = rt:get_os_env("BASHO_BENCH"),
%% OldVsns = ["1.0.3", "1.1.2"],
OldVsns = ["1.1.2"],
%% OldVsns = ["1.0.3", "1.1.4"],
OldVsns = ["1.1.4"],
[verify_upgrade(OldVsn) || OldVsn <- OldVsns],
lager:info("Test ~p passed", [?MODULE]),
ok.
verify_upgrade(OldVsn) ->
Config = [{riak_search, [{enabled, true}]}],
NumNodes = 4,
Vsns = [OldVsn || _ <- lists:seq(2,NumNodes)],
Nodes = rt:build_cluster([current | Vsns]),
Vsns = [{OldVsn, Config} || _ <- lists:seq(2,NumNodes)],
Nodes = rt:build_cluster([{current, Config} | Vsns]),
[Node1|OldNodes] = Nodes,
lager:info("Writing 100 keys to ~p", [Node1]),
rt:systest_write(Node1, 100, 3),
@ -35,11 +36,13 @@ verify_upgrade(OldVsn) ->
rtdev:upgrade(Node, current),
_KV3 = check_kv_tester(KV2),
_MR3 = check_mapred_tester(MR2),
_Search3 = check_search_tester(Search2),
_Search3 = check_search_tester(Search2, false),
lager:info("Ensuring keys still exist"),
rt:wait_for_cluster_service(Nodes, riak_kv),
?assertEqual([], rt:systest_read(Node1, 100, 1))
end || Node <- OldNodes],
lager:info("Upgrade complete, ensure search now passes"),
check_search_tester(spawn_search_tester(Search1), true),
ok.
%% ===================================================================
@ -184,38 +187,62 @@ init_search_tester(Nodes, Conns) ->
rt:enable_search_hook(hd(Nodes), ?SPAM_BUCKET),
generate_search_scripts(Buckets, IPs, SpamDir),
[search_populate(Bucket) || Bucket <- Buckets],
%% Check search queries actually work as expected
[check_search(Bucket, Nodes) || Bucket <- Buckets],
#search{buckets=Buckets, runs=[]}.
check_search(?SPAM_BUCKET, Nodes) ->
SearchResults = [{"postoffice.mr.net", 194},
{"ZiaSun", 1},
{"headaches", 4},
{"YALSP", 3},
{"mister", 0},
{"prohibiting", 5}],
Results = [{Term,Count} || {Term, Count} <- SearchResults,
Node <- Nodes,
{Count2,_} <- [rpc:call(Node, search, search, [?SPAM_BUCKET, Term])],
Count2 == Count],
Expected = lists:usort(SearchResults),
Actual = lists:usort(Results),
?assertEqual(Expected, Actual),
ok.
spawn_search_tester(Search=#search{buckets=Buckets}) ->
Count = 3,
Runs = [{Bucket, search_spawn_verify(Bucket)} || Bucket <- Buckets,
_ <- lists:seq(1, Count)],
Search#search{runs=Runs}.
check_search_tester(Search=#search{runs=Runs}) ->
check_search_tester(Search=#search{runs=Runs}, Retest) ->
Failed = [Bucket || {Bucket, Run} <- Runs,
ok /= search_check_verify(Bucket, Run, [])],
[begin
lager:info("Failed search test for: ~p", [Bucket]),
lager:info("Re-running until test passes to check for data loss"),
Result =
rt:wait_until(node(),
fun(_) ->
Rerun = search_spawn_verify(Bucket),
ok == search_check_verify(Bucket, Rerun, [])
end),
?assertEqual(ok, Result),
lager:info("search test finally passed"),
maybe_retest_search(Retest, Bucket),
ok
end || Bucket <- Failed],
Search#search{runs=[]}.
maybe_retest_search(false, _) ->
ok;
maybe_retest_search(true, Bucket) ->
lager:info("Re-running until test passes to check for data loss"),
Result =
rt:wait_until(node(),
fun(_) ->
Rerun = search_spawn_verify(Bucket),
ok == search_check_verify(Bucket, Rerun, [])
end),
?assertEqual(ok, Result),
lager:info("search test finally passed"),
ok.
search_populate(Bucket) when is_binary(Bucket) ->
search_populate(binary_to_list(Bucket));
search_populate(Bucket) ->
Config = "bb-populate-" ++ Bucket ++ ".config",
lager:info("Populating search bucket: ~s", [Bucket]),
rt:spawn_cmd("$BASHO_BENCH/basho_bench " ++ Config).
rt:cmd("$BASHO_BENCH/basho_bench " ++ Config).
search_spawn_verify(Bucket) when is_binary(Bucket) ->
search_spawn_verify(binary_to_list(Bucket));
@ -297,7 +324,7 @@ kv_repair_script(Bucket, Host, Port) ->
{http_raw_port, Port},
{http_raw_path, "/riak/" ++ Bucket}],
Config = "bb-repair-" ++ Bucket ++ ".config",
file:write_file(Config, Cfg),
write_terms(Config, Cfg),
ok.
%% ===================================================================
@ -384,6 +411,7 @@ search_verify_script(Bucket, IPs) ->
{driver, basho_bench_driver_http_raw},
{operations, Operations},
{http_raw_ips, IPs},
{http_solr_path, "/solr/" ++ Bucket},
{http_raw_path, "/riak/" ++ Bucket},
{shutdown_on_error, true}],