From 1a61e066349c105161ee69594dda07ea5e7b8ee6 Mon Sep 17 00:00:00 2001
From: Chris Tilt
Date: Fri, 25 Jan 2013 10:12:34 -0800
Subject: [PATCH 1/2] Moved common repl2 functions to repl_util and added log_to_nodes tracing to replication2_dirty test

---
 tests/repl_util.erl          | 197 +++++++++++++++++++++--
 tests/replication2.erl       | 299 +++++++----------------------
 tests/replication2_dirty.erl |  51 +++---
 3 files changed, 276 insertions(+), 271 deletions(-)

diff --git a/tests/repl_util.erl b/tests/repl_util.erl
index b920df5b..9e9323a2 100644
--- a/tests/repl_util.erl
+++ b/tests/repl_util.erl
@@ -1,15 +1,194 @@
 -module(repl_util).
--export([make_cluster/1]).
+-export([make_cluster/1, name_cluster/2]).
 -compile(export_all).
 -include_lib("eunit/include/eunit.hrl").
--import(rt, [deploy_nodes/2,
-             join/2,
-             wait_until_nodes_ready/1,
-             wait_until_no_pending_changes/1]).
-
 make_cluster(Nodes) ->
     [First|Rest] = Nodes,
-    [join(Node, First) || Node <- Rest],
-    ?assertEqual(ok, wait_until_nodes_ready(Nodes)),
-    ?assertEqual(ok, wait_until_no_pending_changes(Nodes)).
+    [rt:join(Node, First) || Node <- Rest],
+    ?assertEqual(ok, rt:wait_until_nodes_ready(Nodes)),
+    ?assertEqual(ok, rt:wait_until_no_pending_changes(Nodes)).
+
+name_cluster(Node, Name) ->
+    lager:info("Naming cluster ~p",[Name]),
+    Res = rpc:call(Node, riak_repl_console, clustername, [[Name]]),
+    ?assertEqual(ok, Res).
+
+wait_until_is_leader(Node) ->
+    lager:info("wait_until_is_leader(~p)", [Node]),
+    rt:wait_until(Node, fun is_leader/1).
+
+is_leader(Node) ->
+    case rpc:call(Node, riak_core_cluster_mgr, get_leader, []) of
+        {badrpc, _} ->
+            lager:info("Badrpc"),
+            false;
+        Leader ->
+            lager:info("Checking: ~p =:= ~p", [Leader, Node]),
+            Leader =:= Node
+    end.
+
+
+wait_until_is_not_leader(Node) ->
+    lager:info("wait_until_is_not_leader(~p)", [Node]),
+    rt:wait_until(Node, fun is_not_leader/1).
+
+is_not_leader(Node) ->
+    case rpc:call(Node, riak_core_cluster_mgr, get_leader, []) of
+        {badrpc, _} ->
+            lager:info("Badrpc"),
+            false;
+        Leader ->
+            lager:info("Checking: ~p =/= ~p", [Leader, Node]),
+            Leader =/= Node
+    end.
+
+wait_until_leader(Node) ->
+    wait_until_new_leader(Node, undefined).
+
+wait_until_new_leader(Node, OldLeader) ->
+    Res = rt:wait_until(Node,
+        fun(_) ->
+                Status = rpc:call(Node, riak_core_cluster_mgr, get_leader, []),
+                case Status of
+                    {badrpc, _} ->
+                        false;
+                    undefined ->
+                        false;
+                    OldLeader ->
+                        false;
+                    _Other ->
+                        true
+                end
+        end),
+    ?assertEqual(ok, Res).
+
+wait_until_leader_converge([Node|_] = Nodes) ->
+    rt:wait_until(Node,
+        fun(_) ->
+                length(lists:usort([begin
+                        case rpc:call(N, riak_core_cluster_mgr, get_leader, []) of
+                            undefined ->
+                                false;
+                            L ->
+                                %lager:info("Leader for ~p is ~p",
+                                %[N,L]),
+                                L
+                        end
+                end || N <- Nodes])) == 1
+        end).
+
+wait_until_connection(Node) ->
+    rt:wait_until(Node,
+        fun(_) ->
+                Status = rpc:call(Node, riak_repl_console, status, [quiet]),
+                case proplists:get_value(fullsync_coordinator, Status) of
+                    [] ->
+                        false;
+                    [_C] ->
+                        true;
+                    Conns ->
+                        lager:warning("multiple connections detected: ~p",
+                            [Conns]),
+                        true
+                end
+        end). %% 40 seconds is enough for repl
+
+wait_until_no_connection(Node) ->
+    rt:wait_until(Node,
+        fun(_) ->
+                Status = rpc:call(Node, riak_repl_console, status, [quiet]),
+                case proplists:get_value(connected_clusters, Status) of
+                    [] ->
+                        true;
+                    _ ->
+                        false
+                end
+        end). %% 40 seconds is enough for repl
+
+wait_for_reads(Node, Start, End, Bucket, R) ->
+    rt:wait_until(Node,
+        fun(_) ->
+                rt:systest_read(Node, Start, End, Bucket, R) == []
+        end),
+    Reads = rt:systest_read(Node, Start, End, Bucket, R),
+    lager:info("Reads: ~p", [Reads]),
+    length(Reads).
+
+start_and_wait_until_fullsync_complete(Node) ->
+    Status0 = rpc:call(Node, riak_repl_console, status, [quiet]),
+    Count = proplists:get_value(server_fullsyncs, Status0) + 1,
+    lager:info("waiting for fullsync count to be ~p", [Count]),
+
+    lager:info("Starting fullsync on ~p (~p)", [Node,
+        rtdev:node_version(rtdev:node_id(Node))]),
+    rpc:call(Node, riak_repl_console, fullsync, [["start"]]),
+    %% sleep because of the old bug where stats will crash if you call it too
+    %% soon after starting a fullsync
+    timer:sleep(500),
+
+    Res = rt:wait_until(Node,
+        fun(_) ->
+                Status = rpc:call(Node, riak_repl_console, status, [quiet]),
+                case proplists:get_value(server_fullsyncs, Status) of
+                    C when C >= Count ->
+                        true;
+                    _ ->
+                        false
+                end
+        end),
+    ?assertEqual(ok, Res),
+
+    lager:info("Fullsync on ~p complete", [Node]).
+
+connect_cluster(Node, IP, Port) ->
+    Res = rpc:call(Node, riak_repl_console, connect,
+        [[IP, integer_to_list(Port)]]),
+    ?assertEqual(ok, Res).
+
+disconnect_cluster(Node, Name) ->
+    Res = rpc:call(Node, riak_repl_console, disconnect,
+        [[Name]]),
+    ?assertEqual(ok, Res).
+
+wait_for_connection(Node, Name) ->
+    rt:wait_until(Node,
+        fun(_) ->
+                {ok, Connections} = rpc:call(Node, riak_core_cluster_mgr,
+                    get_connections, []),
+                lists:any(fun({{cluster_by_name, N}, _}) when N == Name -> true;
+                             (_) -> false
+                          end, Connections)
+        end).
+
+enable_realtime(Node, Cluster) ->
+    Res = rpc:call(Node, riak_repl_console, realtime, [["enable", Cluster]]),
+    ?assertEqual(ok, Res).
+
+disable_realtime(Node, Cluster) ->
+    Res = rpc:call(Node, riak_repl_console, realtime, [["disable", Cluster]]),
+    ?assertEqual(ok, Res).
+
+enable_fullsync(Node, Cluster) ->
+    Res = rpc:call(Node, riak_repl_console, fullsync, [["enable", Cluster]]),
+    ?assertEqual(ok, Res).
+
+start_realtime(Node, Cluster) ->
+    Res = rpc:call(Node, riak_repl_console, realtime, [["start", Cluster]]),
+    ?assertEqual(ok, Res).
+
+stop_realtime(Node, Cluster) ->
+    Res = rpc:call(Node, riak_repl_console, realtime, [["stop", Cluster]]),
+    ?assertEqual(ok, Res).
+
+do_write(Node, Start, End, Bucket, W) ->
+    case rt:systest_write(Node, Start, End, Bucket, W) of
+        [] ->
+            [];
+        Errors ->
+            lager:warning("~p errors while writing: ~p",
+                [length(Errors), Errors]),
+            timer:sleep(1000),
+            lists:flatten([rt:systest_write(Node, S, S, Bucket, W) ||
+                {S, _Error} <- Errors])
+    end.
diff --git a/tests/replication2.erl b/tests/replication2.erl
index 141e1b6e..db826439 100644
--- a/tests/replication2.erl
+++ b/tests/replication2.erl
@@ -61,35 +61,35 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
             %% write some initial data to A
             lager:info("Writing 100 keys to ~p", [AFirst]),
