Merge pull request #182 from basho/cet-repl2-refactor

Moved common repl2 functions to repl_util and added log_to_nodes tracing...
This commit is contained in:
Dave Parfitt 2013-01-25 11:27:21 -08:00
commit 9f34d23e53
3 changed files with 317 additions and 298 deletions

View File

@ -1,15 +1,234 @@
-module(repl_util). -module(repl_util).
-export([make_cluster/1]). -export([make_cluster/1,
-compile(export_all). name_cluster/2,
node_has_version/2,
nodes_with_version/2,
nodes_all_have_version/2,
wait_until_is_leader/1,
is_leader/1,
wait_until_is_not_leader/1,
wait_until_leader/1,
wait_until_new_leader/2,
wait_until_leader_converge/1,
wait_until_connection/1,
wait_until_no_connection/1,
wait_for_reads/5,
start_and_wait_until_fullsync_complete/1,
connect_cluster/3,
disconnect_cluster/2,
wait_for_connection/2,
enable_realtime/2,
disable_realtime/2,
enable_fullsync/2,
start_realtime/2,
stop_realtime/2,
do_write/5
]).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-import(rt, [deploy_nodes/2,
join/2,
wait_until_nodes_ready/1,
wait_until_no_pending_changes/1]).
make_cluster(Nodes) -> make_cluster(Nodes) ->
[First|Rest] = Nodes, [First|Rest] = Nodes,
[join(Node, First) || Node <- Rest], [rt:join(Node, First) || Node <- Rest],
?assertEqual(ok, wait_until_nodes_ready(Nodes)), ?assertEqual(ok, rt:wait_until_nodes_ready(Nodes)),
?assertEqual(ok, wait_until_no_pending_changes(Nodes)). ?assertEqual(ok, rt:wait_until_no_pending_changes(Nodes)).
%% @doc Set the replication cluster name on Node via the repl console.
%% Crashes the test (assertion) unless the console command returns ok.
name_cluster(Node, Name) ->
    lager:info("Naming cluster ~p",[Name]),
    ?assertEqual(ok,
        rpc:call(Node, riak_repl_console, clustername, [[Name]])).
%% @doc Block until Node reports itself as the cluster-manager leader.
wait_until_is_leader(Node) ->
    lager:info("wait_until_is_leader(~p)", [Node]),
    rt:wait_until(Node, fun(N) -> is_leader(N) end).
%% @doc True when Node is the riak_core_cluster_mgr leader; false when the
%% rpc fails (node down / not reachable).
is_leader(Node) ->
    Leader = rpc:call(Node, riak_core_cluster_mgr, get_leader, []),
    case Leader of
        {badrpc, _} ->
            lager:info("Badrpc"),
            false;
        _ ->
            lager:info("Checking: ~p =:= ~p", [Leader, Node]),
            Leader =:= Node
    end.
%% @doc Block until Node no longer reports itself as the leader.
wait_until_is_not_leader(Node) ->
    lager:info("wait_until_is_not_leader(~p)", [Node]),
    rt:wait_until(Node, fun(N) -> is_not_leader(N) end).
%% @doc True when Node is NOT the riak_core_cluster_mgr leader; false when
%% the rpc fails (we cannot tell, so keep waiting).
is_not_leader(Node) ->
    Leader = rpc:call(Node, riak_core_cluster_mgr, get_leader, []),
    case Leader of
        {badrpc, _} ->
            lager:info("Badrpc"),
            false;
        _ ->
            lager:info("Checking: ~p =/= ~p", [Leader, Node]),
            Leader =/= Node
    end.
%% @doc Block until Node reports any defined leader.
wait_until_leader(Node) ->
    wait_until_new_leader(Node, undefined).

%% @doc Block until Node reports a leader that is defined and different
%% from OldLeader (OldLeader is matched as a bound variable inside the
%% fun, so that exact value is rejected). Asserts the wait succeeded.
wait_until_new_leader(Node, OldLeader) ->
    NewLeaderElected =
        fun(_) ->
            case rpc:call(Node, riak_core_cluster_mgr, get_leader, []) of
                {badrpc, _} -> false;
                undefined   -> false;
                OldLeader   -> false;
                _Other      -> true
            end
        end,
    ?assertEqual(ok, rt:wait_until(Node, NewLeaderElected)).
%% @doc Block until every node in Nodes agrees on a single, defined leader.
%%
%% Bug fixed: the previous version mapped `undefined' to the atom `false'
%% and then only checked that the usort'ed list had length 1, so a cluster
%% in which *every* node returned `undefined' (no leader elected at all)
%% was wrongly reported as converged. The single common answer must now be
%% a real leader (=/= undefined). As before, a uniform `{badrpc, _}' reply
%% is not specially handled.
wait_until_leader_converge([Node|_] = Nodes) ->
    rt:wait_until(Node,
        fun(_) ->
            Leaders = [rpc:call(N, riak_core_cluster_mgr, get_leader, [])
                       || N <- Nodes],
            case lists:usort(Leaders) of
                [Leader] when Leader =/= undefined -> true;
                _ -> false
            end
        end).
%% @doc Block until Node's repl status reports at least one fullsync
%% coordinator, i.e. a sink-cluster connection is established.
%%
%% Bug fixed: when the `fullsync_coordinator' key was absent from the
%% status proplist, proplists:get_value/2 returned `undefined', which fell
%% through to the catch-all clause and was treated as an established
%% connection (with a bogus "multiple connections" warning). `undefined'
%% now counts as not-yet-connected.
wait_until_connection(Node) ->
    rt:wait_until(Node,
        fun(_) ->
            Status = rpc:call(Node, riak_repl_console, status, [quiet]),
            case proplists:get_value(fullsync_coordinator, Status) of
                undefined ->
                    false;
                [] ->
                    false;
                [_C] ->
                    true;
                Conns ->
                    lager:warning("multiple connections detected: ~p",
                        [Conns]),
                    true
            end
        end). %% 40 seconds is enough for repl
%% @doc Block until Node's repl status shows an empty list of connected
%% clusters. (A missing `connected_clusters' key yields `undefined', which
%% is not [], so we keep waiting — same behavior as before.)
wait_until_no_connection(Node) ->
    Disconnected =
        fun(_) ->
            Status = rpc:call(Node, riak_repl_console, status, [quiet]),
            proplists:get_value(connected_clusters, Status) =:= []
        end,
    rt:wait_until(Node, Disconnected). %% 40 seconds is enough for repl
