Merge pull request #519 from basho/bugfix/bring-back-rt-wait-until-aae-trees-built

Bring test back to 1.4.7 glory
This commit is contained in:
Engel A. Sanchez 2014-02-20 19:48:48 -05:00
commit 785794cd6c
2 changed files with 97 additions and 7 deletions

View File

@ -126,6 +126,7 @@
wait_until/3,
wait_until/2,
wait_until/1,
wait_until_aae_trees_built/1,
wait_until_all_members/1,
wait_until_all_members/2,
wait_until_capability/3,
@ -758,6 +759,56 @@ wait_until_nodes_agree_about_ownership(Nodes) ->
Results = [ wait_until_owners_according_to(Node, Nodes) || Node <- Nodes ],
?assert(lists:all(fun(X) -> ok =:= X end, Results)).
%% AAE support
wait_until_aae_trees_built(Nodes) ->
lager:info("Wait until AAE builds all partition trees across ~p", [Nodes]),
%% Wait until all nodes report no undefined trees
AllBuiltFun =
fun(_, _AllBuilt = false) ->
false;
(Node, _AllBuilt = true) ->
Info = rpc:call(Node,
riak_kv_entropy_info,
compute_tree_info,
[]),
lager:debug("Entropy table on node ~p : ~p", [Node, Info]),
AllHaveBuildTimes = not lists:keymember(undefined, 2, Info),
case AllHaveBuildTimes of
false ->
false;
true ->
lager:debug("Check if really built by locking"),
%% Try to lock each partition. If you get not_built,
%% the manager has not detected the built process has
%% died yet.
%% Notice that the process locking is spawned by the
%% pmap. That's important! as it should die eventually
%% so the test can lock on the tree.
IdxBuilt =
fun(Idx) ->
{ok, TreePid} = rpc:call(Node, riak_kv_vnode,
hashtree_pid, [Idx]),
TreeLocked =
rpc:call(Node, riak_kv_index_hashtree, get_lock,
[TreePid, for_riak_test]),
lager:debug("Partition ~p : ~p", [Idx, TreeLocked]),
TreeLocked == ok
orelse TreeLocked == already_locked
end,
Partitions = [I || {I, _} <- Info],
AllBuilt =
lists:all(fun(V) -> V == true end,
rt:pmap(IdxBuilt, Partitions)),
lager:debug("For node ~p all built = ~p", [Node, AllBuilt]),
AllBuilt
end
end,
wait_until(fun() ->
lists:foldl(AllBuiltFun, true, Nodes)
end).
%%%===================================================================
%%% Ring Functions
%%%===================================================================

View File