-            ?assertEqual([], do_write(AFirst, 1, 100, TestBucket, 2)),
+            ?assertEqual([], repl_util:do_write(AFirst, 1, 100, TestBucket, 2)),
-            name_cluster(AFirst, "A"),
-            name_cluster(BFirst, "B"),
+            repl_util:name_cluster(AFirst, "A"),
+            repl_util:name_cluster(BFirst, "B"),
             rt:wait_until_ring_converged(ANodes),
             rt:wait_until_ring_converged(BNodes),
             %% TODO: we'll need to wait for cluster names before continuing
             %% get the leader for the first cluster
-            wait_until_leader(AFirst),
+            repl_util:wait_until_leader(AFirst),
             LeaderA = rpc:call(AFirst, riak_core_cluster_mgr, get_leader, []),
             {ok, {_IP, Port}} = rpc:call(BFirst, application, get_env,
                 [riak_core, cluster_mgr]),
-            connect_cluster(LeaderA, "127.0.0.1", Port),
+            repl_util:connect_cluster(LeaderA, "127.0.0.1", Port),
-            ?assertEqual(ok, wait_for_connection(LeaderA, "B")),
-            enable_realtime(LeaderA, "B"),
+            ?assertEqual(ok, repl_util:wait_for_connection(LeaderA, "B")),
+            repl_util:enable_realtime(LeaderA, "B"),
             rt:wait_until_ring_converged(ANodes),
-            start_realtime(LeaderA, "B"),
+            repl_util:start_realtime(LeaderA, "B"),
             rt:wait_until_ring_converged(ANodes),
-            enable_fullsync(LeaderA, "B"),
+            repl_util:enable_fullsync(LeaderA, "B"),
             rt:wait_until_ring_converged(ANodes);
         _ ->
             lager:info("waiting for leader to converge on cluster A"),
-            ?assertEqual(ok, wait_until_leader_converge(ANodes)),
+            ?assertEqual(ok, repl_util:wait_until_leader_converge(ANodes)),
             lager:info("waiting for leader to converge on cluster B"),
-            ?assertEqual(ok, wait_until_leader_converge(BNodes)),
+            ?assertEqual(ok, repl_util:wait_until_leader_converge(BNodes)),
             %% get the leader for the first cluster
             LeaderA = rpc:call(AFirst, riak_core_cluster_mgr, get_leader, []),
             lager:info("Leader on cluster A is ~p", [LeaderA]),
