Ensure riak_repl service is up across the board

Trying to use the repl features before newly started nodes have
riak_repl completely initialized leads to all sorts of nasty crashes and
noise. Frequently it makes fullsync stuck forever, which makes a lot of
the tests fail.

This also tweaks the AAE fullsync tests to remove assumptions about
failure stats when AAE transient errors occur. The behavior in the
handling of those errors has changed recently with the introduction of
soft exits.
This commit is contained in:
Engel A. Sanchez 2014-12-18 16:07:00 -05:00
parent 5a6150ab14
commit 556cb7210c
30 changed files with 67 additions and 66 deletions

View File

@ -56,6 +56,7 @@
create_and_activate_bucket_type/3,
deploy_nodes/1,
deploy_nodes/2,
deploy_nodes/3,
deploy_clusters/1,
down/2,
enable_search_hook/2,
@ -305,8 +306,7 @@ deploy_nodes(NumNodes) when is_integer(NumNodes) ->
%% `InitialConfig', returning a list of the nodes deployed.
-spec deploy_nodes(NumNodes :: integer(), any()) -> [node()].
deploy_nodes(NumNodes, InitialConfig) when is_integer(NumNodes) ->
NodeConfig = [{current, InitialConfig} || _ <- lists:seq(1,NumNodes)],
deploy_nodes(NodeConfig);
deploy_nodes(NumNodes, InitialConfig, [riak_kv]);
deploy_nodes(Versions, Services) ->
NodeConfig = [ version_to_config(Version) || Version <- Versions ],
Nodes = ?HARNESS:deploy_nodes(NodeConfig),
@ -315,6 +315,10 @@ deploy_nodes(Versions, Services) ->
Service <- Services ],
Nodes.
%% @doc Deploy `NumNodes' nodes, every one of them described by the
%% `{current, InitialConfig}' pair, and delegate to deploy_nodes/2 together
%% with the caller-supplied `Services' list. The result is whatever
%% deploy_nodes/2 returns for that node configuration list.
deploy_nodes(NumNodes, InitialConfig, Services) when is_integer(NumNodes) ->
    PerNode = {current, InitialConfig},
    deploy_nodes([PerNode || _Idx <- lists:seq(1, NumNodes)], Services).