@ -31,22 +31,21 @@
-define(N_VAL, 3).
confirm() ->
Nodes =
[Node1] = rt:build_cluster(1,
[{riak_kv,
[{anti_entropy_build_limit, {100, 1000}},
[{anti_entropy, {off, []}},
{anti_entropy_build_limit, {100, 500}},
{anti_entropy_concurrency, 100},
{anti_entropy_tick, 1000}]}]),
rt:wait_until_aae_trees_built(Nodes),
{anti_entropy_tick, 200}]}]),
rt_intercept:load_code(Node1),
rt_intercept:add(Node1,
{riak_object,
[{{index_specs, 1}, skippable_index_specs},
{{diff_index_specs, 2}, skippable_diff_index_specs}]}),
lager:info("Installed intercepts to corrupt index specs on node ~p", [Node1]),
%%rpc:call(Node1, lager, set_loglevel, [lager_console_backend, debug]),
PBC = rt:pbc(Node1),
NumItems = ?NUM_ITEMS,
%%NumDelItems = NumItems div 10,
NumDel = ?NUM_DELETES,
pass = check_lost_objects(Node1, PBC, NumItems, NumDel),
pass = check_lost_indexes(Node1, PBC, NumItems),
@ -61,13 +60,25 @@ check_lost_objects(Node1, PBC, NumItems, NumDel) ->
Index = {integer_index, "i"},
set_skip_index_specs(Node1, false),
lager:info("Putting ~p objects with indexes", [NumItems]),
[put_obj(PBC, Bucket, N, N+1, Index) || N <- lists:seq(1, NumItems),
HalfNumItems = NumItems div 2,
[put_obj(PBC, Bucket, N, N+1, Index) || N <- lists:seq(1, HalfNumItems),
Bucket <- ?BUCKETS],
lager:info("Put half the objects, now enable AAE and build tress"),
%% Enable AAE and build trees.
ok = rpc:call(Node1, application, set_env,
[riak_kv, anti_entropy, {on, [debug]}]),
ok = rpc:call(Node1, riak_kv_entropy_manager, enable, []),
rt:wait_until_aae_trees_built([Node1]),
lager:info("AAE trees built, now put the rest of the data"),
[put_obj(PBC, Bucket, N, N+1, Index)
|| N <- lists:seq(HalfNumItems+1, NumItems), Bucket <- ?BUCKETS],
%% Verify they are there.
ExpectedInitial = [{to_key(N+1), to_key(N)} || N <- lists:seq(1, NumItems)],
lager:info("Check objects are there as expected"),
[assert_range_query(PBC, Bucket, ExpectedInitial, Index, 1, NumItems+1)
|| Bucket <- ?BUCKETS],
lager:info("Now mess index spec code and change values"),
set_skip_index_specs(Node1, true),
[put_obj(PBC, Bucket, N, N, Index) || N <- lists:seq(1, NumItems-NumDel),
@ -75,7 +86,7 @@ check_lost_objects(Node1, PBC, NumItems, NumDel) ->
DelRange = lists:seq(NumItems-NumDel+1, NumItems),
lager:info("Deleting ~b objects without updating indexes", [NumDel]),
[del_obj(PBC, Bucket, N) || N <- DelRange, Bucket <- ?BUCKETS],
DelKeys = [to_key(N) || N <- DelRange],
DelKeys = [to_key(N) || N <- DelRange],
[rt:wait_until(fun() -> rt:pbc_really_deleted(PBC, Bucket, DelKeys) end)
|| Bucket <- ?BUCKETS],
%% Verify they are damaged
@ -90,6 +101,33 @@ check_lost_objects(Node1, PBC, NumItems, NumDel) ->
|| Bucket <- ?BUCKETS],
pass.
do_tree_rebuild(Node) ->
lager:info("Let's go through a tree rebuild right here"),
%% Cheat by clearing build times from ETS directly, as the code doesn't
%% ever clear them currently.
?assertEqual(true, rpc:call(Node, ets, delete_all_objects, [ets_riak_kv_entropy])),
%% Make it so it doesn't go wild rebuilding things when the expiration is
%% tiny.
?assertEqual(ok, rpc:call(Node, application, set_env, [riak_kv,
anti_entropy_build_limit,
{0, 5000}])),
%% Make any tree expire on tick.
?assertEqual(ok, rpc:call(Node, application, set_env, [riak_kv,
anti_entropy_expire,
1])),
%% Wait for a good number of ticks.
timer:sleep(5000),
%% Make sure things stop expiring on tick
?assertEqual(ok, rpc:call(Node, application, set_env, [riak_kv,
anti_entropy_expire,
7 * 24 * 60 * 60 * 1000])),
%% And let the manager start allowing builds again.
?assertEqual(ok, rpc:call(Node, application, set_env, [riak_kv,
anti_entropy_build_limit,
{100, 1000}])),
rt:wait_until_aae_trees_built([Node]),
ok.
%% Write objects without a 2i index. Test that running 2i repair will generate
%% the missing indexes.
check_lost_indexes(Node1, PBC, NumItems) ->
@ -101,6 +139,7 @@ check_lost_indexes(Node1, PBC, NumItems) ->
lager:info("Verify that objects cannot be found via index"),
[assert_range_query(PBC, Bucket, [], Index, 1, NumItems+1)
|| Bucket <- ?BUCKETS],
do_tree_rebuild(Node1),
run_2i_repair(Node1),
lager:info("Check that objects can now be found via index"),
Expected = [{to_key(N+1), to_key(N)} || N <- lists:seq(1, NumItems)],