diff --git a/tests/repl_aae_fullsync.erl b/tests/repl_aae_fullsync.erl
index f3bd0470..3337016a 100644
--- a/tests/repl_aae_fullsync.erl
+++ b/tests/repl_aae_fullsync.erl
@@ -14,7 +14,7 @@
 -define(TEST_BUCKET, <<"repl-aae-fullsync-systest_a">>).
 -define(NUM_KEYS, 1000).
 
--define(CONF, [
+-define(CONF(Retries), [
         {riak_core,
             [
              {ring_creation_size, 8},
@@ -34,7 +34,7 @@
          {fullsync_strategy, aae},
          {fullsync_on_connect, false},
          {fullsync_interval, disabled},
-         {max_fssource_retries, 5}
+         {max_fssource_retries, Retries}
         ]}
        ]).
 
@@ -45,7 +45,7 @@ confirm() ->
 
 simple_test() ->
     %% Deploy 6 nodes.
-    Nodes = deploy_nodes(6, ?CONF),
+    Nodes = deploy_nodes(6, ?CONF(5)),
 
     %% Break up the 6 nodes into three clustes.
     {ANodes, BNodes} = lists:split(3, Nodes),
@@ -107,11 +107,13 @@ simple_test() ->
     %% intercepts are removed.
     validate_completed_fullsync(LeaderA, BFirst, "B", 1, ?NUM_KEYS),
 
+    rt:clean_cluster(Nodes),
+
     pass.
 
 exhaustive_test() ->
     %% Deploy 6 nodes.
-    Nodes = deploy_nodes(6, ?CONF),
+    Nodes = deploy_nodes(6, ?CONF(infinity)),
 
     %% Break up the 6 nodes into three clustes.
     {ANodes, Rest} = lists:split(2, Nodes),
@@ -172,24 +174,42 @@ exhaustive_test() ->
     repl_util:enable_fullsync(LeaderA, "C"),
     rt:wait_until_ring_converged(ANodes),
 
+    %% Wait for trees to compute.
+    repl_util:wait_until_aae_trees_built(ANodes),
+    repl_util:wait_until_aae_trees_built(BNodes),
+    repl_util:wait_until_aae_trees_built(CNodes),
+
     %% Flush AAE trees to disk.
     perform_sacrifice(AFirst),
 
-    %% Verify data is replicated from A -> B successfully once the
-    %% intercepts are removed.
+    %% Verify data is replicated from A -> B successfully
     validate_completed_fullsync(LeaderA, BFirst, "B", 1, ?NUM_KEYS),
 
-    %% Verify data is replicated from A -> B successfully once the
-    %% intercepts are removed.
+    %% Verify data is replicated from A -> C successfully
     validate_completed_fullsync(LeaderA, CFirst, "C", 1, ?NUM_KEYS),
 
+
+    write_to_cluster(AFirst, ?NUM_KEYS, ?NUM_KEYS + ?NUM_KEYS),
+    read_from_cluster(BFirst, ?NUM_KEYS, ?NUM_KEYS + ?NUM_KEYS, ?NUM_KEYS),
+    read_from_cluster(CFirst, ?NUM_KEYS, ?NUM_KEYS + ?NUM_KEYS, ?NUM_KEYS),
+
+
+    %% Verify that duelling fullsyncs eventually complete
+    {Time, _} = timer:tc(repl_util,
+                         start_and_wait_until_fullsync_complete,
+                         [LeaderA]),
+
+    read_from_cluster(BFirst, ?NUM_KEYS, ?NUM_KEYS + ?NUM_KEYS, 0),
+    read_from_cluster(CFirst, ?NUM_KEYS, ?NUM_KEYS + ?NUM_KEYS, 0),
+    lager:info("Fullsync A->B and A->C completed in ~p seconds", [Time/1000/1000]),
+
     pass.
 
 %% @doc Required for 1.4+ Riak, write sacrificial keys to force AAE
 %% trees to flush to disk.
 perform_sacrifice(Node) ->
     ?assertEqual([], repl_util:do_write(Node, 1, 2000,
-                                        <<"scarificial">>, 1)).
+                                        <<"sacrificial">>, 1)).
 
 %% @doc Validate fullsync completed and all keys are available.
 validate_completed_fullsync(ReplicationLeader,
@@ -198,8 +218,9 @@ validate_completed_fullsync(ReplicationLeader,
                             Start,
                             End) ->
     ok = check_fullsync(ReplicationLeader, DestinationCluster, 0),
-    lager:info("Verify: Reading ~p keys repl'd from A(~p) to B(~p)",
-               [?NUM_KEYS, ReplicationLeader, DestinationNode]),
+    lager:info("Verify: Reading ~p keys repl'd from A(~p) to ~p(~p)",
+               [?NUM_KEYS, ReplicationLeader,
+                DestinationCluster, DestinationNode]),
     ?assertEqual(0,
                  repl_util:wait_for_reads(DestinationNode,
                                           Start,
@@ -212,7 +233,7 @@ validate_completed_fullsync(ReplicationLeader,
 check_fullsync(Node, Cluster, ExpectedFailures) ->
     {Time, _} = timer:tc(repl_util,
                          start_and_wait_until_fullsync_complete,
-                         [Node]),
+                         [Node, Cluster]),
     lager:info("Fullsync completed in ~p seconds", [Time/1000/1000]),
 
     Status = rpc:call(Node, riak_repl_console, status, [quiet]),
@@ -334,5 +355,5 @@ write_to_cluster(Node, Start, End) ->
 %% of errors.
 read_from_cluster(Node, Start, End, Errors) ->
     lager:info("Reading ~p keys from node ~p.", [End - Start, Node]),
-    Res2 = rt:systest_read(Node, Start, ?NUM_KEYS, ?TEST_BUCKET, 1),
+    Res2 = rt:systest_read(Node, Start, End, ?TEST_BUCKET, 1),
     ?assertEqual(Errors, length(Res2)).
diff --git a/tests/repl_util.erl b/tests/repl_util.erl
index 8240c2cf..3501c035 100644
--- a/tests/repl_util.erl
+++ b/tests/repl_util.erl
@@ -15,6 +15,7 @@
          wait_until_aae_trees_built/1,
          wait_for_reads/5,
          start_and_wait_until_fullsync_complete/1,
+         start_and_wait_until_fullsync_complete/2,
          connect_cluster/3,
          disconnect_cluster/2,
          wait_for_connection/2,
@@ -152,13 +153,30 @@ get_fs_coord_status_item(Node, SinkName, ItemName) ->
     proplists:get_value(ItemName, ClusterProps).
 
 start_and_wait_until_fullsync_complete(Node) ->
+    start_and_wait_until_fullsync_complete(Node, undefined).
+
+start_and_wait_until_fullsync_complete(Node, Cluster) ->
     Status0 = rpc:call(Node, riak_repl_console, status, [quiet]),
-    Count = proplists:get_value(server_fullsyncs, Status0) + 1,
+    Count0 = proplists:get_value(server_fullsyncs, Status0),
+    Count = case Cluster of
+                undefined ->
+                    %% count the # of fullsync enabled clusters
+                    Count0 + length(string:tokens(proplists:get_value(fullsync_enabled,
+                                                                      Status0), ", "));
+                _ ->
+                    Count0 + 1
+            end,
     lager:info("waiting for fullsync count to be ~p", [Count]),
 
     lager:info("Starting fullsync on ~p (~p)", [Node,
             rtdev:node_version(rtdev:node_id(Node))]),
-    rpc:call(Node, riak_repl_console, fullsync, [["start"]]),
+    Args = case Cluster of
+               undefined ->
+                   ["start"];
+               _ ->
+                   ["start", Cluster]
+           end,
+    rpc:call(Node, riak_repl_console, fullsync, [Args]),
     %% sleep because of the old bug where stats will crash if you call it too
     %% soon after starting a fullsync
     timer:sleep(500),
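
For context, a minimal sketch (not part of this diff) of how a caller can drive the new 2-arity helper against a single sink. The module name, the LeaderA argument, and the sink name "B" are illustrative assumptions; it presumes a riak_test environment with repl_util on the code path and a source cluster whose fullsync leader is LeaderA. It prints with io:format so the fragment runs without lager's parse transform, whereas the test module above uses lager:info.

    %% Hypothetical fragment: fullsync from the source leader to the
    %% single sink "B" and report the elapsed time in seconds.
    -module(fullsync_single_sink_sketch).
    -export([run/1]).

    run(LeaderA) ->
        {Micros, _} = timer:tc(repl_util,
                               start_and_wait_until_fullsync_complete,
                               [LeaderA, "B"]),
        io:format("Fullsync to sink B completed in ~p seconds~n",
                  [Micros / 1000000]),
        ok.

The 1-arity form keeps its old behaviour by delegating with Cluster = undefined, in which case the expected completion count is derived from the number of fullsync-enabled sinks reported by riak_repl_console status, so a bare "fullsync start" is still waited on correctly.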