-module(repl_rt_cascading_rtq).
-compile(export_all).

-include_lib("eunit/include/eunit.hrl").

-define(TEST_BUCKET, <<"rt-cascading-rtq-systest-a">>).

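%% @doc Set up the test: deploy and build the clusters, then connect the
%% source cluster to both sinks ("SinkA" and "SinkB") for replication.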
setup() ->
    rt:set_conf(all, [{"buckets.default.allow_mult", "false"}]),

    {SourceLeader, SinkLeaderA, SinkLeaderB, _, _, _} = ClusterNodes = make_clusters(),

    connect_clusters(SourceLeader, SinkLeaderA, "SinkA"),
    connect_clusters(SourceLeader, SinkLeaderB, "SinkB"),
    ClusterNodes.

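%% @doc riak_test entry point: run the realtime queue buildup test and
%% return pass.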
confirm() ->
    SetupData = setup(),
    rtq_data_buildup_test(SetupData),
    pass.

%% This test case is designed to ensure that there is no realtime
%% queue buildup on sink nodes that do not serve as source nodes for
%% any other clusters. It constructs a simple topology with a single
%% source cluster replicating to two sinks. The topology for this test
%% is as follows:
%%
%%             +--------+
%%             | Source |
%%             +--------+
%%               ^    ^
%%              /      \
%%             V        V
%%     +-------+        +-------+
%%     | SinkA |        | SinkB |
%%     +-------+        +-------+
rtq_data_buildup_test(ClusterNodes) ->
    {SourceLeader, SinkLeaderA, SinkLeaderB, SourceNodes, _SinkANodes, _SinkBNodes} = ClusterNodes,

    %% Enable RT replication from the source cluster to "SinkA"
    lager:info("Enabling realtime between ~p and ~p", [SourceLeader, SinkLeaderA]),
    enable_rt(SourceLeader, SourceNodes, "SinkA"),
    %% Enable RT replication from the source cluster to "SinkB"
    lager:info("Enabling realtime between ~p and ~p", [SourceLeader, SinkLeaderB]),
    enable_rt(SourceLeader, SourceNodes, "SinkB"),

    %% Get the baseline byte count of the realtime queue for each sink cluster
    SinkAInitialQueueSize = rtq_bytes(SinkLeaderA),
    SinkBInitialQueueSize = rtq_bytes(SinkLeaderB),

    %% Write keys to the source cluster and verify they replicate to both sinks
    KeyCount = 1001,
    write_to_cluster(SourceLeader, 1, KeyCount),
    read_from_cluster(SinkLeaderA, 1, KeyCount, 0),
    read_from_cluster(SinkLeaderB, 1, KeyCount, 0),

    %% Verify the realtime queue is still at its initial size on both sink clusters
    ?assertEqual(SinkAInitialQueueSize, rtq_bytes(SinkLeaderA)),
    ?assertEqual(SinkBInitialQueueSize, rtq_bytes(SinkLeaderB)).

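%% @doc Return the number of bytes currently held in the realtime queue
%% on Node, as reported by riak_repl2_rtq:status/0.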
rtq_bytes(Node) ->
    RtqStatus = rpc:call(Node, riak_repl2_rtq, status, []),
    proplists:get_value(bytes, RtqStatus).

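%% @doc Deploy the nodes and build the three clusters used by this test:
%% a two-node source cluster and two two-node sink clusters, named
%% "Source", "SinkA" and "SinkB". Waits for ring convergence, handoff
%% transfers and leader election, then returns the three cluster leaders
%% along with the node lists.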
make_clusters() ->
    NodeCount = rt_config:get(num_nodes, 6),
    lager:info("Deploy ~p nodes", [NodeCount]),
    Nodes = deploy_nodes(NodeCount, true),

    {SourceNodes, SinkNodes} = lists:split(2, Nodes),
    {SinkANodes, SinkBNodes} = lists:split(2, SinkNodes),
    lager:info("SinkANodes: ~p", [SinkANodes]),
    lager:info("SinkBNodes: ~p", [SinkBNodes]),

    lager:info("Build source cluster"),
    repl_util:make_cluster(SourceNodes),

    lager:info("Build sink cluster A"),
    repl_util:make_cluster(SinkANodes),

    lager:info("Build sink cluster B"),
    repl_util:make_cluster(SinkBNodes),

    SourceFirst = hd(SourceNodes),
    AFirst = hd(SinkANodes),
    BFirst = hd(SinkBNodes),

    %% Name the clusters
    repl_util:name_cluster(SourceFirst, "Source"),
    repl_util:name_cluster(AFirst, "SinkA"),
    repl_util:name_cluster(BFirst, "SinkB"),

    lager:info("Waiting for convergence."),
    rt:wait_until_ring_converged(SourceNodes),
    rt:wait_until_ring_converged(SinkANodes),
    rt:wait_until_ring_converged(SinkBNodes),

    lager:info("Waiting for transfers to complete."),
    rt:wait_until_transfers_complete(SourceNodes),
    rt:wait_until_transfers_complete(SinkANodes),
    rt:wait_until_transfers_complete(SinkBNodes),

    %% Wait for a leader on the source cluster
    lager:info("waiting for leader to converge on the source cluster"),
    ?assertEqual(ok, repl_util:wait_until_leader_converge(SourceNodes)),

    %% Wait for a leader on sink cluster A
    lager:info("waiting for leader to converge on sink cluster A"),
    ?assertEqual(ok, repl_util:wait_until_leader_converge(SinkANodes)),

    %% Wait for a leader on sink cluster B
    lager:info("waiting for leader to converge on sink cluster B"),
    ?assertEqual(ok, repl_util:wait_until_leader_converge(SinkBNodes)),

    SourceLeader = repl_util:get_leader(SourceFirst),
    ALeader = repl_util:get_leader(AFirst),
    BLeader = repl_util:get_leader(BFirst),

    %% Uncomment the following 2 lines to verify that pre-2.0 versions
    %% of Riak behave as expected if cascading writes are disabled for
    %% the sink clusters.
    %% disable_cascading(ALeader, SinkANodes),
    %% disable_cascading(BLeader, SinkBNodes),

    lager:info("Source Leader: ~p SinkALeader: ~p SinkBLeader: ~p", [SourceLeader, ALeader, BLeader]),
    {SourceLeader, ALeader, BLeader, SourceNodes, SinkANodes, SinkBNodes}.

%% @doc Connect the cluster containing Source to the cluster named Name,
%% listening on Port, and wait until the connection is established.
connect_cluster(Source, Port, Name) ->
    lager:info("Connecting ~p to ~p for cluster ~p.",
               [Source, Port, Name]),
    repl_util:connect_cluster(Source, "127.0.0.1", Port),
    ?assertEqual(ok, repl_util:wait_for_connection(Source, Name)).

%% @doc Connect two clusters for replication using their respective leader nodes.
connect_clusters(SourceLeader, SinkLeader, SinkName) ->
    SinkPort = repl_util:get_port(SinkLeader),
    lager:info("connect source cluster to ~p on port ~p", [SinkName, SinkPort]),
    repl_util:connect_cluster(SourceLeader, "127.0.0.1", SinkPort),
    ?assertEqual(ok, repl_util:wait_for_connection(SourceLeader, SinkName)).

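%% @doc riak_repl configuration applied to every deployed node: fullsync
%% is disabled and the realtime queue is capped at 1 MB. Note that the
%% cascading-writes argument is currently ignored.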
cluster_conf(_CascadingWrites) ->
    [
     {riak_repl,
      [
       %% turn off fullsync
       {fullsync_on_connect, false},
       {fullsync_interval, disabled},
       {max_fssource_cluster, 20},
       {max_fssource_node, 20},
       {max_fssink_node, 20},
       {rtq_max_bytes, 1048576}
      ]}
    ].

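%% @doc Deploy NumNodes nodes running riak_kv and riak_repl. The boolean
%% selects whether cascading realtime writes are requested (always or
%% never), although cluster_conf/1 currently ignores that setting.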
deploy_nodes(NumNodes, true) ->
    rt:deploy_nodes(NumNodes, cluster_conf(always), [riak_kv, riak_repl]);
deploy_nodes(NumNodes, false) ->
    rt:deploy_nodes(NumNodes, cluster_conf(never), [riak_kv, riak_repl]).

%% @doc Turn on realtime replication from the cluster led by SourceLeader
%% to the sink named SinkName. The clusters must already have been named
%% and connected.
enable_rt(SourceLeader, SourceNodes, SinkName) ->
    repl_util:enable_realtime(SourceLeader, SinkName),
    rt:wait_until_ring_converged(SourceNodes),

    repl_util:start_realtime(SourceLeader, SinkName),
    rt:wait_until_ring_converged(SourceNodes).

%% @doc Disable cascading realtime writes on the cluster led by Leader.
disable_cascading(Leader, Nodes) ->
    rpc:call(Leader, riak_repl_console, realtime_cascades, [["never"]]),
    rt:wait_until_ring_converged(Nodes).

%% @doc Write a series of keys and ensure they are all written.
write_to_cluster(Node, Start, End) ->
    lager:info("Writing ~p keys to node ~p.", [End - Start, Node]),
    ?assertEqual([],
                 repl_util:do_write(Node, Start, End, ?TEST_BUCKET, 1)).

%% @doc Read a series of keys from the cluster and assert that the
%% expected number of errors is observed.
read_from_cluster(Node, Start, End, Errors) ->
    lager:info("Reading ~p keys from node ~p.", [End - Start, Node]),
    Res2 = rt:systest_read(Node, Start, End, ?TEST_BUCKET, 1),
    ?assertEqual(Errors, length(Res2)).