%% @doc Normalise one entry of a deploy list into a two-element config tuple.
%% A tuple is taken to be an already complete entry and is returned untouched;
%% any other term is treated as a bare version and paired with the atom
%% `default'.
version_to_config(Spec) when is_tuple(Spec) ->
    Spec;
version_to_config(Vsn) ->
    {Vsn, default}.

View File

@ -9,8 +9,6 @@
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
-import(rt, [deploy_nodes/2]).
-define(TEST_BUCKET, <<"repl-aae-fullsync-systest_a">>).
-define(NUM_KEYS, 1000).
@ -34,6 +32,7 @@
{fullsync_strategy, aae},
{fullsync_on_connect, false},
{fullsync_interval, disabled},
{max_fssource_soft_retries, 10},
{max_fssource_retries, Retries}
]}
]).
@ -48,7 +47,7 @@ confirm() ->
simple_test() ->
%% Deploy 6 nodes.
Nodes = deploy_nodes(6, ?CONF(5)),
Nodes = rt:deploy_nodes(6, ?CONF(5), [riak_kv, riak_repl]),
%% Break up the 6 nodes into three clusters.
{ANodes, BNodes} = lists:split(3, Nodes),
@ -120,7 +119,7 @@ simple_test() ->
dual_test() ->
%% Deploy 6 nodes.
Nodes = deploy_nodes(6, ?CONF(infinity)),
Nodes = rt:deploy_nodes(6, ?CONF(infinity), [riak_kv, riak_repl]),
%% Break up the 6 nodes into three clusters.
{ANodes, Rest} = lists:split(2, Nodes),
@ -220,7 +219,7 @@ dual_test() ->
bidirectional_test() ->
%% Deploy 6 nodes.
Nodes = deploy_nodes(6, ?CONF(5)),
Nodes = rt:deploy_nodes(6, ?CONF(5), [riak_kv, riak_repl]),
%% Break up the 6 nodes into three clusters.
{ANodes, BNodes} = lists:split(3, Nodes),
@ -303,7 +302,7 @@ bidirectional_test() ->
difference_test() ->
%% Deploy 6 nodes.
Nodes = deploy_nodes(6, ?CONF(5)),
Nodes = rt:deploy_nodes(6, ?CONF(5), [riak_kv, riak_repl]),
%% Break up the 6 nodes into three clusters.
{ANodes, BNodes} = lists:split(3, Nodes),
@ -395,7 +394,7 @@ difference_test() ->
deadlock_test() ->
%% Deploy 6 nodes.
Nodes = deploy_nodes(6, ?CONF(5)),
Nodes = rt:deploy_nodes(6, ?CONF(5), [riak_kv, riak_repl]),
%% Break up the 6 nodes into three clusters.
{ANodes, BNodes} = lists:split(3, Nodes),
@ -507,10 +506,6 @@ check_fullsync(Node, Cluster, ExpectedFailures) ->
?assertEqual(ExpectedFailures,
proplists:get_value(error_exits, Props)),
%% check that we retried each of them 5 times
?assert(
proplists:get_value(retry_exits, Props) >= ExpectedFailures * 5),
ok.
%% @doc Validate fullsync handles errors for all possible intercept

View File

@ -6,18 +6,16 @@
prepare_cluster_data/5]).
-include_lib("eunit/include/eunit.hrl").
-import(rt, [deploy_nodes/2,
-import(rt, [deploy_nodes/3,
join/2,
log_to_nodes/2,
log_to_nodes/3,
wait_until_nodes_ready/1,
wait_until_no_pending_changes/1]).
log_to_nodes/3]).
make_clusters(NumNodesWanted, ClusterSize, Conf) ->
NumNodes = rt_config:get(num_nodes, NumNodesWanted),
ClusterASize = rt_config:get(cluster_a_size, ClusterSize),
lager:info("Deploy ~p nodes", [NumNodes]),
Nodes = deploy_nodes(NumNodes, Conf),
Nodes = deploy_nodes(NumNodes, Conf, [riak_kv, riak_repl]),
{ANodes, BNodes} = lists:split(ClusterASize, Nodes),
lager:info("ANodes: ~p", [ANodes]),

View File

@ -347,10 +347,10 @@ cluster_conf() ->
].
deploy_nodes(NumNodes, current) ->
rt:deploy_nodes(NumNodes, cluster_conf());
rt:deploy_nodes(NumNodes, cluster_conf(), [riak_kv, riak_repl]);
deploy_nodes(_, mixed) ->
Conf = cluster_conf(),
rt:deploy_nodes([{current, Conf}, {previous, Conf}]).
rt:deploy_nodes([{current, Conf}, {previous, Conf}], [riak_kv, riak_repl]).
%% @doc Create two clusters of 1 node each and connect them for replication:
%% Cluster "A" -> cluster "B"

View File

@ -37,6 +37,9 @@ confirm() ->
Nodes = [ANodes, BNodes] = rt:build_clusters([3, 3]),
rt:wait_for_cluster_service(ANodes, riak_repl),
rt:wait_for_cluster_service(BNodes, riak_repl),
lager:info("ANodes: ~p", [ANodes]),
lager:info("BNodes: ~p", [BNodes]),

View File

@ -103,7 +103,7 @@ make_clusters() ->
]}
],
Nodes = rt:deploy_nodes(NumNodes, Conf),
Nodes = rt:deploy_nodes(NumNodes, Conf, [riak_kv, riak_repl]),
{ANodes, BNodes} = lists:split(ClusterASize, Nodes),
lager:info("ANodes: ~p", [ANodes]),
lager:info("BNodes: ~p", [BNodes]),

View File

@ -64,6 +64,9 @@ fullsync_test(Strategy, Latency) ->
[ANodes, BNodes] = rt:build_clusters([3, 3]),
rt:wait_for_cluster_service(ANodes, riak_repl),
rt:wait_for_cluster_service(BNodes, riak_repl),
AFirst = hd(ANodes),
BFirst = hd(BNodes),

View File

@ -38,7 +38,7 @@ setup() ->
NodeCount = rt_config:get(num_nodes, 6),
lager:info("Deploy ~p nodes", [NodeCount]),
Nodes = rt:deploy_nodes(NodeCount, cluster_conf()),
Nodes = rt:deploy_nodes(NodeCount, cluster_conf(), [riak_kv, riak_repl]),
SplitSize = NodeCount div 2,
{SourceNodes, SinkNodes} = lists:split(SplitSize, Nodes),

View File

@ -38,6 +38,9 @@ confirm() ->
[ANodes, BNodes] = rt:build_clusters([3, 3]),
rt:wait_for_cluster_service(ANodes, riak_repl),
rt:wait_for_cluster_service(BNodes, riak_repl),
lager:info("ANodes: ~p", [ANodes]),
lager:info("BNodes: ~p", [BNodes]),

View File

@ -19,7 +19,7 @@ confirm() ->
toggle_enabled_test_() ->
{setup, fun() ->
Nodes = rt:deploy_nodes(3, conf()),
Nodes = rt:deploy_nodes(3, conf(), [riak_kv, riak_repl]),
repl_util:make_cluster(Nodes),
Nodes
end,
@ -78,7 +78,7 @@ data_push() ->
data_push_test_() ->
{timeout, rt_cascading:timeout(1000000000000000), {setup, fun() ->
Nodes = rt:deploy_nodes(6, conf()),
Nodes = rt:deploy_nodes(6, conf(), [riak_kv, riak_repl]),
{[N1 | _] = C123, [N4 | _] = C456} = lists:split(3, Nodes),
repl_util:make_cluster(C123),
repl_util:name_cluster(N1, "c123"),
@ -211,7 +211,7 @@ read_repair_interaction() ->
read_repair_interaction_test_() ->
{timeout, rt_cascading:timeout(100000), {setup, fun() ->
Nodes = rt:deploy_nodes(6, conf()),
Nodes = rt:deploy_nodes(6, conf(), [riak_kv, riak_repl]),
{[N1 | _] = C123, [N4 | _] = C456} = lists:split(3, Nodes),
repl_util:make_cluster(C123),
repl_util:name_cluster(N1, "c123"),

View File

@ -153,9 +153,9 @@ cluster_conf(_CascadingWrites) ->
].
deploy_nodes(NumNodes, true) ->
rt:deploy_nodes(NumNodes, cluster_conf(always));
rt:deploy_nodes(NumNodes, cluster_conf(always), [riak_kv, riak_repl]);
deploy_nodes(NumNodes, false) ->
rt:deploy_nodes(NumNodes, cluster_conf(never)).
rt:deploy_nodes(NumNodes, cluster_conf(never), [riak_kv, riak_repl]).
%% @doc Turn on Realtime replication on the cluster led by LeaderA.
%% The clusters must already have been named and connected.

View File

@ -174,7 +174,7 @@ make_connected_clusters() ->
]}
],
Nodes = rt:deploy_nodes(NumNodes, Conf),
Nodes = rt:deploy_nodes(NumNodes, Conf, [riak_kv, riak_repl]),
{ANodes, BNodes} = lists:split(ClusterASize, Nodes),
lager:info("ANodes: ~p", [ANodes]),

View File

@ -128,7 +128,7 @@ make_connected_clusters() ->
]}
],
Nodes = rt:deploy_nodes(NumNodes, Conf),
Nodes = rt:deploy_nodes(NumNodes, Conf, [riak_kv, riak_repl]),
{ANodes, BNodes} = lists:split(ClusterASize, Nodes),
lager:info("ANodes: ~p", [ANodes]),
lager:info("BNodes: ~p", [BNodes]),

View File

@ -174,7 +174,7 @@ make_connected_clusters() ->
]}
],
Nodes = rt:deploy_nodes(NumNodes, Conf),
Nodes = rt:deploy_nodes(NumNodes, Conf, [riak_kv, riak_repl]),
{ANodes, BNodes} = lists:split(ClusterASize, Nodes),
lager:info("ANodes: ~p", [ANodes]),

View File

@ -4,11 +4,6 @@
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-import(rt, [deploy_nodes/2,
join/2,
wait_until_nodes_ready/1,
wait_until_no_pending_changes/1]).
%% export functions shared with other replication tests...
-export([make_bucket/3]).
@ -23,6 +18,8 @@ confirm() ->
],
rt:set_advanced_conf(all, Conf),
[ANodes, BNodes] = rt:build_clusters([3, 3]),
rt:wait_for_cluster_service(ANodes, riak_repl),
rt:wait_for_cluster_service(BNodes, riak_repl),
replication(ANodes, BNodes, false),
pass.
@ -204,7 +201,7 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
lager:info("Restarting down node ~p", [LeaderA]),
rt:start(LeaderA),
rt:wait_until_pingable(LeaderA),
wait_until_no_pending_changes(ANodes),
rt:wait_until_no_pending_changes(ANodes),
wait_until_leader_converge(ANodes),
start_and_wait_until_fullsync_complete(LeaderA2),

View File

@ -3,8 +3,7 @@
-export([confirm/0, replication/3]).
-include_lib("eunit/include/eunit.hrl").
-import(rt, [deploy_nodes/2,
join/2,
-import(rt, [join/2,
log_to_nodes/2,
log_to_nodes/3,
wait_until_nodes_ready/1,
@ -33,8 +32,7 @@ confirm() ->
]}
],
Nodes = deploy_nodes(NumNodes, Conf),
Nodes = rt:deploy_nodes(NumNodes, Conf, [riak_kv, riak_repl]),
{ANodes, BNodes} = lists:split(ClusterASize, Nodes),
lager:info("ANodes: ~p", [ANodes]),
@ -252,6 +250,7 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
lager:info("Restarting down node ~p", [LeaderA]),
rt:start(LeaderA),
rt:wait_until_pingable(LeaderA),
rt:wait_for_service(LeaderA, [riak_kv, riak_repl]),
repl_util:start_and_wait_until_fullsync_complete(LeaderA2),
log_to_nodes(AllNodes, "Starting Joe's Repl Test"),

View File

@ -48,6 +48,9 @@ simple_test() ->
[ANodes, BNodes] = rt:build_clusters([3, 3]),
rt:wait_for_cluster_service(ANodes, riak_repl),
rt:wait_for_cluster_service(BNodes, riak_repl),
lager:info("ANodes: ~p", [ANodes]),
lager:info("BNodes: ~p", [BNodes]),

View File

@ -48,7 +48,7 @@
confirm() ->
%% Deploy a node to test against
lager:info("Deploy node to test riak-repl command line"),
[Node] = rt:deploy_nodes(1),
[Node] = rt:deploy_nodes(1, [], [riak_kv, riak_repl]),
?assertEqual(ok, rt:wait_until_nodes_ready([Node])),
rt_intercept:add(Node,
{riak_repl_console,

View File

@ -2,11 +2,6 @@
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
-import(rt, [deploy_nodes/2,
join/2,
wait_until_nodes_ready/1,
wait_until_no_pending_changes/1]).
confirm() ->
TestHash = erlang:md5(term_to_binary(os:timestamp())),
TestBucket = <<TestHash/binary, "-systest_a">>,
@ -22,7 +17,7 @@ confirm() ->
]}
],
Nodes = deploy_nodes(NumNodes, Conf),
Nodes = rt:deploy_nodes(NumNodes, Conf, [riak_kv, riak_repl]),
{[AFirst|_] = ANodes, [BFirst|_] = BNodes} = lists:split(ClusterASize, Nodes),
AllNodes = ANodes ++ BNodes,

View File

@ -2,11 +2,6 @@
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
-import(rt, [deploy_nodes/2,
join/2,
wait_until_nodes_ready/1,
wait_until_no_pending_changes/1]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% This tests fullsync scheduling in 1.4+ Advanced Replication%% intercept
@ -17,7 +12,7 @@ setup_repl_clusters(Conf, InterceptSetup) ->
NumNodes = 6,
lager:info("Deploy ~p nodes", [NumNodes]),
Nodes = deploy_nodes(NumNodes, Conf),
Nodes = rt:deploy_nodes(NumNodes, Conf, [riak_kv, riak_repl]),
InterceptSetup(Nodes),
lager:info("Nodes = ~p", [Nodes]),

View File

@ -3,10 +3,6 @@
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-import(rt, [deploy_nodes/2,
join/2,
wait_until_nodes_ready/1,
wait_until_no_pending_changes/1]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -71,6 +67,10 @@ setup_repl_clusters(Conf, SSL) ->
rt:set_advanced_conf(all, Conf),
Nodes = [ANodes, BNodes, CNodes] = rt:build_clusters([2, 2, 2]),
rt:wait_for_cluster_service(ANodes, riak_repl),
rt:wait_for_cluster_service(BNodes, riak_repl),
rt:wait_for_cluster_service(CNodes, riak_repl),
AFirst = hd(ANodes),
BFirst = hd(BNodes),
CFirst = hd(CNodes),

View File

@ -43,7 +43,7 @@ confirm() ->
]}
],
Nodes = rt:deploy_nodes(NumNodes, Conf),
Nodes = rt:deploy_nodes(NumNodes, Conf, [riak_kv, riak_repl]),
{ANodes, Rest} = lists:split(2, Nodes),
{BNodes, CNodes} = lists:split(2, Rest),

View File

@ -190,7 +190,7 @@ confirm() ->
lager:info("===testing basic connectivity"),
[Node1, Node2] = rt:deploy_nodes(2, BaseConf),
[Node1, Node2] = rt:deploy_nodes(2, BaseConf, [riak_kv, riak_repl]),
repl_util:name_cluster(Node1, "A"),
repl_util:name_cluster(Node2, "B"),
@ -264,7 +264,7 @@ confirm() ->
lager:info("Re-deploying 6 nodes"),
Nodes = rt:deploy_nodes(6, BaseConf),
Nodes = rt:deploy_nodes(6, BaseConf, [riak_kv, riak_repl]),
[rt:wait_until_pingable(N) || N <- Nodes],

View File

@ -32,7 +32,7 @@ confirm() ->
NodeConfig = [{FromVersion, Conf} || _ <- lists:seq(1, NumNodes)],
Nodes = rt:deploy_nodes(NodeConfig),
Nodes = rt:deploy_nodes(NodeConfig, [riak_kv, riak_repl]),
NodeUpgrades = case UpgradeOrder of
"forwards" ->

View File

@ -168,6 +168,9 @@ configure_clusters(AVersion, BVersion, Realtime) ->
Nodes = [ANodes, BNodes] = rt:build_clusters([3, 3]),
rt:wait_for_cluster_service(ANodes, riak_repl),
rt:wait_for_cluster_service(BNodes, riak_repl),
lager:info("ANodes: ~p", [ANodes]),
lager:info("BNodes: ~p", [BNodes]),

View File

@ -163,7 +163,7 @@ confirm() ->
lager:info("===testing basic connectivity"),
[Node1, Node2] = rt:deploy_nodes(2, BaseConf),
[Node1, Node2] = rt:deploy_nodes(2, BaseConf, [riak_kv, riak_repl]),
Listeners = replication:add_listeners([Node1]),
replication:verify_listeners(Listeners),
@ -225,7 +225,7 @@ confirm() ->
lager:info("Re-deploying 6 nodes"),
Nodes = rt:deploy_nodes(6, BaseConf),
Nodes = rt:deploy_nodes(6, BaseConf, [riak_kv, riak_repl]),
[rt:wait_until_pingable(N) || N <- Nodes],

View File

@ -43,6 +43,9 @@ fullsync_enabled_and_started() ->
[ANodes, BNodes] = rt:build_clusters([3, 3]),
rt:wait_for_cluster_service(ANodes, riak_repl),
rt:wait_for_cluster_service(BNodes, riak_repl),
AFirst = hd(ANodes),
BFirst = hd(BNodes),

View File

@ -25,7 +25,7 @@ confirm() ->
NodeConfig = [{FromVersion, Conf} || _ <- lists:seq(1, NumNodes)],
Nodes = rt:deploy_nodes(NodeConfig),
Nodes = rt:deploy_nodes(NodeConfig, [riak_kv, riak_repl]),
NodeUpgrades = case UpgradeOrder of
"forwards" ->

View File

@ -63,7 +63,7 @@ make_clusters() ->
Conf = [{riak_repl, [{fullsync_on_connect, false},
{fullsync_interval, disabled}]},
{riak_core, [{default_bucket_props, [{allow_mult, true}]}]}],
Nodes = rt:deploy_nodes(6, Conf),
Nodes = rt:deploy_nodes(6, Conf, [riak_kv, riak_repl]),
{ClusterA, ClusterB} = lists:split(3, Nodes),
A = make_cluster(ClusterA, "A"),
B = make_cluster(ClusterB, "B"),

View File

@ -92,7 +92,7 @@ make_clusters() ->
{riak_core, [{default_bucket_props,
[{dvv_enabled, true},
{allow_mult, true}]}]}],
Nodes = rt:deploy_nodes(6, Conf),
Nodes = rt:deploy_nodes(6, Conf, [riak_kv, riak_repl]),
{ClusterA, ClusterB} = lists:split(3, Nodes),
A = make_cluster(ClusterA, "A"),
B = make_cluster(ClusterB, "B"),