riak_test/tests/repl_rt_cascading_rtq.erl
2014-02-26 12:16:28 -07:00

186 lines
6.9 KiB
Erlang

-module(repl_rt_cascading_rtq).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-define(TEST_BUCKET, <<"rt-cascading-rtq-systest-a">>).
setup() ->
rt:set_conf(all, [{"buckets.default.allow_mult", "false"}]),
{SourceLeader, SinkLeaderA, SinkLeaderB, _, _, _} = ClusterNodes = make_clusters(),
connect_clusters(SourceLeader, SinkLeaderA, "SinkA"),
connect_clusters(SourceLeader, SinkLeaderB, "SinkB"),
ClusterNodes.
confirm() ->
SetupData = setup(),
rtq_data_buildup_test(SetupData),
pass.
%% This test case is designed to ensure that there is no realtime
%% queue buildup on sink nodes that do not serve as source nodes for
%% any other clusters. It constructs a simple toplogy with a single
%% source cluster replicating to two sinks. The toplogy for this test
%% is as follows:
%% +--------+
%% | Source |
%% +--------+
%% ^ ^
%% / \
%% V V
%% +-------+ +-------+
%% | SinkA | | SinkB |
%% +-------+ +-------+
rtq_data_buildup_test(ClusterNodes) ->
{SourceLeader, SinkLeaderA, SinkLeaderB, SourceNodes, _SinkANodes, _SinkBNodes} = ClusterNodes,
%% Enable RT replication from source cluster "SinkA"
lager:info("Enabling realtime between ~p and ~p", [SourceLeader, SinkLeaderB]),
enable_rt(SourceLeader, SourceNodes, "SinkA"),
%% Enable RT replication from source cluster "SinkB"
lager:info("Enabling realtime between ~p and ~p", [SourceLeader, SinkLeaderA]),
enable_rt(SourceLeader, SourceNodes, "SinkB"),
%% Get the baseline byte count for the rtq for each sink cluster
SinkAInitialQueueSize = rtq_bytes(SinkLeaderA),
SinkBInitialQueueSize = rtq_bytes(SinkLeaderB),
%% Write keys to source cluster A
KeyCount = 1001,
write_to_cluster(SourceLeader, 1, KeyCount),
read_from_cluster(SinkLeaderA, 1, KeyCount, 0),
read_from_cluster(SinkLeaderB, 1, KeyCount, 0),
%% Verify the rt queue is still at the initial size for both sink clusters
?assertEqual(SinkAInitialQueueSize, rtq_bytes(SinkLeaderA)),
?assertEqual(SinkBInitialQueueSize, rtq_bytes(SinkLeaderB)).
rtq_bytes(Node) ->
RtqStatus = rpc:call(Node, riak_repl2_rtq, status, []),
proplists:get_value(bytes, RtqStatus).
make_clusters() ->
NodeCount = rt_config:get(num_nodes, 6),
lager:info("Deploy ~p nodes", [NodeCount]),
Nodes = deploy_nodes(NodeCount, true),
{SourceNodes, SinkNodes} = lists:split(2, Nodes),
{SinkANodes, SinkBNodes} = lists:split(2, SinkNodes),
lager:info("SinkANodes: ~p", [SinkANodes]),
lager:info("SinkBNodes: ~p", [SinkBNodes]),
lager:info("Build source cluster"),
repl_util:make_cluster(SourceNodes),
lager:info("Build sink cluster A"),
repl_util:make_cluster(SinkANodes),
lager:info("Build sink cluster B"),
repl_util:make_cluster(SinkBNodes),
SourceFirst = hd(SourceNodes),
AFirst = hd(SinkANodes),
BFirst = hd(SinkBNodes),
%% Name the clusters
repl_util:name_cluster(SourceFirst, "Source"),
repl_util:name_cluster(AFirst, "SinkA"),
repl_util:name_cluster(BFirst, "SinkB"),
lager:info("Waiting for convergence."),
rt:wait_until_ring_converged(SourceNodes),
rt:wait_until_ring_converged(SinkANodes),
rt:wait_until_ring_converged(SinkBNodes),
lager:info("Waiting for transfers to complete."),
rt:wait_until_transfers_complete(SourceNodes),
rt:wait_until_transfers_complete(SinkANodes),
rt:wait_until_transfers_complete(SinkBNodes),
%% get the leader for the source cluster
lager:info("waiting for leader to converge on the source cluster"),
?assertEqual(ok, repl_util:wait_until_leader_converge(SourceNodes)),
%% get the leader for the first sink cluster
lager:info("waiting for leader to converge on sink cluster A"),
?assertEqual(ok, repl_util:wait_until_leader_converge(SinkANodes)),
%% get the leader for the second cluster
lager:info("waiting for leader to converge on cluster B"),
?assertEqual(ok, repl_util:wait_until_leader_converge(SinkBNodes)),
SourceLeader = repl_util:get_leader(SourceFirst),
ALeader = repl_util:get_leader(AFirst),
BLeader = repl_util:get_leader(BFirst),
%% Uncomment the following 2 lines to verify that pre-2.0 versions
%% of Riak behave as expected if cascading writes are disabled for
%% the sink clusters.
%% disable_cascading(ALeader, SinkANodes),
%% disable_cascading(BLeader, SinkBNodes),
lager:info("Source Leader: ~p SinkALeader: ~p SinkBLeader: ~p", [SourceLeader, ALeader, BLeader]),
{SourceLeader, ALeader, BLeader, SourceNodes, SinkANodes, SinkBNodes}.
%% @doc Connect two clusters using a given name.
connect_cluster(Source, Port, Name) ->
lager:info("Connecting ~p to ~p for cluster ~p.",
[Source, Port, Name]),
repl_util:connect_cluster(Source, "127.0.0.1", Port),
?assertEqual(ok, repl_util:wait_for_connection(Source, Name)).
%% @doc Connect two clusters for replication using their respective leader nodes.
connect_clusters(SourceLeader, SinkLeader, SinkName) ->
SinkPort = repl_util:get_port(SinkLeader),
lager:info("connect source cluster to ~p on port ~p", [SinkName, SinkPort]),
repl_util:connect_cluster(SourceLeader, "127.0.0.1", SinkPort),
?assertEqual(ok, repl_util:wait_for_connection(SourceLeader, SinkName)).
cluster_conf(_CascadingWrites) ->
[
{riak_repl,
[
%% turn off fullsync
{fullsync_on_connect, false},
{fullsync_interval, disabled},
{max_fssource_cluster, 20},
{max_fssource_node, 20},
{max_fssink_node, 20},
{rtq_max_bytes, 1048576}
]}
].
deploy_nodes(NumNodes, true) ->
rt:deploy_nodes(NumNodes, cluster_conf(always));
deploy_nodes(NumNodes, false) ->
rt:deploy_nodes(NumNodes, cluster_conf(never)).
%% @doc Turn on Realtime replication on the cluster lead by LeaderA.
%% The clusters must already have been named and connected.
enable_rt(SourceLeader, SourceNodes, SinkName) ->
repl_util:enable_realtime(SourceLeader, SinkName),
rt:wait_until_ring_converged(SourceNodes),
repl_util:start_realtime(SourceLeader, SinkName),
rt:wait_until_ring_converged(SourceNodes).
%% @doc Turn off Realtime replication on the cluster lead by LeaderA.
disable_cascading(Leader, Nodes) ->
rpc:call(Leader, riak_repl_console, realtime_cascades, [["never"]]),
rt:wait_until_ring_converged(Nodes).
%% @doc Write a series of keys and ensure they are all written.
write_to_cluster(Node, Start, End) ->
lager:info("Writing ~p keys to node ~p.", [End - Start, Node]),
?assertEqual([],
repl_util:do_write(Node, Start, End, ?TEST_BUCKET, 1)).
%% @doc Read from cluster a series of keys, asserting a certain number
%% of errors.
read_from_cluster(Node, Start, End, Errors) ->
lager:info("Reading ~p keys from node ~p.", [End - Start, Node]),
Res2 = rt:systest_read(Node, Start, End, ?TEST_BUCKET, 1),
?assertEqual(Errors, length(Res2)).