@@ -100,14 +100,14 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
     log_to_nodes(AllNodes, "Write data to A, verify replication to B via realtime"),
     %% write some data on A
-    ?assertEqual(ok, wait_for_connection(LeaderA, "B")),
+    ?assertEqual(ok, repl_util:wait_for_connection(LeaderA, "B")),
     %io:format("~p~n", [rpc:call(LeaderA, riak_repl_console, status, [quiet])]),
     lager:info("Writing 100 more keys to ~p", [LeaderA]),
-    ?assertEqual([], do_write(LeaderA, 101, 200, TestBucket, 2)),
+    ?assertEqual([], repl_util:do_write(LeaderA, 101, 200, TestBucket, 2)),
     %% verify data is replicated to B
     lager:info("Reading 100 keys written to ~p from ~p", [LeaderA, BFirst]),
-    ?assertEqual(0, wait_for_reads(BFirst, 101, 200, TestBucket, 2)),
+    ?assertEqual(0, repl_util:wait_for_reads(BFirst, 101, 200, TestBucket, 2)),
     case Connected of
         false ->
@@ -118,10 +118,10 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
             ?assertEqual(100, length(Res2)),
             log_to_nodes(AllNodes, "Test fullsync with leader ~p", [LeaderA]),
-            start_and_wait_until_fullsync_complete(LeaderA),
+            repl_util:start_and_wait_until_fullsync_complete(LeaderA),
             lager:info("Check keys written before repl was connected are present"),
-            ?assertEqual(0, wait_for_reads(BFirst, 1, 200, TestBucket, 2));
+            ?assertEqual(0, repl_util:wait_for_reads(BFirst, 1, 200, TestBucket, 2));
         _ ->
             ok
     end,
@@ -137,22 +137,22 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
     rt:stop(LeaderA),
     rt:wait_until_unpingable(LeaderA),
     ASecond = hd(ANodes -- [LeaderA]),
-    wait_until_leader(ASecond),
+    repl_util:wait_until_leader(ASecond),
     LeaderA2 = rpc:call(ASecond, riak_core_cluster_mgr, get_leader, []),
     lager:info("New leader is ~p", [LeaderA2]),
-    ?assertEqual(ok, wait_until_connection(LeaderA2)),
+    ?assertEqual(ok, repl_util:wait_until_connection(LeaderA2)),
     lager:info("Writing 100 more keys to ~p now that the old leader is down", [ASecond]),
-    ?assertEqual([], do_write(ASecond, 201, 300, TestBucket, 2)),
+    ?assertEqual([], repl_util:do_write(ASecond, 201, 300, TestBucket, 2)),
     %% verify data is replicated to B
     lager:info("Reading 100 keys written to ~p from ~p", [ASecond, BFirst]),
-    ?assertEqual(0, wait_for_reads(BFirst, 201, 300, TestBucket, 2)),
+    ?assertEqual(0, repl_util:wait_for_reads(BFirst, 201, 300, TestBucket, 2)),
     %% get the leader for the first cluster
     LeaderB = rpc:call(BFirst, riak_core_cluster_mgr, get_leader, []),
@@ -163,28 +163,28 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
     rt:stop(LeaderB),
     rt:wait_until_unpingable(LeaderB),
     BSecond = hd(BNodes -- [LeaderB]),
-    wait_until_leader(BSecond),
+    repl_util:wait_until_leader(BSecond),
     LeaderB2 = rpc:call(BSecond, riak_core_cluster_mgr, get_leader, []),
     lager:info("New leader is ~p", [LeaderB2]),
-    ?assertEqual(ok, wait_until_connection(LeaderA2)),
+    ?assertEqual(ok, repl_util:wait_until_connection(LeaderA2)),
     lager:info("Writing 100 more keys to ~p now that the old leader is down", [ASecond]),
-    ?assertEqual([], do_write(ASecond, 301, 400, TestBucket, 2)),
+    ?assertEqual([], repl_util:do_write(ASecond, 301, 400, TestBucket, 2)),
     %% verify data is replicated to B
     lager:info("Reading 101 keys written to ~p from ~p", [ASecond, BSecond]),
-    ?assertEqual(0, wait_for_reads(BSecond, 301, 400, TestBucket, 2)),
+    ?assertEqual(0, repl_util:wait_for_reads(BSecond, 301, 400, TestBucket, 2)),
     %% Testing fullsync with downed nodes
     log_to_nodes(AllNodes, "Test fullsync with ~p and ~p down", [LeaderA, LeaderB]),
     lager:info("Re-running fullsync with ~p and ~p down", [LeaderA, LeaderB]),
-    start_and_wait_until_fullsync_complete(LeaderA2),
+    repl_util:start_and_wait_until_fullsync_complete(LeaderA2),
     %%
     %% Per-bucket repl settings tests
@@ -195,7 +195,7 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
     lager:info("Restarting down node ~p", [LeaderA]),
     rt:start(LeaderA),
     rt:wait_until_pingable(LeaderA),
-    start_and_wait_until_fullsync_complete(LeaderA2),
+    repl_util:start_and_wait_until_fullsync_complete(LeaderA2),
     log_to_nodes(AllNodes, "Starting Joe's Repl Test"),
     lager:info("Starting Joe's Repl Test"),
@@ -209,7 +209,7 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
     lager:info("LeaderA: ~p", [LeaderA]),
     lager:info("LeaderA2: ~p", [LeaderA2]),
