Merge pull request #543 from basho/refactor/one-wait-4-aae-trees

Merge repl and rt versions of wait until AAE trees build
This commit is contained in:
Engel A. Sanchez 2014-05-12 15:54:36 -04:00
commit bda1b5c3cf
5 changed files with 72 additions and 71 deletions

View File

@ -828,61 +828,77 @@ wait_until_nodes_agree_about_ownership(Nodes) ->
%% AAE support
wait_until_aae_trees_built(Nodes) ->
lager:info("Wait until AAE builds all partition trees across ~p", [Nodes]),
%% Wait until all nodes report no undefined trees
wait_until(fun() ->
lists:foldl(aae_tree_built_fun(), true, Nodes)
end).
BuiltFun = fun() -> lists:foldl(aae_tree_built_fun(), true, Nodes) end,
?assertEqual(ok, wait_until(BuiltFun)),
ok.
aae_tree_built_fun() ->
fun(_, false) ->
false;
(Node, _AllBuilt = true) ->
InfoRes = get_aae_tree_info(Node),
case all_trees_have_build_times(InfoRes) of
true ->
{ok, Info} = InfoRes,
Partitions = [I || {I, _} <- Info],
lager:debug("Check if really built by locking"),
fun(Node, _AllBuilt = true) ->
case get_aae_tree_info(Node) of
{ok, TreeInfos} ->
case all_trees_have_build_times(TreeInfos) of
true ->
Partitions = [I || {I, _} <- TreeInfos],
all_aae_trees_built(Node, Partitions);
false ->
some_trees_not_built
end;
Err ->
Err
end;
(_Node, Err) ->
Err
end.
%% Try to lock each partition. If you get not_built,
%% the manager has not yet detected that the build
%% process has died.
%% Notice that the process locking is spawned by the
%% pmap. That's important! as it should die eventually
%% so the test can lock on the tree.
AllBuilt = lists:all(fun(V) -> V == true end,
rt:pmap(index_built_fun(Node), Partitions)),
lager:debug("For node ~p all built = ~p", [Node, AllBuilt]),
AllBuilt;
false ->
false
end
% It is unlikely but possible to get a tree built time from compute_tree_info
% but an attempt to use the tree returns not_built. This is because the build
% process has finished, but the lock on the tree won't be released until it
% dies and the manager detects it. Yes, this is super freaking paranoid.
%% Try to take the lock on every partition's tree on Node.
%% Returns `true' when every lock attempt reported `true'; otherwise
%% returns the list of non-`true' results from index_built_fun/1.
all_aae_trees_built(Node, Partitions) ->
    %% The lock attempts are made from processes spawned by rt:pmap.
    %% That matters: those processes should die eventually, which
    %% releases the lock so the test itself can lock the tree.
    LockResults = rt:pmap(index_built_fun(Node), Partitions),
    case lists:filter(fun(Res) -> Res /= true end, LockResults) of
        [] ->
            true;
        Failures ->
            Failures
    end.
get_aae_tree_info(Node) ->
case rpc:call(Node, riak_kv_entropy_info, compute_tree_info, []) of
{badrpc, _} ->
{error, badrpc};
{error, {badrpc, Node}};
Info ->
lager:debug("Entropy table on node ~p : ~p", [Node, Info]),
{ok, Info}
end.
all_trees_have_build_times({badrpc, _}) ->
false;
all_trees_have_build_times({ok, Info}) ->
all_trees_have_build_times(Info) ->
not lists:keymember(undefined, 2, Info).
index_built_fun(Node) ->
fun(Idx) ->
{ok, TreePid} = rpc:call(Node, riak_kv_vnode,
hashtree_pid, [Idx]),
TreeLocked =
rpc:call(Node, riak_kv_index_hashtree, get_lock,
[TreePid, for_riak_test]),
lager:debug("Partition ~p : ~p", [Idx, TreeLocked]),
TreeLocked == ok
orelse TreeLocked == already_locked
case rpc:call(Node, riak_kv_vnode,
hashtree_pid, [Idx]) of
{ok, TreePid} ->
case rpc:call(Node, riak_kv_index_hashtree,
get_lock, [TreePid, for_riak_test]) of
{badrpc, _} ->
{error, {badrpc, Node}};
TreeLocked when TreeLocked == ok;
TreeLocked == already_locked ->
true;
Err ->
% Either not_built or some unhandled result,
% in which case update this case please!
{error, {index_not_built, Node, Idx, Err}}
end;
{badrpc, _} ->
{error, {badrpc, Node}}
end
end.
%%%===================================================================

View File

@ -91,8 +91,8 @@ simple_test() ->
read_from_cluster(BFirst, 1, ?NUM_KEYS, ?NUM_KEYS),
%% Wait for trees to compute.
repl_util:wait_until_aae_trees_built(ANodes),
repl_util:wait_until_aae_trees_built(BNodes),
rt:wait_until_aae_trees_built(ANodes),
rt:wait_until_aae_trees_built(BNodes),
lager:info("Test fullsync from cluster A leader ~p to cluster B",
[LeaderA]),
@ -186,9 +186,9 @@ dual_test() ->
rt:wait_until_ring_converged(ANodes),
%% Wait for trees to compute.
repl_util:wait_until_aae_trees_built(ANodes),
repl_util:wait_until_aae_trees_built(BNodes),
repl_util:wait_until_aae_trees_built(CNodes),
rt:wait_until_aae_trees_built(ANodes),
rt:wait_until_aae_trees_built(BNodes),
rt:wait_until_aae_trees_built(CNodes),
%% Flush AAE trees to disk.
perform_sacrifice(AFirst),
@ -278,7 +278,7 @@ bidirectional_test() ->
perform_sacrifice(AFirst),
%% Wait for trees to compute.
repl_util:wait_until_aae_trees_built(ANodes),
rt:wait_until_aae_trees_built(ANodes),
%% Verify A replicated to B.
validate_completed_fullsync(LeaderA, BFirst, "B", 1, ?NUM_KEYS),
@ -291,7 +291,7 @@ bidirectional_test() ->
perform_sacrifice(BFirst),
%% Wait for trees to compute.
repl_util:wait_until_aae_trees_built(BNodes),
rt:wait_until_aae_trees_built(BNodes),
%% Verify B replicated to A.
validate_completed_fullsync(LeaderB, AFirst, "A", ?NUM_KEYS + 1, ?NUM_KEYS + ?NUM_KEYS),
@ -350,8 +350,8 @@ difference_test() ->
[{timeout, 4000}]),
%% Wait for trees to compute.
repl_util:wait_until_aae_trees_built(ANodes),
repl_util:wait_until_aae_trees_built(BNodes),
rt:wait_until_aae_trees_built(ANodes),
rt:wait_until_aae_trees_built(BNodes),
lager:info("Test fullsync from cluster A leader ~p to cluster B",
[LeaderA]),
@ -436,8 +436,8 @@ deadlock_test() ->
[ok = rt_intercept:add(Target, Intercept) || Target <- ANodes],
%% Wait for trees to compute.
repl_util:wait_until_aae_trees_built(ANodes),
repl_util:wait_until_aae_trees_built(BNodes),
rt:wait_until_aae_trees_built(ANodes),
rt:wait_until_aae_trees_built(BNodes),
lager:info("Test fullsync from cluster A leader ~p to cluster B",
[LeaderA]),
@ -579,7 +579,7 @@ validate_intercepted_fullsync(InterceptTarget,
rt:wait_for_service(InterceptTarget, riak_repl),
%% Wait until AAE trees are computed on the rebooted node.
repl_util:wait_until_aae_trees_built([InterceptTarget]).
rt:wait_until_aae_trees_built([InterceptTarget]).
%% @doc Given a node, find the port that the cluster manager is
%% listening on.

View File

@ -77,8 +77,8 @@ prepare_cluster_data(TestBucket, NumKeysAOnly, _NumKeysBoth, [AFirst|_] = ANodes
?assertEqual(NumKeysAOnly, length(Res2)),
%% wait for the AAE trees to be built so that we don't get a not_built error
repl_util:wait_until_aae_trees_built(ANodes),
repl_util:wait_until_aae_trees_built(BNodes),
rt:wait_until_aae_trees_built(ANodes),
rt:wait_until_aae_trees_built(BNodes),
ok.

View File

@ -122,27 +122,27 @@ fullsync_test(Strategy, Latency) ->
?assertEqual(ok, repl_util:wait_for_connection(LeaderA, "B")),
%% Perform fullsync of an empty cluster.
repl_util:wait_until_aae_trees_built(ANodes ++ BNodes),
rt:wait_until_aae_trees_built(ANodes ++ BNodes),
{EmptyTime, _} = timer:tc(repl_util,
start_and_wait_until_fullsync_complete,
[LeaderA]),
%% Write keys and perform fullsync.
repl_util:write_to_cluster(AFirst, 0, ?FULL_NUM_KEYS, ?TEST_BUCKET),
repl_util:wait_until_aae_trees_built(ANodes ++ BNodes),
rt:wait_until_aae_trees_built(ANodes ++ BNodes),
{FullTime, _} = timer:tc(repl_util,
start_and_wait_until_fullsync_complete,
[LeaderA]),
%% Rewrite first 10% keys and perform fullsync.
repl_util:write_to_cluster(AFirst, 0, ?DIFF_NUM_KEYS, ?TEST_BUCKET),
repl_util:wait_until_aae_trees_built(ANodes ++ BNodes),
rt:wait_until_aae_trees_built(ANodes ++ BNodes),
{DiffTime, _} = timer:tc(repl_util,
start_and_wait_until_fullsync_complete,
[LeaderA]),
%% Write no keys, and perform the fullsync.
repl_util:wait_until_aae_trees_built(ANodes ++ BNodes),
rt:wait_until_aae_trees_built(ANodes ++ BNodes),
{NoneTime, _} = timer:tc(repl_util,
start_and_wait_until_fullsync_complete,
[LeaderA]),

View File

@ -12,7 +12,6 @@
wait_until_leader_converge/1,
wait_until_connection/1,
wait_until_no_connection/1,
wait_until_aae_trees_built/1,
wait_for_reads/5,
start_and_wait_until_fullsync_complete/1,
start_and_wait_until_fullsync_complete/2,
@ -329,20 +328,6 @@ nodes_with_version(Nodes, Version) ->
nodes_all_have_version(Nodes, Version) ->
Nodes == nodes_with_version(Nodes, Version).
%% AAE support
%% Wait, node by node, until riak_kv_entropy_info:compute_tree_info/0
%% reports no tree whose build time is `undefined'. Returns `ok' once
%% rt:wait_until/2 has been run for every node in Cluster.
wait_until_aae_trees_built(Cluster) ->
    lager:info("Check if all trees built for nodes ~p", [Cluster]),
    AllBuilt =
        fun(Node) ->
            case rpc:call(Node,
                          riak_kv_entropy_info,
                          compute_tree_info,
                          []) of
                {badrpc, _} ->
                    %% A failed RPC is a tuple, not a list; feeding it to
                    %% the comprehension below would raise bad_generator.
                    %% Report "not built yet" instead (presumably
                    %% rt:wait_until/2 then retries -- confirm).
                    false;
                Info ->
                    NotBuilt = [X || {_, undefined} = X <- Info],
                    NotBuilt == []
            end
        end,
    [rt:wait_until(Node, AllBuilt) || Node <- Cluster],
    ok.
%% Return the number of partitions in the cluster where Node is a member.
num_partitions(Node) ->
{ok, Ring} = rpc:call(Node, riak_core_ring_manager, get_raw_ring, []),