%% @doc Wait until all keys in [Start..End] read back cleanly from Node at
%% read quorum R, then perform one more read pass and return the number of
%% failures it saw (0 on success).
wait_for_reads(Node, Start, End, Bucket, R) ->
    rt:wait_until(Node,
        fun(_) ->
            [] == rt:systest_read(Node, Start, End, Bucket, R)
        end),
    Failures = rt:systest_read(Node, Start, End, Bucket, R),
    lager:info("Reads: ~p", [Failures]),
    length(Failures).
%% @doc Kick off a fullsync on Node and block until the node's
%% `server_fullsyncs' counter has advanced past its pre-start value.
%% Asserts that the wait succeeded.
start_and_wait_until_fullsync_complete(Node) ->
    Status0 = rpc:call(Node, riak_repl_console, status, [quiet]),
    Count = proplists:get_value(server_fullsyncs, Status0) + 1,
    lager:info("waiting for fullsync count to be ~p", [Count]),
    lager:info("Starting fullsync on ~p (~p)", [Node,
        rtdev:node_version(rtdev:node_id(Node))]),
    rpc:call(Node, riak_repl_console, fullsync, [["start"]]),
    %% sleep because of the old bug where stats will crash if you call it too
    %% soon after starting a fullsync
    timer:sleep(500),
    Res = rt:wait_until(Node,
        fun(_) ->
            Status = rpc:call(Node, riak_repl_console, status, [quiet]),
            case proplists:get_value(server_fullsyncs, Status) of
                %% Bug fixed: without the is_integer/1 guard a missing key
                %% yields `undefined', and in Erlang term order any atom
                %% compares greater than any integer, so `undefined >= Count'
                %% was true and fullsync was declared complete prematurely.
                C when is_integer(C), C >= Count ->
                    true;
                _ ->
                    false
            end
        end),
    ?assertEqual(ok, Res),
    lager:info("Fullsync on ~p complete", [Node]).
%% @doc Connect Node's repl to a remote cluster manager at IP:Port
%% (Port is an integer; the console expects it as a string).
%% Asserts that the console command returns ok.
connect_cluster(Node, IP, Port) ->
    PortArg = integer_to_list(Port),
    Res = rpc:call(Node, riak_repl_console, connect, [[IP, PortArg]]),
    ?assertEqual(ok, Res).
%% @doc Drop Node's repl connection to the cluster named Name.
%% Asserts that the console command returns ok.
disconnect_cluster(Node, Name) ->
    ?assertEqual(ok,
        rpc:call(Node, riak_repl_console, disconnect, [[Name]])).
%% @doc Block until Node's cluster manager lists an outbound connection to
%% the cluster named Name.
wait_for_connection(Node, Name) ->
    HasNamedConnection =
        fun(_) ->
            {ok, Connections} = rpc:call(Node, riak_core_cluster_mgr,
                get_connections, []),
            lists:any(
                fun({{cluster_by_name, N}, _}) -> N == Name;
                   (_) -> false
                end,
                Connections)
        end,
    rt:wait_until(Node, HasNamedConnection).
%% @doc Enable realtime replication from Node to cluster Cluster.
enable_realtime(Node, Cluster) ->
    repl_console_cmd(Node, realtime, "enable", Cluster).

%% @doc Disable realtime replication from Node to cluster Cluster.
disable_realtime(Node, Cluster) ->
    repl_console_cmd(Node, realtime, "disable", Cluster).

%% @doc Enable fullsync replication from Node to cluster Cluster.
enable_fullsync(Node, Cluster) ->
    repl_console_cmd(Node, fullsync, "enable", Cluster).

%% @doc Start the realtime sync from Node to cluster Cluster.
start_realtime(Node, Cluster) ->
    repl_console_cmd(Node, realtime, "start", Cluster).

%% @doc Stop the realtime sync from Node to cluster Cluster.
stop_realtime(Node, Cluster) ->
    repl_console_cmd(Node, realtime, "stop", Cluster).

%% Run riak_repl_console:Command([Action, Cluster]) on Node and assert ok.
repl_console_cmd(Node, Command, Action, Cluster) ->
    Res = rpc:call(Node, riak_repl_console, Command, [[Action, Cluster]]),
    ?assertEqual(ok, Res).
%% @doc Write keys [Start..End] to Bucket on Node at write quorum W.
%% On partial failure: log the errors, pause briefly, then retry each
%% failed key individually and return the flattened list of errors that
%% remain after the retry ([] on full success).
do_write(Node, Start, End, Bucket, W) ->
    case rt:systest_write(Node, Start, End, Bucket, W) of
        [] ->
            [];
        Errors ->
            lager:warning("~p errors while writing: ~p",
                [length(Errors), Errors]),
            timer:sleep(1000),
            Retries = [rt:systest_write(Node, Key, Key, Bucket, W)
                       || {Key, _Error} <- Errors],
            lists:flatten(Retries)
    end.
%% @doc Does the node meet the version requirement? A node built from
%% `current' satisfies any version check; otherwise the node's version
%% term is compared directly against Version.
node_has_version(Node, Version) ->
    case rtdev:node_version(rtdev:node_id(Node)) of
        current ->
            true;
        NodeVersion ->
            NodeVersion >= Version
    end.
%% @doc Subset of Nodes (in order) that satisfy node_has_version/2.
nodes_with_version(Nodes, Version) ->
    lists:filter(fun(Node) -> node_has_version(Node, Version) end, Nodes).

%% @doc True when every node in Nodes satisfies the version requirement
%% (the filtered list equals the input list).
nodes_all_have_version(Nodes, Version) ->
    Nodes == nodes_with_version(Nodes, Version).

View File

