2013-01-24 22:24:16 +00:00
|
|
|
-module(repl_util).
|
2013-01-25 18:43:29 +00:00
|
|
|
-export([make_cluster/1,
|
|
|
|
name_cluster/2,
|
|
|
|
node_has_version/2,
|
|
|
|
nodes_with_version/2,
|
|
|
|
nodes_all_have_version/2,
|
|
|
|
wait_until_is_leader/1,
|
|
|
|
is_leader/1,
|
|
|
|
wait_until_is_not_leader/1,
|
|
|
|
wait_until_leader/1,
|
|
|
|
wait_until_new_leader/2,
|
|
|
|
wait_until_leader_converge/1,
|
|
|
|
wait_until_connection/1,
|
|
|
|
wait_until_no_connection/1,
|
2013-04-19 19:57:58 +00:00
|
|
|
wait_until_aae_trees_built/1,
|
2013-01-25 18:43:29 +00:00
|
|
|
wait_for_reads/5,
|
|
|
|
start_and_wait_until_fullsync_complete/1,
|
2013-12-30 18:14:40 +00:00
|
|
|
start_and_wait_until_fullsync_complete/2,
|
2013-01-25 18:43:29 +00:00
|
|
|
connect_cluster/3,
|
|
|
|
disconnect_cluster/2,
|
|
|
|
wait_for_connection/2,
|
|
|
|
enable_realtime/2,
|
|
|
|
disable_realtime/2,
|
|
|
|
enable_fullsync/2,
|
|
|
|
start_realtime/2,
|
|
|
|
stop_realtime/2,
|
2014-02-04 15:42:10 +00:00
|
|
|
stop_fullsync/2,
|
|
|
|
disable_fullsync/2,
|
2013-06-13 19:57:23 +00:00
|
|
|
do_write/5,
|
|
|
|
get_fs_coord_status_item/3,
|
2013-08-29 19:49:41 +00:00
|
|
|
num_partitions/1,
|
|
|
|
get_cluster_mgr_port/1,
|
|
|
|
maybe_reconnect_rt/3,
|
2014-01-16 21:29:24 +00:00
|
|
|
connect_rt/3,
|
|
|
|
connect_cluster_by_name/3,
|
|
|
|
get_port/1,
|
|
|
|
get_leader/1,
|
|
|
|
write_to_cluster/4,
|
|
|
|
read_from_cluster/5,
|
|
|
|
check_fullsync/3,
|
|
|
|
validate_completed_fullsync/6
|
2013-01-25 18:43:29 +00:00
|
|
|
]).
|
2013-01-24 22:24:16 +00:00
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
|
|
|
|
make_cluster(Nodes) ->
|
|
|
|
[First|Rest] = Nodes,
|
2013-01-25 18:12:34 +00:00
|
|
|
?assertEqual(ok, rt:wait_until_nodes_ready(Nodes)),
|
2013-06-14 16:03:09 +00:00
|
|
|
[rt:wait_for_service(N, riak_kv) || N <- Nodes],
|
2013-06-04 21:13:56 +00:00
|
|
|
[rt:join(Node, First) || Node <- Rest],
|
2013-01-25 18:12:34 +00:00
|
|
|
?assertEqual(ok, rt:wait_until_no_pending_changes(Nodes)).
|
|
|
|
|
|
|
|
name_cluster(Node, Name) ->
|
|
|
|
lager:info("Naming cluster ~p",[Name]),
|
|
|
|
Res = rpc:call(Node, riak_repl_console, clustername, [[Name]]),
|
|
|
|
?assertEqual(ok, Res).
|
|
|
|
|
|
|
|
wait_until_is_leader(Node) ->
|
|
|
|
lager:info("wait_until_is_leader(~p)", [Node]),
|
|
|
|
rt:wait_until(Node, fun is_leader/1).
|
|
|
|
|
|
|
|
is_leader(Node) ->
|
|
|
|
case rpc:call(Node, riak_core_cluster_mgr, get_leader, []) of
|
2013-06-04 20:05:49 +00:00
|
|
|
{badrpc, Wut} ->
|
|
|
|
lager:info("Badrpc during is_leader for ~p. Error: ~p", [Node, Wut]),
|
2013-01-25 18:12:34 +00:00
|
|
|
false;
|
|
|
|
Leader ->
|
|
|
|
lager:info("Checking: ~p =:= ~p", [Leader, Node]),
|
|
|
|
Leader =:= Node
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
|
|
|
wait_until_is_not_leader(Node) ->
|
|
|
|
lager:info("wait_until_is_not_leader(~p)", [Node]),
|
|
|
|
rt:wait_until(Node, fun is_not_leader/1).
|
|
|
|
|
|
|
|
is_not_leader(Node) ->
|
|
|
|
case rpc:call(Node, riak_core_cluster_mgr, get_leader, []) of
|
2013-06-04 20:05:49 +00:00
|
|
|
{badrpc, Wut} ->
|
|
|
|
lager:info("Badrpc during is_not leader for ~p. Error: ~p", [Node, Wut]),
|
2013-01-25 18:12:34 +00:00
|
|
|
false;
|
|
|
|
Leader ->
|
|
|
|
lager:info("Checking: ~p =/= ~p", [Leader, Node]),
|
|
|
|
Leader =/= Node
|
|
|
|
end.
|
|
|
|
|
|
|
|
wait_until_leader(Node) ->
|
|
|
|
wait_until_new_leader(Node, undefined).
|
|
|
|
|
|
|
|
wait_until_new_leader(Node, OldLeader) ->
|
|
|
|
Res = rt:wait_until(Node,
|
|
|
|
fun(_) ->
|
|
|
|
Status = rpc:call(Node, riak_core_cluster_mgr, get_leader, []),
|
|
|
|
case Status of
|
|
|
|
{badrpc, _} ->
|
|
|
|
false;
|
|
|
|
undefined ->
|
|
|
|
false;
|
|
|
|
OldLeader ->
|
|
|
|
false;
|
|
|
|
_Other ->
|
|
|
|
true
|
|
|
|
end
|
|
|
|
end),
|
|
|
|
?assertEqual(ok, Res).
|
|
|
|
|
|
|
|
wait_until_leader_converge([Node|_] = Nodes) ->
|
|
|
|
rt:wait_until(Node,
|
|
|
|
fun(_) ->
|
|
|
|
length(lists:usort([begin
|
|
|
|
case rpc:call(N, riak_core_cluster_mgr, get_leader, []) of
|
|
|
|
undefined ->
|
|
|
|
false;
|
|
|
|
L ->
|
|
|
|
%lager:info("Leader for ~p is ~p",
|
|
|
|
%[N,L]),
|
|
|
|
L
|
|
|
|
end
|
|
|
|
end || N <- Nodes])) == 1
|
|
|
|
end).
|
|
|
|
|
|
|
|
wait_until_connection(Node) ->
|
|
|
|
rt:wait_until(Node,
|
|
|
|
fun(_) ->
|
|
|
|
Status = rpc:call(Node, riak_repl_console, status, [quiet]),
|
|
|
|
case proplists:get_value(fullsync_coordinator, Status) of
|
|
|
|
[] ->
|
|
|
|
false;
|
|
|
|
[_C] ->
|
|
|
|
true;
|
|
|
|
Conns ->
|
|
|
|
lager:warning("multiple connections detected: ~p",
|
|
|
|
[Conns]),
|
|
|
|
true
|
|
|
|
end
|
|
|
|
end). %% 40 seconds is enough for repl
|
|
|
|
|
|
|
|
wait_until_no_connection(Node) ->
|
|
|
|
rt:wait_until(Node,
|
|
|
|
fun(_) ->
|
|
|
|
Status = rpc:call(Node, riak_repl_console, status, [quiet]),
|
|
|
|
case proplists:get_value(connected_clusters, Status) of
|
|
|
|
[] ->
|
|
|
|
true;
|
|
|
|
_ ->
|
|
|
|
false
|
|
|
|
end
|
|
|
|
end). %% 40 seconds is enough for repl
|
|
|
|
|
|
|
|
wait_for_reads(Node, Start, End, Bucket, R) ->
|
|
|
|
rt:wait_until(Node,
|
|
|
|
fun(_) ->
|
2014-01-16 21:29:24 +00:00
|
|
|
Reads = rt:systest_read(Node, Start, End, Bucket, R),
|
|
|
|
Reads == []
|
2013-01-25 18:12:34 +00:00
|
|
|
end),
|
|
|
|
Reads = rt:systest_read(Node, Start, End, Bucket, R),
|
|
|
|
lager:info("Reads: ~p", [Reads]),
|
|
|
|
length(Reads).
|
|
|
|
|
2013-06-13 19:57:23 +00:00
|
|
|
get_fs_coord_status_item(Node, SinkName, ItemName) ->
|
|
|
|
Status = rpc:call(Node, riak_repl_console, status, [quiet]),
|
|
|
|
FS_CoordProps = proplists:get_value(fullsync_coordinator, Status),
|
|
|
|
ClusterProps = proplists:get_value(SinkName, FS_CoordProps),
|
|
|
|
proplists:get_value(ItemName, ClusterProps).
|
|
|
|
|
2013-01-25 18:12:34 +00:00
|
|
|
start_and_wait_until_fullsync_complete(Node) ->
|
2013-12-30 18:14:40 +00:00
|
|
|
start_and_wait_until_fullsync_complete(Node, undefined).
|
|
|
|
|
|
|
|
start_and_wait_until_fullsync_complete(Node, Cluster) ->
|
2013-01-25 18:12:34 +00:00
|
|
|
Status0 = rpc:call(Node, riak_repl_console, status, [quiet]),
|
2013-12-30 18:14:40 +00:00
|
|
|
Count0 = proplists:get_value(server_fullsyncs, Status0),
|
|
|
|
Count = case Cluster of
|
|
|
|
undefined ->
|
|
|
|
%% count the # of fullsync enabled clusters
|
|
|
|
Count0 + length(string:tokens(proplists:get_value(fullsync_enabled,
|
|
|
|
Status0), ", "));
|
|
|
|
_ ->
|
|
|
|
Count0 + 1
|
|
|
|
end,
|
2013-01-25 18:12:34 +00:00
|
|
|
lager:info("waiting for fullsync count to be ~p", [Count]),
|
|
|
|
|
|
|
|
lager:info("Starting fullsync on ~p (~p)", [Node,
|
|
|
|
rtdev:node_version(rtdev:node_id(Node))]),
|
2013-12-30 18:14:40 +00:00
|
|
|
Args = case Cluster of
|
|
|
|
undefined ->
|
|
|
|
["start"];
|
|
|
|
_ ->
|
|
|
|
["start", Cluster]
|
|
|
|
end,
|
|
|
|
rpc:call(Node, riak_repl_console, fullsync, [Args]),
|
2013-01-25 18:12:34 +00:00
|
|
|
%% sleep because of the old bug where stats will crash if you call it too
|
|
|
|
%% soon after starting a fullsync
|
|
|
|
timer:sleep(500),
|
|
|
|
|
|
|
|
Res = rt:wait_until(Node,
|
|
|
|
fun(_) ->
|
|
|
|
Status = rpc:call(Node, riak_repl_console, status, [quiet]),
|
|
|
|
case proplists:get_value(server_fullsyncs, Status) of
|
|
|
|
C when C >= Count ->
|
|
|
|
true;
|
|
|
|
_ ->
|
|
|
|
false
|
|
|
|
end
|
|
|
|
end),
|
|
|
|
?assertEqual(ok, Res),
|
|
|
|
|
|
|
|
lager:info("Fullsync on ~p complete", [Node]).
|
|
|
|
|
|
|
|
connect_cluster(Node, IP, Port) ->
|
|
|
|
Res = rpc:call(Node, riak_repl_console, connect,
|
|
|
|
[[IP, integer_to_list(Port)]]),
|
|
|
|
?assertEqual(ok, Res).
|
|
|
|
|
|
|
|
disconnect_cluster(Node, Name) ->
|
|
|
|
Res = rpc:call(Node, riak_repl_console, disconnect,
|
|
|
|
[[Name]]),
|
|
|
|
?assertEqual(ok, Res).
|
|
|
|
|
|
|
|
wait_for_connection(Node, Name) ->
|
|
|
|
rt:wait_until(Node,
|
|
|
|
fun(_) ->
|
2013-06-03 17:36:10 +00:00
|
|
|
case rpc:call(Node, riak_core_cluster_mgr,
|
|
|
|
get_connections, []) of
|
|
|
|
{ok, Connections} ->
|
|
|
|
Conn = [P || {{cluster_by_name, N}, P} <- Connections, N == Name],
|
|
|
|
case Conn of
|
|
|
|
[] ->
|
|
|
|
false;
|
|
|
|
[Pid] ->
|
|
|
|
Pid ! {self(), status},
|
|
|
|
receive
|
|
|
|
{Pid, status, _} ->
|
|
|
|
true;
|
|
|
|
{Pid, connecting, _} ->
|
|
|
|
false
|
|
|
|
end
|
|
|
|
end;
|
|
|
|
_ ->
|
|
|
|
false
|
2013-04-01 19:07:36 +00:00
|
|
|
end
|
2013-01-25 18:12:34 +00:00
|
|
|
end).
|
|
|
|
|
|
|
|
enable_realtime(Node, Cluster) ->
|
|
|
|
Res = rpc:call(Node, riak_repl_console, realtime, [["enable", Cluster]]),
|
|
|
|
?assertEqual(ok, Res).
|
|
|
|
|
|
|
|
disable_realtime(Node, Cluster) ->
|
|
|
|
Res = rpc:call(Node, riak_repl_console, realtime, [["disable", Cluster]]),
|
|
|
|
?assertEqual(ok, Res).
|
|
|
|
|
|
|
|
enable_fullsync(Node, Cluster) ->
|
|
|
|
Res = rpc:call(Node, riak_repl_console, fullsync, [["enable", Cluster]]),
|
|
|
|
?assertEqual(ok, Res).
|
|
|
|
|
2014-02-04 15:42:10 +00:00
|
|
|
disable_fullsync(Node, Cluster) ->
|
|
|
|
Res = rpc:call(Node, riak_repl_console, fullsync, [["disable", Cluster]]),
|
|
|
|
?assertEqual(ok, Res).
|
|
|
|
|
|
|
|
stop_fullsync(Node, Cluster) ->
|
|
|
|
Res = rpc:call(Node, riak_repl_console, fullsync, [["stop", Cluster]]),
|
|
|
|
?assertEqual(ok, Res).
|
|
|
|
|
2013-01-25 18:12:34 +00:00
|
|
|
start_realtime(Node, Cluster) ->
|
|
|
|
Res = rpc:call(Node, riak_repl_console, realtime, [["start", Cluster]]),
|
|
|
|
?assertEqual(ok, Res).
|
|
|
|
|
|
|
|
stop_realtime(Node, Cluster) ->
|
|
|
|
Res = rpc:call(Node, riak_repl_console, realtime, [["stop", Cluster]]),
|
|
|
|
?assertEqual(ok, Res).
|
|
|
|
|
|
|
|
do_write(Node, Start, End, Bucket, W) ->
|
|
|
|
case rt:systest_write(Node, Start, End, Bucket, W) of
|
|
|
|
[] ->
|
|
|
|
[];
|
|
|
|
Errors ->
|
|
|
|
lager:warning("~p errors while writing: ~p",
|
|
|
|
[length(Errors), Errors]),
|
|
|
|
timer:sleep(1000),
|
|
|
|
lists:flatten([rt:systest_write(Node, S, S, Bucket, W) ||
|
|
|
|
{S, _Error} <- Errors])
|
|
|
|
end.
|
2013-01-25 18:43:29 +00:00
|
|
|
|
|
|
|
%% does the node meet the version requirement?
|
|
|
|
node_has_version(Node, Version) ->
|
|
|
|
NodeVersion = rtdev:node_version(rtdev:node_id(Node)),
|
|
|
|
case NodeVersion of
|
|
|
|
current ->
|
|
|
|
%% current always satisfies any version check
|
|
|
|
true;
|
|
|
|
_ ->
|
|
|
|
NodeVersion >= Version
|
|
|
|
end.
|
|
|
|
|
|
|
|
nodes_with_version(Nodes, Version) ->
|
|
|
|
[Node || Node <- Nodes, node_has_version(Node, Version)].
|
|
|
|
|
|
|
|
nodes_all_have_version(Nodes, Version) ->
|
|
|
|
Nodes == nodes_with_version(Nodes, Version).
|
2013-04-19 19:57:58 +00:00
|
|
|
|
|
|
|
%% AAE support
|
2013-12-18 04:21:29 +00:00
|
|
|
wait_until_aae_trees_built(Cluster) ->
|
|
|
|
lager:info("Check if all trees built for nodes ~p", [Cluster]),
|
|
|
|
F = fun(Node) ->
|
|
|
|
Info = rpc:call(Node,
|
|
|
|
riak_kv_entropy_info,
|
|
|
|
compute_tree_info,
|
|
|
|
[]),
|
|
|
|
NotBuilt = [X || {_,undefined}=X <- Info],
|
|
|
|
NotBuilt == []
|
|
|
|
end,
|
|
|
|
[rt:wait_until(Node, F) || Node <- Cluster],
|
|
|
|
ok.
|
2013-06-13 19:57:23 +00:00
|
|
|
|
|
|
|
%% Return the number of partitions in the cluster where Node is a member.
|
|
|
|
num_partitions(Node) ->
|
|
|
|
{ok, Ring} = rpc:call(Node, riak_core_ring_manager, get_raw_ring, []),
|
|
|
|
N = riak_core_ring:num_partitions(Ring),
|
|
|
|
N.
|
2013-08-29 19:49:41 +00:00
|
|
|
|
|
|
|
get_cluster_mgr_port(Node) ->
|
|
|
|
{ok, {_Ip, Port}} = rpc:call(Node, application, get_env, [riak_core, cluster_mgr]),
|
|
|
|
Port.
|
|
|
|
|
|
|
|
maybe_reconnect_rt(SourceNode, SinkPort, SinkName) ->
|
|
|
|
case repl_util:wait_for_connection(SourceNode, SinkName) of
|
|
|
|
fail ->
|
|
|
|
connect_rt(SourceNode, SinkPort, SinkName);
|
|
|
|
Oot ->
|
|
|
|
Oot
|
|
|
|
end.
|
|
|
|
|
|
|
|
connect_rt(SourceNode, SinkPort, SinkName) ->
|
|
|
|
repl_util:connect_cluster(SourceNode, "127.0.0.1", SinkPort),
|
|
|
|
repl_util:wait_for_connection(SourceNode, SinkName),
|
|
|
|
repl_util:enable_realtime(SourceNode, SinkName),
|
|
|
|
repl_util:start_realtime(SourceNode, SinkName).
|
2014-01-16 21:29:24 +00:00
|
|
|
|
|
|
|
%% @doc Connect two clusters using a given name.
|
|
|
|
connect_cluster_by_name(Source, Port, Name) ->
|
|
|
|
lager:info("Connecting ~p to ~p for cluster ~p.",
|
|
|
|
[Source, Port, Name]),
|
|
|
|
repl_util:connect_cluster(Source, "127.0.0.1", Port),
|
|
|
|
?assertEqual(ok, repl_util:wait_for_connection(Source, Name)).
|
|
|
|
|
|
|
|
%% @doc Given a node, find the port that the cluster manager is
|
|
|
|
%% listening on.
|
|
|
|
get_port(Node) ->
|
|
|
|
{ok, {_IP, Port}} = rpc:call(Node,
|
|
|
|
application,
|
|
|
|
get_env,
|
|
|
|
[riak_core, cluster_mgr]),
|
|
|
|
Port.
|
|
|
|
|
|
|
|
%% @doc Given a node, find out who the current replication leader in its
|
|
|
|
%% cluster is.
|
|
|
|
get_leader(Node) ->
|
|
|
|
rpc:call(Node, riak_core_cluster_mgr, get_leader, []).
|
|
|
|
|
|
|
|
%% @doc Validate fullsync completed and all keys are available.
|
|
|
|
validate_completed_fullsync(ReplicationLeader,
|
|
|
|
DestinationNode,
|
|
|
|
DestinationCluster,
|
|
|
|
Start,
|
|
|
|
End,
|
|
|
|
Bucket) ->
|
|
|
|
ok = check_fullsync(ReplicationLeader, DestinationCluster, 0),
|
|
|
|
lager:info("Verify: Reading ~p keys repl'd from A(~p) to ~p(~p)",
|
|
|
|
[End - Start, ReplicationLeader,
|
|
|
|
DestinationCluster, DestinationNode]),
|
|
|
|
?assertEqual(0,
|
|
|
|
repl_util:wait_for_reads(DestinationNode,
|
|
|
|
Start,
|
|
|
|
End,
|
|
|
|
Bucket,
|
|
|
|
1)).
|
|
|
|
|
|
|
|
%% @doc Write a series of keys and ensure they are all written.
|
|
|
|
write_to_cluster(Node, Start, End, Bucket) ->
|
|
|
|
lager:info("Writing ~p keys to node ~p.", [End - Start, Node]),
|
|
|
|
?assertEqual([],
|
|
|
|
repl_util:do_write(Node, Start, End, Bucket, 1)).
|
|
|
|
|
|
|
|
%% @doc Read from cluster a series of keys, asserting a certain number
|
|
|
|
%% of errors.
|
|
|
|
read_from_cluster(Node, Start, End, Bucket, Errors) ->
|
|
|
|
lager:info("Reading ~p keys from node ~p.", [End - Start, Node]),
|
|
|
|
Res2 = rt:systest_read(Node, Start, End, Bucket, 1),
|
|
|
|
?assertEqual(Errors, length(Res2)).
|
|
|
|
|
|
|
|
%% @doc Assert we can perform one fullsync cycle, and that the number of
|
|
|
|
%% expected failures is correct.
|
|
|
|
check_fullsync(Node, Cluster, ExpectedFailures) ->
|
|
|
|
{Time, _} = timer:tc(repl_util,
|
|
|
|
start_and_wait_until_fullsync_complete,
|
|
|
|
[Node, Cluster]),
|
|
|
|
lager:info("Fullsync completed in ~p seconds", [Time/1000/1000]),
|
|
|
|
|
|
|
|
Status = rpc:call(Node, riak_repl_console, status, [quiet]),
|
|
|
|
|
|
|
|
Props = case proplists:get_value(fullsync_coordinator, Status) of
|
|
|
|
[{_Name, Props0}] ->
|
|
|
|
Props0;
|
|
|
|
Multiple ->
|
|
|
|
{_Name, Props0} = lists:keyfind(Cluster, 1, Multiple),
|
|
|
|
Props0
|
|
|
|
end,
|
|
|
|
|
|
|
|
%% check that the expected number of partitions failed to sync
|
|
|
|
ErrorExits = proplists:get_value(error_exits, Props),
|
|
|
|
lager:info("Error exits: ~p", [ErrorExits]),
|
|
|
|
?assertEqual(ExpectedFailures, ErrorExits),
|
|
|
|
|
|
|
|
%% check that we retried each of them 5 times
|
|
|
|
RetryExits = proplists:get_value(retry_exits, Props),
|
|
|
|
lager:info("Retry exits: ~p", [RetryExits]),
|
|
|
|
?assert(RetryExits >= ExpectedFailures * 5),
|
|
|
|
|
|
|
|
ok.
|