-    ?assertEqual(ok, wait_until_connection(LeaderA)),
+    ?assertEqual(ok, repl_util:wait_until_connection(LeaderA)),
     log_to_nodes(AllNodes, "Simulate partition to force leader re-election"),
@@ -222,7 +222,7 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
     [ rpc:call(LeaderA2, erlang, disconnect_node, [Node]) || Node <- ANodes -- [LeaderA2]],
     [ rpc:call(Node, erlang, disconnect_node, [LeaderA2]) || Node <- ANodes -- [LeaderA2]],
-    wait_until_new_leader(hd(ANodes -- [LeaderA2]), LeaderA2),
+    repl_util:wait_until_new_leader(hd(ANodes -- [LeaderA2]), LeaderA2),
     InterimLeader = rpc:call(LeaderA, riak_core_cluster_mgr, get_leader, []),
     lager:info("Interim leader: ~p", [InterimLeader]),
@@ -234,15 +234,15 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
     %% there's no point in writing anything until the leaders converge, as we
     %% can drop writes in the middle of an election
-    wait_until_leader_converge(ANodes),
+    repl_util:wait_until_leader_converge(ANodes),
     lager:info("Leader: ~p", [rpc:call(ASecond, riak_core_cluster_mgr, get_leader, [])]),
     lager:info("Writing 2 more keys to ~p", [LeaderA]),
-    ?assertEqual([], do_write(LeaderA, 1301, 1302, TestBucket, 2)),
+    ?assertEqual([], repl_util:do_write(LeaderA, 1301, 1302, TestBucket, 2)),
     %% verify data is replicated to B
     lager:info("Reading 2 keys written to ~p from ~p", [LeaderA, BSecond]),
-    ?assertEqual(0, wait_for_reads(BSecond, 1301, 1302, TestBucket, 2)),
+    ?assertEqual(0, repl_util:wait_for_reads(BSecond, 1301, 1302, TestBucket, 2)),
     log_to_nodes(AllNodes, "Finished Joe's Section"),
     lager:info("Finished Joe's Section"),
@@ -260,37 +260,37 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
     %% disconnect the other cluster, so realtime doesn't happen
     lager:info("disconnect the 2 clusters"),
-    disable_realtime(LeaderA, "B"),
+    repl_util:disable_realtime(LeaderA, "B"),
     rt:wait_until_ring_converged(ANodes),
-    disconnect_cluster(LeaderA, "B"),
-    wait_until_no_connection(LeaderA),
+    repl_util:disconnect_cluster(LeaderA, "B"),
+    repl_util:wait_until_no_connection(LeaderA),
     rt:wait_until_ring_converged(ANodes),
     lager:info("write 100 keys to a realtime only bucket"),