@ -1,7 +1,6 @@
-module(replication2). -module(replication2).
-behavior(riak_test). -behavior(riak_test).
-export([confirm/0]). -export([confirm/0]).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-import(rt, [deploy_nodes/2, -import(rt, [deploy_nodes/2,
@ -61,35 +60,35 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
%% write some initial data to A %% write some initial data to A
lager:info("Writing 100 keys to ~p", [AFirst]), lager:info("Writing 100 keys to ~p", [AFirst]),
?assertEqual([], do_write(AFirst, 1, 100, TestBucket, 2)), ?assertEqual([], repl_util:do_write(AFirst, 1, 100, TestBucket, 2)),
name_cluster(AFirst, "A"), repl_util:name_cluster(AFirst, "A"),
name_cluster(BFirst, "B"), repl_util:name_cluster(BFirst, "B"),
rt:wait_until_ring_converged(ANodes), rt:wait_until_ring_converged(ANodes),
rt:wait_until_ring_converged(BNodes), rt:wait_until_ring_converged(BNodes),
%% TODO: we'll need to wait for cluster names before continuing %% TODO: we'll need to wait for cluster names before continuing
%% get the leader for the first cluster %% get the leader for the first cluster
wait_until_leader(AFirst), repl_util:wait_until_leader(AFirst),
LeaderA = rpc:call(AFirst, riak_core_cluster_mgr, get_leader, []), LeaderA = rpc:call(AFirst, riak_core_cluster_mgr, get_leader, []),
{ok, {_IP, Port}} = rpc:call(BFirst, application, get_env, {ok, {_IP, Port}} = rpc:call(BFirst, application, get_env,
[riak_core, cluster_mgr]), [riak_core, cluster_mgr]),
connect_cluster(LeaderA, "127.0.0.1", Port), repl_util:connect_cluster(LeaderA, "127.0.0.1", Port),
?assertEqual(ok, wait_for_connection(LeaderA, "B")), ?assertEqual(ok, repl_util:wait_for_connection(LeaderA, "B")),
enable_realtime(LeaderA, "B"), repl_util:enable_realtime(LeaderA, "B"),
rt:wait_until_ring_converged(ANodes), rt:wait_until_ring_converged(ANodes),
start_realtime(LeaderA, "B"), repl_util:start_realtime(LeaderA, "B"),
rt:wait_until_ring_converged(ANodes), rt:wait_until_ring_converged(ANodes),
enable_fullsync(LeaderA, "B"), repl_util:enable_fullsync(LeaderA, "B"),
rt:wait_until_ring_converged(ANodes); rt:wait_until_ring_converged(ANodes);
_ -> _ ->
lager:info("waiting for leader to converge on cluster A"), lager:info("waiting for leader to converge on cluster A"),
?assertEqual(ok, wait_until_leader_converge(ANodes)), ?assertEqual(ok, repl_util:wait_until_leader_converge(ANodes)),
lager:info("waiting for leader to converge on cluster B"), lager:info("waiting for leader to converge on cluster B"),
?assertEqual(ok, wait_until_leader_converge(BNodes)), ?assertEqual(ok, repl_util:wait_until_leader_converge(BNodes)),
%% get the leader for the first cluster %% get the leader for the first cluster
LeaderA = rpc:call(AFirst, riak_core_cluster_mgr, get_leader, []), LeaderA = rpc:call(AFirst, riak_core_cluster_mgr, get_leader, []),
lager:info("Leader on cluster A is ~p", [LeaderA]), lager:info("Leader on cluster A is ~p", [LeaderA]),
@ -100,14 +99,14 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
log_to_nodes(AllNodes, "Write data to A, verify replication to B via realtime"), log_to_nodes(AllNodes, "Write data to A, verify replication to B via realtime"),
%% write some data on A %% write some data on A
?assertEqual(ok, wait_for_connection(LeaderA, "B")), ?assertEqual(ok, repl_util:wait_for_connection(LeaderA, "B")),
%io:format("~p~n", [rpc:call(LeaderA, riak_repl_console, status, [quiet])]), %io:format("~p~n", [rpc:call(LeaderA, riak_repl_console, status, [quiet])]),
lager:info("Writing 100 more keys to ~p", [LeaderA]), lager:info("Writing 100 more keys to ~p", [LeaderA]),
?assertEqual([], do_write(LeaderA, 101, 200, TestBucket, 2)), ?assertEqual([], repl_util:do_write(LeaderA, 101, 200, TestBucket, 2)),
%% verify data is replicated to B %% verify data is replicated to B
lager:info("Reading 100 keys written to ~p from ~p", [LeaderA, BFirst]), lager:info("Reading 100 keys written to ~p from ~p", [LeaderA, BFirst]),
?assertEqual(0, wait_for_reads(BFirst, 101, 200, TestBucket, 2)), ?assertEqual(0, repl_util:wait_for_reads(BFirst, 101, 200, TestBucket, 2)),
case Connected of case Connected of
false -> false ->
@ -118,10 +117,10 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
?assertEqual(100, length(Res2)), ?assertEqual(100, length(Res2)),
log_to_nodes(AllNodes, "Test fullsync with leader ~p", [LeaderA]), log_to_nodes(AllNodes, "Test fullsync with leader ~p", [LeaderA]),
start_and_wait_until_fullsync_complete(LeaderA), repl_util:start_and_wait_until_fullsync_complete(LeaderA),
lager:info("Check keys written before repl was connected are present"), lager:info("Check keys written before repl was connected are present"),
?assertEqual(0, wait_for_reads(BFirst, 1, 200, TestBucket, 2)); ?assertEqual(0, repl_util:wait_for_reads(BFirst, 1, 200, TestBucket, 2));
_ -> _ ->
ok ok
end, end,
@ -137,22 +136,22 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
rt:stop(LeaderA), rt:stop(LeaderA),
rt:wait_until_unpingable(LeaderA), rt:wait_until_unpingable(LeaderA),
ASecond = hd(ANodes -- [LeaderA]), ASecond = hd(ANodes -- [LeaderA]),
wait_until_leader(ASecond), repl_util:wait_until_leader(ASecond),
LeaderA2 = rpc:call(ASecond, riak_core_cluster_mgr, get_leader, []), LeaderA2 = rpc:call(ASecond, riak_core_cluster_mgr, get_leader, []),
lager:info("New leader is ~p", [LeaderA2]), lager:info("New leader is ~p", [LeaderA2]),
?assertEqual(ok, wait_until_connection(LeaderA2)), ?assertEqual(ok, repl_util:wait_until_connection(LeaderA2)),
lager:info("Writing 100 more keys to ~p now that the old leader is down", lager:info("Writing 100 more keys to ~p now that the old leader is down",
[ASecond]), [ASecond]),
?assertEqual([], do_write(ASecond, 201, 300, TestBucket, 2)), ?assertEqual([], repl_util:do_write(ASecond, 201, 300, TestBucket, 2)),
%% verify data is replicated to B %% verify data is replicated to B
lager:info("Reading 100 keys written to ~p from ~p", [ASecond, BFirst]), lager:info("Reading 100 keys written to ~p from ~p", [ASecond, BFirst]),
?assertEqual(0, wait_for_reads(BFirst, 201, 300, TestBucket, 2)), ?assertEqual(0, repl_util:wait_for_reads(BFirst, 201, 300, TestBucket, 2)),
%% get the leader for the first cluster %% get the leader for the first cluster
LeaderB = rpc:call(BFirst, riak_core_cluster_mgr, get_leader, []), LeaderB = rpc:call(BFirst, riak_core_cluster_mgr, get_leader, []),
@ -163,28 +162,28 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
rt:stop(LeaderB), rt:stop(LeaderB),
rt:wait_until_unpingable(LeaderB), rt:wait_until_unpingable(LeaderB),
BSecond = hd(BNodes -- [LeaderB]), BSecond = hd(BNodes -- [LeaderB]),
wait_until_leader(BSecond), repl_util:wait_until_leader(BSecond),
LeaderB2 = rpc:call(BSecond, riak_core_cluster_mgr, get_leader, []), LeaderB2 = rpc:call(BSecond, riak_core_cluster_mgr, get_leader, []),
lager:info("New leader is ~p", [LeaderB2]), lager:info("New leader is ~p", [LeaderB2]),
?assertEqual(ok, wait_until_connection(LeaderA2)), ?assertEqual(ok, repl_util:wait_until_connection(LeaderA2)),
lager:info("Writing 100 more keys to ~p now that the old leader is down", lager:info("Writing 100 more keys to ~p now that the old leader is down",
[ASecond]), [ASecond]),
?assertEqual([], do_write(ASecond, 301, 400, TestBucket, 2)), ?assertEqual([], repl_util:do_write(ASecond, 301, 400, TestBucket, 2)),
%% verify data is replicated to B %% verify data is replicated to B
lager:info("Reading 101 keys written to ~p from ~p", [ASecond, BSecond]), lager:info("Reading 101 keys written to ~p from ~p", [ASecond, BSecond]),
?assertEqual(0, wait_for_reads(BSecond, 301, 400, TestBucket, 2)), ?assertEqual(0, repl_util:wait_for_reads(BSecond, 301, 400, TestBucket, 2)),
%% Testing fullsync with downed nodes %% Testing fullsync with downed nodes
log_to_nodes(AllNodes, "Test fullsync with ~p and ~p down", [LeaderA, LeaderB]), log_to_nodes(AllNodes, "Test fullsync with ~p and ~p down", [LeaderA, LeaderB]),
lager:info("Re-running fullsync with ~p and ~p down", [LeaderA, LeaderB]), lager:info("Re-running fullsync with ~p and ~p down", [LeaderA, LeaderB]),
start_and_wait_until_fullsync_complete(LeaderA2), repl_util:start_and_wait_until_fullsync_complete(LeaderA2),
%% %%
%% Per-bucket repl settings tests %% Per-bucket repl settings tests
@ -195,7 +194,7 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
lager:info("Restarting down node ~p", [LeaderA]), lager:info("Restarting down node ~p", [LeaderA]),
rt:start(LeaderA), rt:start(LeaderA),
rt:wait_until_pingable(LeaderA), rt:wait_until_pingable(LeaderA),
start_and_wait_until_fullsync_complete(LeaderA2), repl_util:start_and_wait_until_fullsync_complete(LeaderA2),
log_to_nodes(AllNodes, "Starting Joe's Repl Test"), log_to_nodes(AllNodes, "Starting Joe's Repl Test"),
lager:info("Starting Joe's Repl Test"), lager:info("Starting Joe's Repl Test"),
@ -209,7 +208,7 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
lager:info("LeaderA: ~p", [LeaderA]), lager:info("LeaderA: ~p", [LeaderA]),
lager:info("LeaderA2: ~p", [LeaderA2]), lager:info("LeaderA2: ~p", [LeaderA2]),
?assertEqual(ok, wait_until_connection(LeaderA)), ?assertEqual(ok, repl_util:wait_until_connection(LeaderA)),
log_to_nodes(AllNodes, "Simulate partition to force leader re-election"), log_to_nodes(AllNodes, "Simulate partition to force leader re-election"),
@ -222,7 +221,7 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
[ rpc:call(LeaderA2, erlang, disconnect_node, [Node]) || Node <- ANodes -- [LeaderA2]], [ rpc:call(LeaderA2, erlang, disconnect_node, [Node]) || Node <- ANodes -- [LeaderA2]],
[ rpc:call(Node, erlang, disconnect_node, [LeaderA2]) || Node <- ANodes -- [LeaderA2]], [ rpc:call(Node, erlang, disconnect_node, [LeaderA2]) || Node <- ANodes -- [LeaderA2]],
wait_until_new_leader(hd(ANodes -- [LeaderA2]), LeaderA2), repl_util:wait_until_new_leader(hd(ANodes -- [LeaderA2]), LeaderA2),
InterimLeader = rpc:call(LeaderA, riak_core_cluster_mgr, get_leader, []), InterimLeader = rpc:call(LeaderA, riak_core_cluster_mgr, get_leader, []),
lager:info("Interim leader: ~p", [InterimLeader]), lager:info("Interim leader: ~p", [InterimLeader]),
@ -234,15 +233,15 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
%% there's no point in writing anything until the leaders converge, as we %% there's no point in writing anything until the leaders converge, as we
%% can drop writes in the middle of an election %% can drop writes in the middle of an election
wait_until_leader_converge(ANodes), repl_util:wait_until_leader_converge(ANodes),
lager:info("Leader: ~p", [rpc:call(ASecond, riak_core_cluster_mgr, get_leader, [])]), lager:info("Leader: ~p", [rpc:call(ASecond, riak_core_cluster_mgr, get_leader, [])]),
lager:info("Writing 2 more keys to ~p", [LeaderA]), lager:info("Writing 2 more keys to ~p", [LeaderA]),
?assertEqual([], do_write(LeaderA, 1301, 1302, TestBucket, 2)), ?assertEqual([], repl_util:do_write(LeaderA, 1301, 1302, TestBucket, 2)),
%% verify data is replicated to B %% verify data is replicated to B
lager:info("Reading 2 keys written to ~p from ~p", [LeaderA, BSecond]), lager:info("Reading 2 keys written to ~p from ~p", [LeaderA, BSecond]),
?assertEqual(0, wait_for_reads(BSecond, 1301, 1302, TestBucket, 2)), ?assertEqual(0, repl_util:wait_for_reads(BSecond, 1301, 1302, TestBucket, 2)),
log_to_nodes(AllNodes, "Finished Joe's Section"), log_to_nodes(AllNodes, "Finished Joe's Section"),
lager:info("Finished Joe's Section"), lager:info("Finished Joe's Section"),
@ -260,37 +259,37 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
%% disconnect the other cluster, so realtime doesn't happen %% disconnect the other cluster, so realtime doesn't happen
lager:info("disconnect the 2 clusters"), lager:info("disconnect the 2 clusters"),
disable_realtime(LeaderA, "B"), repl_util:disable_realtime(LeaderA, "B"),
rt:wait_until_ring_converged(ANodes), rt:wait_until_ring_converged(ANodes),
disconnect_cluster(LeaderA, "B"), repl_util:disconnect_cluster(LeaderA, "B"),
wait_until_no_connection(LeaderA), repl_util:wait_until_no_connection(LeaderA),
rt:wait_until_ring_converged(ANodes), rt:wait_until_ring_converged(ANodes),
lager:info("write 100 keys to a realtime only bucket"), lager:info("write 100 keys to a realtime only bucket"),
?assertEqual([], do_write(ASecond, 1, 100, ?assertEqual([], repl_util:do_write(ASecond, 1, 100,
RealtimeOnly, 2)), RealtimeOnly, 2)),
lager:info("reconnect the 2 clusters"), lager:info("reconnect the 2 clusters"),
connect_cluster(LeaderA, "127.0.0.1", Port), repl_util:connect_cluster(LeaderA, "127.0.0.1", Port),
?assertEqual(ok, wait_for_connection(LeaderA, "B")), ?assertEqual(ok, repl_util:wait_for_connection(LeaderA, "B")),
rt:wait_until_ring_converged(ANodes), rt:wait_until_ring_converged(ANodes),
enable_realtime(LeaderA, "B"), repl_util:enable_realtime(LeaderA, "B"),
rt:wait_until_ring_converged(ANodes), rt:wait_until_ring_converged(ANodes),
enable_fullsync(LeaderA, "B"), repl_util:enable_fullsync(LeaderA, "B"),
rt:wait_until_ring_converged(ANodes), rt:wait_until_ring_converged(ANodes),
start_realtime(LeaderA, "B"), repl_util:start_realtime(LeaderA, "B"),
rt:wait_until_ring_converged(ANodes), rt:wait_until_ring_converged(ANodes),
?assertEqual(ok, wait_until_connection(LeaderA)), ?assertEqual(ok, repl_util:wait_until_connection(LeaderA)),
LeaderA3 = rpc:call(ASecond, riak_core_cluster_mgr, get_leader, []), LeaderA3 = rpc:call(ASecond, riak_core_cluster_mgr, get_leader, []),
log_to_nodes(AllNodes, "Test fullsync and realtime independence"), log_to_nodes(AllNodes, "Test fullsync and realtime independence"),
lager:info("write 100 keys to a {repl, false} bucket"), lager:info("write 100 keys to a {repl, false} bucket"),
?assertEqual([], do_write(ASecond, 1, 100, NoRepl, 2)), ?assertEqual([], repl_util:do_write(ASecond, 1, 100, NoRepl, 2)),
lager:info("write 100 keys to a fullsync only bucket"), lager:info("write 100 keys to a fullsync only bucket"),
?assertEqual([], do_write(ASecond, 1, 100, ?assertEqual([], repl_util:do_write(ASecond, 1, 100,
FullsyncOnly, 2)), FullsyncOnly, 2)),
lager:info("Check the fullsync only bucket didn't replicate the writes"), lager:info("Check the fullsync only bucket didn't replicate the writes"),
@ -308,10 +307,10 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
%% do a fullsync, make sure that fullsync_only is replicated, but %% do a fullsync, make sure that fullsync_only is replicated, but
%% realtime_only and no_repl aren't %% realtime_only and no_repl aren't
start_and_wait_until_fullsync_complete(LeaderA3), repl_util:start_and_wait_until_fullsync_complete(LeaderA3),
lager:info("Check fullsync only bucket is now replicated"), lager:info("Check fullsync only bucket is now replicated"),
?assertEqual(0, wait_for_reads(BSecond, 1, 100, ?assertEqual(0, repl_util:wait_for_reads(BSecond, 1, 100,
FullsyncOnly, 2)), FullsyncOnly, 2)),
lager:info("Check realtime only bucket didn't replicate"), lager:info("Check realtime only bucket didn't replicate"),
@ -321,13 +320,13 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
lager:info("Write 100 more keys into realtime only bucket on ~p", lager:info("Write 100 more keys into realtime only bucket on ~p",
[ASecond]), [ASecond]),
?assertEqual([], do_write(ASecond, 101, 200, ?assertEqual([], repl_util:do_write(ASecond, 101, 200,
RealtimeOnly, 2)), RealtimeOnly, 2)),
timer:sleep(5000), timer:sleep(5000),
lager:info("Check the realtime keys replicated"), lager:info("Check the realtime keys replicated"),
?assertEqual(0, wait_for_reads(BSecond, 101, 200, ?assertEqual(0, repl_util:wait_for_reads(BSecond, 101, 200,
RealtimeOnly, 2)), RealtimeOnly, 2)),
lager:info("Check the older keys in the realtime bucket did not replicate"), lager:info("Check the older keys in the realtime bucket did not replicate"),
@ -344,20 +343,20 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
LeaderA4 = rpc:call(ASecond, riak_core_cluster_mgr, get_leader, []), LeaderA4 = rpc:call(ASecond, riak_core_cluster_mgr, get_leader, []),
lager:info("Stopping realtime, queue will build"), lager:info("Stopping realtime, queue will build"),
stop_realtime(LeaderA4, "B"), repl_util:stop_realtime(LeaderA4, "B"),
rt:wait_until_ring_converged(ANodes), rt:wait_until_ring_converged(ANodes),
lager:info("Writing 100 keys"), lager:info("Writing 100 keys"),
?assertEqual([], do_write(LeaderA4, 800, 900, ?assertEqual([], repl_util:do_write(LeaderA4, 800, 900,
TestBucket, 2)), TestBucket, 2)),
lager:info("Starting realtime"), lager:info("Starting realtime"),
start_realtime(LeaderA4, "B"), repl_util:start_realtime(LeaderA4, "B"),
rt:wait_until_ring_converged(ANodes), rt:wait_until_ring_converged(ANodes),
timer:sleep(3000), timer:sleep(3000),
lager:info("Reading keys written while repl was stopped"), lager:info("Reading keys written while repl was stopped"),
?assertEqual(0, wait_for_reads(BSecond, 800, 900, ?assertEqual(0, repl_util:wait_for_reads(BSecond, 800, 900,
TestBucket, 2)), TestBucket, 2)),
log_to_nodes(AllNodes, "Testing realtime migration on node shutdown"), log_to_nodes(AllNodes, "Testing realtime migration on node shutdown"),
@ -365,11 +364,11 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
Target = hd(ANodes -- [LeaderA4]), Target = hd(ANodes -- [LeaderA4]),
lager:info("Stopping realtime, queue will build"), lager:info("Stopping realtime, queue will build"),
stop_realtime(LeaderA4, "B"), repl_util:stop_realtime(LeaderA4, "B"),
rt:wait_until_ring_converged(ANodes), rt:wait_until_ring_converged(ANodes),
lager:info("Writing 100 keys"), lager:info("Writing 100 keys"),
?assertEqual([], do_write(Target, 900, 1000, ?assertEqual([], repl_util:do_write(Target, 900, 1000,
TestBucket, 2)), TestBucket, 2)),
io:format("queue status: ~p", io:format("queue status: ~p",
@ -381,11 +380,11 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
rt:wait_until_unpingable(Target), rt:wait_until_unpingable(Target),
lager:info("Starting realtime"), lager:info("Starting realtime"),
start_realtime(LeaderA4, "B"), repl_util:start_realtime(LeaderA4, "B"),
timer:sleep(3000), timer:sleep(3000),
lager:info("Reading keys written while repl was stopped"), lager:info("Reading keys written while repl was stopped"),
?assertEqual(0, wait_for_reads(BSecond, 900, 1000, ?assertEqual(0, repl_util:wait_for_reads(BSecond, 900, 1000,
TestBucket, 2)), TestBucket, 2)),
lager:info("Restarting node ~p", [Target]), lager:info("Restarting node ~p", [Target]),
@ -591,216 +590,3 @@ collect_results(Workers, Acc) ->
{'DOWN', _, _, Pid, _Reason} -> {'DOWN', _, _, Pid, _Reason} ->
collect_results(lists:keydelete(Pid, 1, Workers), Acc) collect_results(lists:keydelete(Pid, 1, Workers), Acc)
end. end.
%% Join all Nodes into a single riak cluster (the first node is the join
%% target) and wait until membership settles. Uses the bare join/2 and
%% wait_until_* names brought in by this module's -import(rt, ...).
make_cluster(Nodes) ->
    [First|Rest] = Nodes,
    [join(Node, First) || Node <- Rest],
    ?assertEqual(ok, wait_until_nodes_ready(Nodes)),
    ?assertEqual(ok, wait_until_no_pending_changes(Nodes)).
%% does the node meet the version requirement?
node_has_version(Node, Version) ->
NodeVersion = rtdev:node_version(rtdev:node_id(Node)),
case NodeVersion of
current ->
%% current always satisfies any version check
true;
_ ->
NodeVersion >= Version
end.
nodes_with_version(Nodes, Version) ->
[Node || Node <- Nodes, node_has_version(Node, Version)].
nodes_all_have_version(Nodes, Version) ->
Nodes == nodes_with_version(Nodes, Version).
%% Number of repl client connections on Node, counted as the children of
%% its riak_repl_client_sup supervisor.
client_count(Node) ->
    Clients = rpc:call(Node, supervisor, which_children, [riak_repl_client_sup]),
    length(Clients).
start_and_wait_until_fullsync_complete(Node) ->
Status0 = rpc:call(Node, riak_repl_console, status, [quiet]),
Count = proplists:get_value(server_fullsyncs, Status0) + 1,
lager:info("waiting for fullsync count to be ~p", [Count]),
lager:info("Starting fullsync on ~p (~p)", [Node,
rtdev:node_version(rtdev:node_id(Node))]),
rpc:call(Node, riak_repl_console, fullsync, [["start"]]),
%% sleep because of the old bug where stats will crash if you call it too
%% soon after starting a fullsync
timer:sleep(500),
Res = rt:wait_until(Node,
fun(_) ->
Status = rpc:call(Node, riak_repl_console, status, [quiet]),
case proplists:get_value(server_fullsyncs, Status) of
C when C >= Count ->
true;
_ ->
false
end
end),
?assertEqual(ok, Res),
lager:info("Fullsync on ~p complete", [Node]).
wait_until_is_leader(Node) ->
lager:info("wait_until_is_leader(~p)", [Node]),
rt:wait_until(Node, fun is_leader/1).
is_leader(Node) ->
case rpc:call(Node, riak_core_cluster_mgr, get_leader, []) of
{badrpc, _} ->
lager:info("Badrpc"),
false;
Leader ->
lager:info("Checking: ~p =:= ~p", [Leader, Node]),
Leader =:= Node
end.
wait_until_is_not_leader(Node) ->
lager:info("wait_until_is_not_leader(~p)", [Node]),
rt:wait_until(Node, fun is_not_leader/1).
is_not_leader(Node) ->
case rpc:call(Node, riak_core_cluster_mgr, get_leader, []) of
{badrpc, _} ->
lager:info("Badrpc"),
false;
Leader ->
lager:info("Checking: ~p =/= ~p", [Leader, Node]),
Leader =/= Node
end.
wait_until_leader(Node) ->
wait_until_new_leader(Node, undefined).
wait_until_new_leader(Node, OldLeader) ->
Res = rt:wait_until(Node,
fun(_) ->
Status = rpc:call(Node, riak_core_cluster_mgr, get_leader, []),
case Status of
{badrpc, _} ->
false;
undefined ->
false;
OldLeader ->
false;
_Other ->
true
end
end),
?assertEqual(ok, Res).
wait_until_leader_converge([Node|_] = Nodes) ->
rt:wait_until(Node,
fun(_) ->
length(lists:usort([begin
case rpc:call(N, riak_core_cluster_mgr, get_leader, []) of
undefined ->
false;
L ->
%lager:info("Leader for ~p is ~p",
%[N,L]),
L
end
end || N <- Nodes])) == 1
end).
wait_until_connection(Node) ->
rt:wait_until(Node,
fun(_) ->
Status = rpc:call(Node, riak_repl_console, status, [quiet]),
case proplists:get_value(fullsync_coordinator, Status) of
[] ->
false;
[_C] ->
true;
Conns ->
lager:warning("multiple connections detected: ~p",
[Conns]),
true
end
end). %% 40 seconds is enough for repl
wait_until_no_connection(Node) ->
rt:wait_until(Node,
fun(_) ->
Status = rpc:call(Node, riak_repl_console, status, [quiet]),
case proplists:get_value(connected_clusters, Status) of
[] ->
true;
_ ->
false
end
end). %% 40 seconds is enough for repl
wait_for_reads(Node, Start, End, Bucket, R) ->
rt:wait_until(Node,
fun(_) ->
rt:systest_read(Node, Start, End, Bucket, R) == []
end),
Reads = rt:systest_read(Node, Start, End, Bucket, R),
lager:info("Reads: ~p", [Reads]),
length(Reads).
do_write(Node, Start, End, Bucket, W) ->
case rt:systest_write(Node, Start, End, Bucket, W) of
[] ->
[];
Errors ->
lager:warning("~p errors while writing: ~p",
[length(Errors), Errors]),
timer:sleep(1000),
lists:flatten([rt:systest_write(Node, S, S, Bucket, W) ||
{S, _Error} <- Errors])
end.
name_cluster(Node, Name) ->
lager:info("Naming cluster ~p",[Name]),
Res = rpc:call(Node, riak_repl_console, clustername, [[Name]]),
?assertEqual(ok, Res).
connect_cluster(Node, IP, Port) ->
Res = rpc:call(Node, riak_repl_console, connect,
[[IP, integer_to_list(Port)]]),
?assertEqual(ok, Res).
disconnect_cluster(Node, Name) ->
Res = rpc:call(Node, riak_repl_console, disconnect,
[[Name]]),
?assertEqual(ok, Res).
wait_for_connection(Node, Name) ->
rt:wait_until(Node,
fun(_) ->
{ok, Connections} = rpc:call(Node, riak_core_cluster_mgr,
get_connections, []),
lists:any(fun({{cluster_by_name, N}, _}) when N == Name -> true;
(_) -> false
end, Connections)
end).
enable_realtime(Node, Cluster) ->
Res = rpc:call(Node, riak_repl_console, realtime, [["enable", Cluster]]),
?assertEqual(ok, Res).
disable_realtime(Node, Cluster) ->
Res = rpc:call(Node, riak_repl_console, realtime, [["disable", Cluster]]),
?assertEqual(ok, Res).
enable_fullsync(Node, Cluster) ->
Res = rpc:call(Node, riak_repl_console, fullsync, [["enable", Cluster]]),
?assertEqual(ok, Res).
start_realtime(Node, Cluster) ->
Res = rpc:call(Node, riak_repl_console, realtime, [["start", Cluster]]),
?assertEqual(ok, Res).
stop_realtime(Node, Cluster) ->
Res = rpc:call(Node, riak_repl_console, realtime, [["stop", Cluster]]),
?assertEqual(ok, Res).

View File

@ -1,6 +1,5 @@
-module(replication2_dirty). -module(replication2_dirty).
-export([confirm/0]). -export([confirm/0]).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-import(rt, [deploy_nodes/2, -import(rt, [deploy_nodes/2,
@ -25,50 +24,58 @@ confirm() ->
Nodes = deploy_nodes(NumNodes, Conf), Nodes = deploy_nodes(NumNodes, Conf),
{[AFirst|_] = ANodes, [BFirst|_] = BNodes} = lists:split(ClusterASize, Nodes), {[AFirst|_] = ANodes, [BFirst|_] = BNodes} = lists:split(ClusterASize, Nodes),
AllNodes = ANodes ++ BNodes,
rt:log_to_nodes(AllNodes, "Starting replication2_dirty test"),
lager:info("ANodes: ~p", [ANodes]), lager:info("ANodes: ~p", [ANodes]),
lager:info("BNodes: ~p", [BNodes]), lager:info("BNodes: ~p", [BNodes]),
rt:log_to_nodes(AllNodes, "Building and connecting Clusters"),
lager:info("Build cluster A"), lager:info("Build cluster A"),
replication2:make_cluster(ANodes), repl_util:make_cluster(ANodes),
lager:info("Build cluster B"), lager:info("Build cluster B"),
replication2:make_cluster(BNodes), repl_util:make_cluster(BNodes),
replication2:name_cluster(AFirst, "A"), repl_util:name_cluster(AFirst, "A"),
replication2:name_cluster(BFirst, "B"), repl_util:name_cluster(BFirst, "B"),
rt:wait_until_ring_converged(ANodes), rt:wait_until_ring_converged(ANodes),
rt:wait_until_ring_converged(BNodes), rt:wait_until_ring_converged(BNodes),
%% get the leader for the first cluster %% get the leader for the first cluster
replication2:wait_until_leader(AFirst), repl_util:wait_until_leader(AFirst),
LeaderA = rpc:call(AFirst, riak_core_cluster_mgr, get_leader, []), LeaderA = rpc:call(AFirst, riak_core_cluster_mgr, get_leader, []),
%LeaderB = rpc:call(BFirst, riak_core_cluster_mgr, get_leader, []), %LeaderB = rpc:call(BFirst, riak_core_cluster_mgr, get_leader, []),
{ok, {_IP, Port}} = rpc:call(BFirst, application, get_env, {ok, {_IP, Port}} = rpc:call(BFirst, application, get_env,
[riak_core, cluster_mgr]), [riak_core, cluster_mgr]),
replication2:connect_cluster(LeaderA, "127.0.0.1", Port), repl_util:connect_cluster(LeaderA, "127.0.0.1", Port),
?assertEqual(ok, replication2:wait_for_connection(LeaderA, "B")), ?assertEqual(ok, repl_util:wait_for_connection(LeaderA, "B")),
replication2:enable_realtime(LeaderA, "B"), repl_util:enable_realtime(LeaderA, "B"),
rt:wait_until_ring_converged(ANodes), rt:wait_until_ring_converged(ANodes),
replication2:start_realtime(LeaderA, "B"), repl_util:start_realtime(LeaderA, "B"),
rt:wait_until_ring_converged(ANodes), rt:wait_until_ring_converged(ANodes),
replication2:enable_fullsync(LeaderA, "B"), repl_util:enable_fullsync(LeaderA, "B"),
rt:wait_until_ring_converged(ANodes), rt:wait_until_ring_converged(ANodes),
% nothing should be dirty initially % nothing should be dirty initially
lager:info("Waiting until all nodes clean"), lager:info("Waiting until all nodes clean"),
wait_until_all_nodes_clean(LeaderA), wait_until_all_nodes_clean(LeaderA),
rt:log_to_nodes(AllNodes, "Test basic realtime replication from A -> B"),
%% write some data on A %% write some data on A
?assertEqual(ok, replication2:wait_for_connection(LeaderA, "B")), ?assertEqual(ok, repl_util:wait_for_connection(LeaderA, "B")),
%io:format("~p~n", [rpc:call(LeaderA, riak_repl_console, status, [quiet])]), %io:format("~p~n", [rpc:call(LeaderA, riak_repl_console, status, [quiet])]),
lager:info("Writing 2000 more keys to ~p", [LeaderA]), lager:info("Writing 2000 more keys to ~p", [LeaderA]),
?assertEqual([], replication2:do_write(LeaderA, 101, 2000, TestBucket, 2)), ?assertEqual([], repl_util:do_write(LeaderA, 101, 2000, TestBucket, 2)),
%% verify data is replicated to B %% verify data is replicated to B
lager:info("Reading 2000 keys written to ~p from ~p", [LeaderA, BFirst]), lager:info("Reading 2000 keys written to ~p from ~p", [LeaderA, BFirst]),
?assertEqual(0, replication2:wait_for_reads(BFirst, 101, 2000, TestBucket, 2)), ?assertEqual(0, repl_util:wait_for_reads(BFirst, 101, 2000, TestBucket, 2)),
[ ?assertEqual(0, get_dirty_stat(Node)) || Node <- ANodes], [ ?assertEqual(0, get_dirty_stat(Node)) || Node <- ANodes],
[ ?assertEqual(0, get_dirty_stat(Node)) || Node <- BNodes], [ ?assertEqual(0, get_dirty_stat(Node)) || Node <- BNodes],
@ -78,6 +85,8 @@ confirm() ->
lager:info("Waiting until all nodes clean"), lager:info("Waiting until all nodes clean"),
wait_until_all_nodes_clean(LeaderA), wait_until_all_nodes_clean(LeaderA),
rt:log_to_nodes(AllNodes, "Verify fullsync after manual dirty flag set"),
lager:info("Manually setting rt_dirty state"), lager:info("Manually setting rt_dirty state"),
% manually set this for now to simulate source errors % manually set this for now to simulate source errors
@ -88,12 +97,13 @@ confirm() ->
wait_until_coord_has_dirty(LeaderA), wait_until_coord_has_dirty(LeaderA),
lager:info("Starting fullsync"), lager:info("Starting fullsync"),
replication2:start_and_wait_until_fullsync_complete(LeaderA), repl_util:start_and_wait_until_fullsync_complete(LeaderA),
lager:info("Wait for all nodes to show up clean"), lager:info("Wait for all nodes to show up clean"),
wait_until_all_nodes_clean(LeaderA), wait_until_all_nodes_clean(LeaderA),
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
rt:log_to_nodes(AllNodes, "Multiple node test"),
lager:info("Multiple node test"), lager:info("Multiple node test"),
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -110,12 +120,13 @@ confirm() ->
wait_until_coord_has_dirty(DirtyB), wait_until_coord_has_dirty(DirtyB),
lager:info("Starting fullsync"), lager:info("Starting fullsync"),
replication2:start_and_wait_until_fullsync_complete(LeaderA), repl_util:start_and_wait_until_fullsync_complete(LeaderA),
lager:info("Wait for all nodes to show up clean"), lager:info("Wait for all nodes to show up clean"),
wait_until_all_nodes_clean(LeaderA), wait_until_all_nodes_clean(LeaderA),
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
rt:log_to_nodes(AllNodes, "Multiple node test, one failed during fullsync"),
lager:info("Multiple node test, one failed during fullsync"), lager:info("Multiple node test, one failed during fullsync"),
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -135,7 +146,7 @@ confirm() ->
ResultC = rpc:call(DirtyD, riak_repl_stats, rt_source_errors, []), ResultC = rpc:call(DirtyD, riak_repl_stats, rt_source_errors, []),
lager:info("Result = ~p", [ResultC]) lager:info("Result = ~p", [ResultC])
end), end),
replication2:start_and_wait_until_fullsync_complete(LeaderA), repl_util:start_and_wait_until_fullsync_complete(LeaderA),
lager:info("Checking to see if C is still clean"), lager:info("Checking to see if C is still clean"),
wait_until_node_clean(DirtyC), wait_until_node_clean(DirtyC),
@ -143,11 +154,12 @@ confirm() ->
wait_until_coord_has_dirty(DirtyD), wait_until_coord_has_dirty(DirtyD),
% Clear out all dirty state % Clear out all dirty state
replication2:start_and_wait_until_fullsync_complete(LeaderA), repl_util:start_and_wait_until_fullsync_complete(LeaderA),
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
rt:log_to_nodes(AllNodes, "Brutally kill the sink nodes"),
lager:info("Brutally kill the sink nodes"), lager:info("Brutally kill the sink nodes"),
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -159,13 +171,14 @@ confirm() ->
% 3000 may need to be increased if the test fails here on % 3000 may need to be increased if the test fails here on
% a fast machine % a fast machine
lager:info("Writing 3000 more keys to ~p", [LeaderA]), lager:info("Writing 3000 more keys to ~p", [LeaderA]),
?assertEqual([], replication2:do_write(LeaderA, 0, 3000, TestBucket, 2)), ?assertEqual([], repl_util:do_write(LeaderA, 0, 3000, TestBucket, 2)),
wait_until_coord_has_any_dirty(LeaderA), wait_until_coord_has_any_dirty(LeaderA),
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
rt:log_to_nodes(AllNodes, "Check rt_dirty state after shutdown"),
lager:info("Check rt_dirty state after shutdown"), lager:info("Check rt_dirty state after shutdown"),
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -173,6 +186,7 @@ confirm() ->
[ rt:start_and_wait(Node) || Node <- ANodes], [ rt:start_and_wait(Node) || Node <- ANodes],
wait_until_coord_has_any_dirty(LeaderA), wait_until_coord_has_any_dirty(LeaderA),
rt:log_to_nodes(AllNodes, "Test completed"),
pass. pass.
get_dirty_stat(Node) -> get_dirty_stat(Node) ->