-    ?assertEqual([], do_write(ASecond, 1, 100,
+    ?assertEqual([], repl_util:do_write(ASecond, 1, 100,
             RealtimeOnly, 2)),
     lager:info("reconnect the 2 clusters"),
-    connect_cluster(LeaderA, "127.0.0.1", Port),
-    ?assertEqual(ok, wait_for_connection(LeaderA, "B")),
+    repl_util:connect_cluster(LeaderA, "127.0.0.1", Port),
+    ?assertEqual(ok, repl_util:wait_for_connection(LeaderA, "B")),
     rt:wait_until_ring_converged(ANodes),
-    enable_realtime(LeaderA, "B"),
+    repl_util:enable_realtime(LeaderA, "B"),
     rt:wait_until_ring_converged(ANodes),
-    enable_fullsync(LeaderA, "B"),
+    repl_util:enable_fullsync(LeaderA, "B"),
     rt:wait_until_ring_converged(ANodes),
-    start_realtime(LeaderA, "B"),
+    repl_util:start_realtime(LeaderA, "B"),
     rt:wait_until_ring_converged(ANodes),
-    ?assertEqual(ok, wait_until_connection(LeaderA)),
+    ?assertEqual(ok, repl_util:wait_until_connection(LeaderA)),
     LeaderA3 = rpc:call(ASecond, riak_core_cluster_mgr, get_leader, []),
     log_to_nodes(AllNodes, "Test fullsync and realtime independence"),
     lager:info("write 100 keys to a {repl, false} bucket"),
-    ?assertEqual([], do_write(ASecond, 1, 100, NoRepl, 2)),
+    ?assertEqual([], repl_util:do_write(ASecond, 1, 100, NoRepl, 2)),
     lager:info("write 100 keys to a fullsync only bucket"),
-    ?assertEqual([], do_write(ASecond, 1, 100,
+    ?assertEqual([], repl_util:do_write(ASecond, 1, 100,
            FullsyncOnly, 2)),
     lager:info("Check the fullsync only bucket didn't replicate the writes"),
@@ -308,10 +308,10 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
     %% do a fullsync, make sure that fullsync_only is replicated, but
     %% realtime_only and no_repl aren't
-    start_and_wait_until_fullsync_complete(LeaderA3),
+    repl_util:start_and_wait_until_fullsync_complete(LeaderA3),
     lager:info("Check fullsync only bucket is now replicated"),
-    ?assertEqual(0, wait_for_reads(BSecond, 1, 100,
+    ?assertEqual(0, repl_util:wait_for_reads(BSecond, 1, 100,
            FullsyncOnly, 2)),
     lager:info("Check realtime only bucket didn't replicate"),
@@ -321,13 +321,13 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
     lager:info("Write 100 more keys into realtime only bucket on ~p", [ASecond]),
-    ?assertEqual([], do_write(ASecond, 101, 200,
+    ?assertEqual([], repl_util:do_write(ASecond, 101, 200,
            RealtimeOnly, 2)),
     timer:sleep(5000),
     lager:info("Check the realtime keys replicated"),
-    ?assertEqual(0, wait_for_reads(BSecond, 101, 200,
+    ?assertEqual(0, repl_util:wait_for_reads(BSecond, 101, 200,
           RealtimeOnly, 2)),
     lager:info("Check the older keys in the realtime bucket did not replicate"),
@@ -344,20 +344,20 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
     LeaderA4 = rpc:call(ASecond, riak_core_cluster_mgr, get_leader, []),
     lager:info("Stopping realtime, queue will build"),
-    stop_realtime(LeaderA4, "B"),
+    repl_util:stop_realtime(LeaderA4, "B"),
     rt:wait_until_ring_converged(ANodes),
     lager:info("Writing 100 keys"),
-    ?assertEqual([], do_write(LeaderA4, 800, 900,
+    ?assertEqual([], repl_util:do_write(LeaderA4, 800, 900,
            TestBucket, 2)),
     lager:info("Starting realtime"),
-    start_realtime(LeaderA4, "B"),
+    repl_util:start_realtime(LeaderA4, "B"),
     rt:wait_until_ring_converged(ANodes),
     timer:sleep(3000),
     lager:info("Reading keys written while repl was stopped"),
-    ?assertEqual(0, wait_for_reads(BSecond, 800, 900,
+    ?assertEqual(0, repl_util:wait_for_reads(BSecond, 800, 900,
           TestBucket, 2)),
     log_to_nodes(AllNodes, "Testing realtime migration on node shutdown"),
@@ -365,11 +365,11 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
     Target = hd(ANodes -- [LeaderA4]),
     lager:info("Stopping realtime, queue will build"),
-    stop_realtime(LeaderA4, "B"),
+    repl_util:stop_realtime(LeaderA4, "B"),
     rt:wait_until_ring_converged(ANodes),
     lager:info("Writing 100 keys"),
-    ?assertEqual([], do_write(Target, 900, 1000,
+    ?assertEqual([], repl_util:do_write(Target, 900, 1000,
           TestBucket, 2)),
     io:format("queue status: ~p",
@@ -381,11 +381,11 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
     rt:wait_until_unpingable(Target),
     lager:info("Starting realtime"),
-    start_realtime(LeaderA4, "B"),
+    repl_util:start_realtime(LeaderA4, "B"),
     timer:sleep(3000),
     lager:info("Reading keys written while repl was stopped"),
-    ?assertEqual(0, wait_for_reads(BSecond, 900, 1000,
+    ?assertEqual(0, repl_util:wait_for_reads(BSecond, 900, 1000,
          TestBucket, 2)),
     lager:info("Restarting node ~p", [Target]),
@@ -592,12 +592,6 @@ collect_results(Workers, Acc) ->
             collect_results(lists:keydelete(Pid, 1, Workers), Acc)
     end.
-make_cluster(Nodes) ->
-    [First|Rest] = Nodes,
-    [join(Node, First) || Node <- Rest],
-    ?assertEqual(ok, wait_until_nodes_ready(Nodes)),
-    ?assertEqual(ok, wait_until_no_pending_changes(Nodes)).
-
 %% does the node meet the version requirement?
 node_has_version(Node, Version) ->
     NodeVersion = rtdev:node_version(rtdev:node_id(Node)),
@@ -619,188 +613,5 @@ client_count(Node) ->
     Clients = rpc:call(Node, supervisor, which_children, [riak_repl_client_sup]),
     length(Clients).
-start_and_wait_until_fullsync_complete(Node) ->
-    Status0 = rpc:call(Node, riak_repl_console, status, [quiet]),
-    Count = proplists:get_value(server_fullsyncs, Status0) + 1,
-    lager:info("waiting for fullsync count to be ~p", [Count]),
-
-    lager:info("Starting fullsync on ~p (~p)", [Node,
-        rtdev:node_version(rtdev:node_id(Node))]),
-    rpc:call(Node, riak_repl_console, fullsync, [["start"]]),
-    %% sleep because of the old bug where stats will crash if you call it too
-    %% soon after starting a fullsync
-    timer:sleep(500),
-
-    Res = rt:wait_until(Node,
-        fun(_) ->
-                Status = rpc:call(Node, riak_repl_console, status, [quiet]),
-                case proplists:get_value(server_fullsyncs, Status) of
-                    C when C >= Count ->
-                        true;
-                    _ ->
-                        false
-                end
-        end),
-    ?assertEqual(ok, Res),
-
-    lager:info("Fullsync on ~p complete", [Node]).
-
-wait_until_is_leader(Node) ->
-    lager:info("wait_until_is_leader(~p)", [Node]),
-    rt:wait_until(Node, fun is_leader/1).
-
-is_leader(Node) ->
-    case rpc:call(Node, riak_core_cluster_mgr, get_leader, []) of
-        {badrpc, _} ->
-            lager:info("Badrpc"),
-            false;
-        Leader ->
-            lager:info("Checking: ~p =:= ~p", [Leader, Node]),
-            Leader =:= Node
-    end.
-wait_until_is_not_leader(Node) ->
-    lager:info("wait_until_is_not_leader(~p)", [Node]),
-    rt:wait_until(Node, fun is_not_leader/1).
-
-is_not_leader(Node) ->
-    case rpc:call(Node, riak_core_cluster_mgr, get_leader, []) of
-        {badrpc, _} ->
-            lager:info("Badrpc"),
-            false;
-        Leader ->
-            lager:info("Checking: ~p =/= ~p", [Leader, Node]),
-            Leader =/= Node
-    end.
-
-wait_until_leader(Node) ->
-    wait_until_new_leader(Node, undefined).
-
-wait_until_new_leader(Node, OldLeader) ->
-    Res = rt:wait_until(Node,
-        fun(_) ->
-                Status = rpc:call(Node, riak_core_cluster_mgr, get_leader, []),
-                case Status of
-                    {badrpc, _} ->
-                        false;
-                    undefined ->
-                        false;
-                    OldLeader ->
-                        false;
-                    _Other ->
-                        true
-                end
-        end),
-    ?assertEqual(ok, Res).
-
-wait_until_leader_converge([Node|_] = Nodes) ->
-    rt:wait_until(Node,
-        fun(_) ->
-                length(lists:usort([begin
-                        case rpc:call(N, riak_core_cluster_mgr, get_leader, []) of
-                            undefined ->
-                                false;
-                            L ->
-                                %lager:info("Leader for ~p is ~p",
-                                %[N,L]),
-                                L
-                        end
-                end || N <- Nodes])) == 1
-        end).
-
-wait_until_connection(Node) ->
-    rt:wait_until(Node,
-        fun(_) ->
-                Status = rpc:call(Node, riak_repl_console, status, [quiet]),
-                case proplists:get_value(fullsync_coordinator, Status) of
-                    [] ->
-                        false;
-                    [_C] ->
-                        true;
-                    Conns ->
-                        lager:warning("multiple connections detected: ~p",
-                            [Conns]),
-                        true
-                end
-        end). %% 40 seconds is enough for repl
-
-wait_until_no_connection(Node) ->
-    rt:wait_until(Node,
-        fun(_) ->
-                Status = rpc:call(Node, riak_repl_console, status, [quiet]),
-                case proplists:get_value(connected_clusters, Status) of
-                    [] ->
-                        true;
-                    _ ->
-                        false
-                end
-        end). %% 40 seconds is enough for repl
-
-
-
-wait_for_reads(Node, Start, End, Bucket, R) ->
-    rt:wait_until(Node,
-        fun(_) ->
-                rt:systest_read(Node, Start, End, Bucket, R) == []
-        end),
-    Reads = rt:systest_read(Node, Start, End, Bucket, R),
-    lager:info("Reads: ~p", [Reads]),
-    length(Reads).
-
-do_write(Node, Start, End, Bucket, W) ->
-    case rt:systest_write(Node, Start, End, Bucket, W) of
-        [] ->
-            [];
-        Errors ->
-            lager:warning("~p errors while writing: ~p",
-                [length(Errors), Errors]),
-            timer:sleep(1000),
-            lists:flatten([rt:systest_write(Node, S, S, Bucket, W) ||
-                {S, _Error} <- Errors])
-    end.
-
-name_cluster(Node, Name) ->
-    lager:info("Naming cluster ~p",[Name]),
-    Res = rpc:call(Node, riak_repl_console, clustername, [[Name]]),
-    ?assertEqual(ok, Res).
-
-connect_cluster(Node, IP, Port) ->
-    Res = rpc:call(Node, riak_repl_console, connect,
-        [[IP, integer_to_list(Port)]]),
-    ?assertEqual(ok, Res).
-
-disconnect_cluster(Node, Name) ->
-    Res = rpc:call(Node, riak_repl_console, disconnect,
-        [[Name]]),
-    ?assertEqual(ok, Res).
-
-wait_for_connection(Node, Name) ->
-    rt:wait_until(Node,
-        fun(_) ->
-                {ok, Connections} = rpc:call(Node, riak_core_cluster_mgr,
-                    get_connections, []),
-                lists:any(fun({{cluster_by_name, N}, _}) when N == Name -> true;
-                             (_) -> false
-                          end, Connections)
-        end).
-
-enable_realtime(Node, Cluster) ->
-    Res = rpc:call(Node, riak_repl_console, realtime, [["enable", Cluster]]),
-    ?assertEqual(ok, Res).
-
-disable_realtime(Node, Cluster) ->
-    Res = rpc:call(Node, riak_repl_console, realtime, [["disable", Cluster]]),
-    ?assertEqual(ok, Res).
-
-enable_fullsync(Node, Cluster) ->
-    Res = rpc:call(Node, riak_repl_console, fullsync, [["enable", Cluster]]),
-    ?assertEqual(ok, Res).
-
-start_realtime(Node, Cluster) ->
-    Res = rpc:call(Node, riak_repl_console, realtime, [["start", Cluster]]),
-    ?assertEqual(ok, Res).
-
-stop_realtime(Node, Cluster) ->
-    Res = rpc:call(Node, riak_repl_console, realtime, [["stop", Cluster]]),
-    ?assertEqual(ok, Res).
diff --git a/tests/replication2_dirty.erl b/tests/replication2_dirty.erl
index f48eb765..2e3b6009 100644
--- a/tests/replication2_dirty.erl
+++ b/tests/replication2_dirty.erl
@@ -25,50 +25,58 @@ confirm() ->
     Nodes = deploy_nodes(NumNodes, Conf),
     {[AFirst|_] = ANodes, [BFirst|_] = BNodes} = lists:split(ClusterASize, Nodes),
+
+    AllNodes = ANodes ++ BNodes,
+    rt:log_to_nodes(AllNodes, "Starting replication2_dirty test"),
+
     lager:info("ANodes: ~p", [ANodes]),
     lager:info("BNodes: ~p", [BNodes]),
+    rt:log_to_nodes(AllNodes, "Building and connecting Clusters"),
+
     lager:info("Build cluster A"),
-    replication2:make_cluster(ANodes),
+    repl_util:make_cluster(ANodes),
     lager:info("Build cluster B"),
-    replication2:make_cluster(BNodes),
+    repl_util:make_cluster(BNodes),
-    replication2:name_cluster(AFirst, "A"),
-    replication2:name_cluster(BFirst, "B"),
+    repl_util:name_cluster(AFirst, "A"),
+    repl_util:name_cluster(BFirst, "B"),
     rt:wait_until_ring_converged(ANodes),
     rt:wait_until_ring_converged(BNodes),
     %% get the leader for the first cluster
-    replication2:wait_until_leader(AFirst),
+    repl_util:wait_until_leader(AFirst),
     LeaderA = rpc:call(AFirst, riak_core_cluster_mgr, get_leader, []),
     %LeaderB = rpc:call(BFirst, riak_core_cluster_mgr, get_leader, []),
     {ok, {_IP, Port}} = rpc:call(BFirst, application, get_env,
         [riak_core, cluster_mgr]),
-    replication2:connect_cluster(LeaderA, "127.0.0.1", Port),
+    repl_util:connect_cluster(LeaderA, "127.0.0.1", Port),
-    ?assertEqual(ok, replication2:wait_for_connection(LeaderA, "B")),
-    replication2:enable_realtime(LeaderA, "B"),
+    ?assertEqual(ok, repl_util:wait_for_connection(LeaderA, "B")),
+    repl_util:enable_realtime(LeaderA, "B"),
     rt:wait_until_ring_converged(ANodes),
-    replication2:start_realtime(LeaderA, "B"),
+    repl_util:start_realtime(LeaderA, "B"),
     rt:wait_until_ring_converged(ANodes),
-    replication2:enable_fullsync(LeaderA, "B"),
+    repl_util:enable_fullsync(LeaderA, "B"),
     rt:wait_until_ring_converged(ANodes),
     % nothing should be dirty initially
     lager:info("Waiting until all nodes clean"),
     wait_until_all_nodes_clean(LeaderA),
+    rt:log_to_nodes(AllNodes, "Test basic realtime replication from A -> B"),
+
     %% write some data on A
-    ?assertEqual(ok, replication2:wait_for_connection(LeaderA, "B")),
+    ?assertEqual(ok, repl_util:wait_for_connection(LeaderA, "B")),
     %io:format("~p~n", [rpc:call(LeaderA, riak_repl_console, status, [quiet])]),
     lager:info("Writing 2000 more keys to ~p", [LeaderA]),
-    ?assertEqual([], replication2:do_write(LeaderA, 101, 2000, TestBucket, 2)),
+    ?assertEqual([], repl_util:do_write(LeaderA, 101, 2000, TestBucket, 2)),
     %% verify data is replicated to B
     lager:info("Reading 2000 keys written to ~p from ~p", [LeaderA, BFirst]),
-    ?assertEqual(0, replication2:wait_for_reads(BFirst, 101, 2000, TestBucket, 2)),
+    ?assertEqual(0, repl_util:wait_for_reads(BFirst, 101, 2000, TestBucket, 2)),
     [ ?assertEqual(0, get_dirty_stat(Node)) || Node <- ANodes],
     [ ?assertEqual(0, get_dirty_stat(Node)) || Node <- BNodes],
@@ -78,6 +86,8 @@ confirm() ->
     lager:info("Waiting until all nodes clean"),
     wait_until_all_nodes_clean(LeaderA),
+    rt:log_to_nodes(AllNodes, "Verify fullsync after manual dirty flag set"),
+
     lager:info("Manually setting rt_dirty state"),
     % manually set this for now to simulate source errors
@@ -88,12 +98,13 @@ confirm() ->
     wait_until_coord_has_dirty(LeaderA),
     lager:info("Starting fullsync"),
-    replication2:start_and_wait_until_fullsync_complete(LeaderA),
+    repl_util:start_and_wait_until_fullsync_complete(LeaderA),
     lager:info("Wait for all nodes to show up clean"),
     wait_until_all_nodes_clean(LeaderA),
     %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
     %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+    rt:log_to_nodes(AllNodes, "Multiple node test"),
     lager:info("Multiple node test"),
     %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
     %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -110,12 +121,13 @@ confirm() ->
     wait_until_coord_has_dirty(DirtyB),
     lager:info("Starting fullsync"),
-    replication2:start_and_wait_until_fullsync_complete(LeaderA),
+    repl_util:start_and_wait_until_fullsync_complete(LeaderA),
     lager:info("Wait for all nodes to show up clean"),
     wait_until_all_nodes_clean(LeaderA),
     %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
     %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+    rt:log_to_nodes(AllNodes, "Multiple node test, one failed during fullsync"),
     lager:info("Multiple node test, one failed during fullsync"),
     %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
     %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -135,7 +147,7 @@ confirm() ->
                     ResultC = rpc:call(DirtyD, riak_repl_stats, rt_source_errors, []),
                     lager:info("Result = ~p", [ResultC])
             end),
-    replication2:start_and_wait_until_fullsync_complete(LeaderA),
+    repl_util:start_and_wait_until_fullsync_complete(LeaderA),
     lager:info("Checking to see if C is still clean"),
     wait_until_node_clean(DirtyC),
@@ -143,11 +155,12 @@ confirm() ->
     wait_until_coord_has_dirty(DirtyD),
     % Clear out all dirty state
-    replication2:start_and_wait_until_fullsync_complete(LeaderA),
+    repl_util:start_and_wait_until_fullsync_complete(LeaderA),
     %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
     %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+    rt:log_to_nodes(AllNodes, "Brutally kill the sink nodes"),
     lager:info("Brutally kill the sink nodes"),
     %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
     %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -159,13 +172,14 @@ confirm() ->
     % 3000 may need to be increased if the test fails here on
     % a fast machine
     lager:info("Writing 3000 more keys to ~p", [LeaderA]),
-    ?assertEqual([], replication2:do_write(LeaderA, 0, 3000, TestBucket, 2)),
+    ?assertEqual([], repl_util:do_write(LeaderA, 0, 3000, TestBucket, 2)),
     wait_until_coord_has_any_dirty(LeaderA),
     %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
     %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+    rt:log_to_nodes(AllNodes, "Check rt_dirty state after shutdown"),
     lager:info("Check rt_dirty state after shutdown"),
     %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
     %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -173,6 +187,7 @@ confirm() ->
     [ rt:start_and_wait(Node) || Node <- ANodes],
     wait_until_coord_has_any_dirty(LeaderA),
+    rt:log_to_nodes(AllNodes, "Test completed"),
     pass.
 get_dirty_stat(Node) ->

From d80c30025cb0396aa39bf67d8577c3e71d78fbb7 Mon Sep 17 00:00:00 2001
From: Chris Tilt
Date: Fri, 25 Jan 2013 10:43:29 -0800
Subject: [PATCH 2/2] Remove export all from repl tests; make repl_util API explicit

---
 tests/repl_util.erl          | 44 ++++++++++++++++++++++++++++++++++--
 tests/replication2.erl       | 25 --------------------
 tests/replication2_dirty.erl |  1 -
 3 files changed, 42 insertions(+), 28 deletions(-)

diff --git a/tests/repl_util.erl b/tests/repl_util.erl
index 9e9323a2..355fdda0 100644
--- a/tests/repl_util.erl
+++ b/tests/repl_util.erl
@@ -1,6 +1,29 @@
 -module(repl_util).
--export([make_cluster/1, name_cluster/2]).
--compile(export_all).
+-export([make_cluster/1,
+         name_cluster/2,
+         node_has_version/2,
+         nodes_with_version/2,
+         nodes_all_have_version/2,
+         wait_until_is_leader/1,
+         is_leader/1,
+         wait_until_is_not_leader/1,
+         wait_until_leader/1,
+         wait_until_new_leader/2,
+         wait_until_leader_converge/1,
+         wait_until_connection/1,
+         wait_until_no_connection/1,
+         wait_for_reads/5,
+         start_and_wait_until_fullsync_complete/1,
+         connect_cluster/3,
+         disconnect_cluster/2,
+         wait_for_connection/2,
+         enable_realtime/2,
+         disable_realtime/2,
+         enable_fullsync/2,
+         start_realtime/2,
+         stop_realtime/2,
+         do_write/5
+        ]).
 -include_lib("eunit/include/eunit.hrl").
 make_cluster(Nodes) ->
@@ -192,3 +215,20 @@ do_write(Node, Start, End, Bucket, W) ->
             lists:flatten([rt:systest_write(Node, S, S, Bucket, W) ||
                 {S, _Error} <- Errors])
     end.
+
+%% does the node meet the version requirement?
+node_has_version(Node, Version) ->
+    NodeVersion = rtdev:node_version(rtdev:node_id(Node)),
+    case NodeVersion of
+        current ->
+            %% current always satisfies any version check
+            true;
+        _ ->
+            NodeVersion >= Version
+    end.
+
+nodes_with_version(Nodes, Version) ->
+    [Node || Node <- Nodes, node_has_version(Node, Version)].
+
+nodes_all_have_version(Nodes, Version) ->
+    Nodes == nodes_with_version(Nodes, Version).
diff --git a/tests/replication2.erl b/tests/replication2.erl
index db826439..9e338b09 100644
--- a/tests/replication2.erl
+++ b/tests/replication2.erl
@@ -1,7 +1,6 @@
 -module(replication2).
 -behavior(riak_test).
 -export([confirm/0]).
--compile(export_all).
 -include_lib("eunit/include/eunit.hrl").
 -import(rt, [deploy_nodes/2,
@@ -591,27 +590,3 @@ collect_results(Workers, Acc) ->
         {'DOWN', _, _, Pid, _Reason} ->
             collect_results(lists:keydelete(Pid, 1, Workers), Acc)
     end.
-
-%% does the node meet the version requirement?
-node_has_version(Node, Version) ->
-    NodeVersion = rtdev:node_version(rtdev:node_id(Node)),
-    case NodeVersion of
-        current ->
-            %% current always satisfies any version check
-            true;
-        _ ->
-            NodeVersion >= Version
-    end.
-
-nodes_with_version(Nodes, Version) ->
-    [Node || Node <- Nodes, node_has_version(Node, Version)].
-
-nodes_all_have_version(Nodes, Version) ->
-    Nodes == nodes_with_version(Nodes, Version).
-
-client_count(Node) ->
-    Clients = rpc:call(Node, supervisor, which_children, [riak_repl_client_sup]),
-    length(Clients).
-
-
-
diff --git a/tests/replication2_dirty.erl b/tests/replication2_dirty.erl
index 2e3b6009..4dc8695e 100644
--- a/tests/replication2_dirty.erl
+++ b/tests/replication2_dirty.erl
@@ -1,6 +1,5 @@
 -module(replication2_dirty).
 -export([confirm/0]).
--compile(export_all).
 -include_lib("eunit/include/eunit.hrl").
 -import(rt, [deploy_nodes/2,