riak_test/tests/replication2_fsschedule.erl

284 lines
10 KiB
Erlang
Raw Normal View History

2013-02-25 17:33:51 +00:00
-module(replication2_fsschedule).
2013-02-08 16:21:52 +00:00
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
2013-02-25 20:14:02 +00:00
-import(rt, [deploy_nodes/2,
2013-02-25 20:09:29 +00:00
join/2,
2013-02-08 16:21:52 +00:00
wait_until_nodes_ready/1,
wait_until_no_pending_changes/1]).
2013-02-25 17:12:07 +00:00
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
2013-02-25 20:09:29 +00:00
%% This tests fullsync scheduling in 1.2 and 1.3 Advanced Replication
2013-02-25 17:12:07 +00:00
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
2013-02-25 20:09:29 +00:00
setup_repl_clusters(Conf) ->
2013-02-08 16:21:52 +00:00
NumNodes = 6,
lager:info("Deploy ~p nodes", [NumNodes]),
Nodes = deploy_nodes(NumNodes, Conf),
2013-02-25 20:09:29 +00:00
2013-02-08 16:21:52 +00:00
lager:info("Nodes = ~p", [Nodes]),
{[AFirst|_] = ANodes, Rest} = lists:split(2, Nodes),
{[BFirst|_] = BNodes, [CFirst|_] = CNodes} = lists:split(2, Rest),
2013-02-25 20:09:29 +00:00
%%AllNodes = ANodes ++ BNodes ++ CNodes,
rt:log_to_nodes(Nodes, "Starting replication2_fullsync test"),
2013-02-08 16:21:52 +00:00
lager:info("ANodes: ~p", [ANodes]),
lager:info("BNodes: ~p", [BNodes]),
lager:info("CNodes: ~p", [CNodes]),
2013-02-25 20:09:29 +00:00
rt:log_to_nodes(Nodes, "Building and connecting Clusters"),
2013-02-08 16:21:52 +00:00
lager:info("Build cluster A"),
repl_util:make_cluster(ANodes),
lager:info("Build cluster B"),
repl_util:make_cluster(BNodes),
lager:info("Build cluster C"),
repl_util:make_cluster(CNodes),
repl_util:name_cluster(AFirst, "A"),
repl_util:name_cluster(BFirst, "B"),
repl_util:name_cluster(CFirst, "C"),
rt:wait_until_ring_converged(ANodes),
rt:wait_until_ring_converged(BNodes),
rt:wait_until_ring_converged(CNodes),
2013-02-25 20:09:29 +00:00
2013-04-29 21:40:59 +00:00
%% set the fullsync limits higher, so fullsyncs don't take forever
[begin
rpc:call(N, riak_repl_console, max_fssource_cluster,
[["10"]]),
rpc:call(N, riak_repl_console, max_fssource_node, [["5"]]),
rpc:call(N, riak_repl_console, max_fssink_node, [["5"]])
end || N <- [AFirst, BFirst, CFirst]],
2013-02-08 16:21:52 +00:00
%% get the leader for the first cluster
repl_util:wait_until_leader(AFirst),
LeaderA = rpc:call(AFirst, riak_core_cluster_mgr, get_leader, []),
{ok, {_IP, BPort}} = rpc:call(BFirst, application, get_env,
2013-02-25 17:12:07 +00:00
[riak_core, cluster_mgr]),
2013-02-08 16:21:52 +00:00
repl_util:connect_cluster(LeaderA, "127.0.0.1", BPort),
{ok, {_IP, CPort}} = rpc:call(CFirst, application, get_env,
2013-02-25 17:12:07 +00:00
[riak_core, cluster_mgr]),
2013-02-08 16:21:52 +00:00
repl_util:connect_cluster(LeaderA, "127.0.0.1", CPort),
?assertEqual(ok, repl_util:wait_for_connection(LeaderA, "B")),
rt:wait_until_ring_converged(ANodes),
2013-02-25 17:12:07 +00:00
?assertEqual(ok, repl_util:wait_for_connection(LeaderA, "C")),
rt:wait_until_ring_converged(ANodes),
2013-02-08 16:21:52 +00:00
%% write some data on A
?assertEqual(ok, repl_util:wait_for_connection(LeaderA, "B")),
2013-02-25 17:12:07 +00:00
?assertEqual(ok, repl_util:wait_for_connection(LeaderA, "B")),
repl_util:enable_fullsync(LeaderA, "B"),
repl_util:enable_fullsync(LeaderA, "C"),
rt:wait_until_ring_converged(ANodes),
2013-02-25 20:09:29 +00:00
{LeaderA, ANodes, BNodes, CNodes, Nodes}.
test_multiple_schedules() ->
TestHash = erlang:md5(term_to_binary(os:timestamp())),
TestBucket = <<TestHash/binary, "-systest_a">>,
Conf = [
{riak_repl,
[
{fullsync_on_connect, false},
{fullsync_interval, [{"B",1}, {"C",2}]}
]}
],
{LeaderA, _ANodes, _BNodes, _CNodes, AllNodes} =
setup_repl_clusters(Conf),
rt:log_to_nodes(AllNodes, "Test multiple fullsync schedules from A -> [B,C]"),
lager:info("Writing 500 keys to ~p", [LeaderA]),
?assertEqual([], repl_util:do_write(LeaderA, 0, 500, TestBucket, 1)),
2013-02-08 16:21:52 +00:00
Status0 = rpc:call(LeaderA, riak_repl_console, status, [quiet]),
Count = proplists:get_value(server_fullsyncs, Status0),
?assertEqual(0, Count),
2013-02-25 17:12:07 +00:00
Start = riak_core_util:moment(),
2013-02-25 17:33:51 +00:00
lager:info("Note: Waiting for fullsyncs can take several minutes"),
2013-02-25 17:12:07 +00:00
wait_until_n_bnw_fullsyncs(LeaderA, "B", 3),
Finish = riak_core_util:moment(),
Diff = Finish - Start,
Minutes = Diff / 60,
%% Why 5? 1 minute for repl to B to start, 3 fullsyncs + room for slow boxes
?assert(Minutes =< 5),
2013-02-25 20:09:29 +00:00
{_AFirst, BFirst, CFirst} = get_firsts(AllNodes),
2013-02-25 17:12:07 +00:00
%% verify data is replicated to B
lager:info("Reading 500 keys written to ~p from ~p", [LeaderA, BFirst]),
?assertEqual(0, repl_util:wait_for_reads(BFirst, 0, 500, TestBucket, 2)),
2013-02-25 20:09:29 +00:00
%% verify data is replicated to C
2013-02-25 17:12:07 +00:00
lager:info("Reading 500 keys written to ~p from ~p", [LeaderA, CFirst]),
?assertEqual(0, repl_util:wait_for_reads(CFirst, 0, 500, TestBucket, 2)),
FSCountToC = get_cluster_fullsyncs(LeaderA, "C"),
%% Why 2? 1 minute for repl to C to start, 1 fullsync
?assert(FSCountToC =< 2),
2013-02-25 20:09:29 +00:00
rt:clean_cluster(AllNodes),
pass.
test_single_schedule() ->
TestHash = erlang:md5(term_to_binary(os:timestamp())),
TestBucket = <<TestHash/binary, "-systest_a">>,
Conf = [
{riak_repl,
[
{fullsync_on_connect, false},
{fullsync_interval, 1}
]}
],
{LeaderA, _ANodes, _BNodes, _CNodes, AllNodes} =
setup_repl_clusters(Conf),
rt:log_to_nodes(AllNodes, "Test single fullsync schedule from A -> [B,C]"),
lager:info("Writing 500 keys to ~p", [LeaderA]),
?assertEqual([], repl_util:do_write(LeaderA, 0, 500, TestBucket, 1)),
Status0 = rpc:call(LeaderA, riak_repl_console, status, [quiet]),
Count = proplists:get_value(server_fullsyncs, Status0),
?assertEqual(0, Count),
Start = riak_core_util:moment(),
lager:info("Note: Waiting for fullsyncs can take several minutes"),
wait_until_n_bnw_fullsyncs(LeaderA, "B", 3),
Finish = riak_core_util:moment(),
Diff = Finish - Start,
Minutes = Diff / 60,
?assert(Minutes =< 5 andalso Minutes >= 3),
{_AFirst, BFirst, CFirst} = get_firsts(AllNodes),
%% verify data is replicated to B
lager:info("Reading 500 keys written to ~p from ~p", [LeaderA, BFirst]),
?assertEqual(0, repl_util:wait_for_reads(BFirst, 0, 500, TestBucket, 2)),
%% verify data is replicated to C
lager:info("Reading 500 keys written to ~p from ~p", [LeaderA, CFirst]),
?assertEqual(0, repl_util:wait_for_reads(CFirst, 0, 500, TestBucket, 2)),
FSCountToC = get_cluster_fullsyncs(LeaderA, "C"),
%% Why 2? 1 minute for repl to C to start, 1 fullsync
?assert(FSCountToC =< 5 andalso FSCountToC >= 3),
rt:clean_cluster(AllNodes),
pass.
test_mixed_12_13() ->
TestHash = erlang:md5(term_to_binary(os:timestamp())),
TestBucket = <<TestHash/binary, "-systest_a">>,
Conf = [
{riak_repl,
[
{fullsync_on_connect, false},
{fullsync_interval, 1}
]}
],
{LeaderA, ANodes, BNodes, CNodes, AllNodes} =
setup_repl_clusters(Conf),
{AFirst, BFirst, _CFirst} = get_firsts(AllNodes),
repl_util:wait_until_leader_converge(ANodes),
repl_util:wait_until_leader_converge(BNodes),
repl_util:wait_until_leader_converge(CNodes),
lager:info("Writing 500 keys to ~p", [LeaderA]),
?assertEqual([], repl_util:do_write(LeaderA, 0, 500, TestBucket, 1)),
lager:info("Adding repl listener to cluster A"),
ListenerArgs = [[atom_to_list(LeaderA), "127.0.0.1", "9010"]],
Res = rpc:call(LeaderA, riak_repl_console, add_listener, ListenerArgs),
?assertEqual(ok, Res),
lager:info("Adding repl site to cluster B"),
SiteArgs = ["127.0.0.1", "9010", "rtmixed"],
Res = rpc:call(BFirst, riak_repl_console, add_site, [SiteArgs]),
lager:info("Waiting until scheduled fullsync occurs. Go grab a beer, this may take awhile."),
wait_until_n_bnw_fullsyncs(LeaderA, "B", 3),
wait_until_n_bnw_fullsyncs(LeaderA, "C", 3),
2013-04-29 21:40:59 +00:00
%% 1.3 fullsyncs increment the 1.2 fullsync counter, backwards
%% compatability is a terrible thing
wait_until_12_fs_complete(LeaderA, 9),
2013-02-25 20:09:29 +00:00
Status0 = rpc:call(LeaderA, riak_repl_console, status, [quiet]),
2013-04-29 21:40:59 +00:00
Count0 = proplists:get_value(server_fullsyncs, Status0),
2013-02-25 20:09:29 +00:00
FS_B = get_cluster_fullsyncs(AFirst, "B"),
FS_C = get_cluster_fullsyncs(AFirst, "C"),
2013-04-29 21:40:59 +00:00
%% count the actual 1.2 fullsyncs
Count = Count0 - (FS_B + FS_C),
2013-02-25 20:09:29 +00:00
lager:info("1.2 Count = ~p", [Count]),
lager:info("1.3 B Count = ~p", [FS_B]),
lager:info("1.3 C Count = ~p", [FS_C]),
?assert(Count >= 3 andalso Count =< 6),
?assert(FS_B >= 3 andalso FS_B =< 6),
?assert(FS_C >= 3 andalso FS_C =< 6),
2013-02-08 16:21:52 +00:00
pass.
2013-02-25 20:09:29 +00:00
confirm() ->
AllTests = [test_multiple_schedules(), test_single_schedule(), test_mixed_12_13()],
case lists:all(fun (Result) -> Result == pass end, AllTests) of
true -> pass;
false -> sadtrombone
end.
2013-04-29 21:40:59 +00:00
wait_until_12_fs_complete(Node, N) ->
2013-02-25 20:09:29 +00:00
rt:wait_until(Node,
fun(_) ->
Status = rpc:call(Node, riak_repl_console, status, [quiet]),
case proplists:get_value(server_fullsyncs, Status) of
2013-04-29 21:40:59 +00:00
C when C >= N ->
2013-02-25 20:09:29 +00:00
true;
_ ->
false
end
end).
get_firsts(Nodes) ->
{[AFirst|_] = _ANodes, Rest} = lists:split(2, Nodes),
{[BFirst|_] = _BNodes, [CFirst|_] = _CNodes} = lists:split(2, Rest),
{AFirst, BFirst, CFirst}.
2013-02-25 17:12:07 +00:00
get_cluster_fullsyncs(Node, ClusterName) ->
Status = rpc:call(Node, riak_repl2_fscoordinator, status, []),
% let it fail if keys are missing
ClusterData = proplists:get_value(ClusterName, Status),
proplists:get_value(fullsyncs_completed, ClusterData).
2013-02-08 16:21:52 +00:00
2013-02-25 17:12:07 +00:00
wait_until_n_bnw_fullsyncs(Node, DestCluster, N) ->
2013-04-29 21:40:59 +00:00
lager:info("Waiting for fullsync count for ~p to be ~p", [DestCluster, N]),
2013-02-08 16:21:52 +00:00
Res = rt:wait_until(Node,
fun(_) ->
2013-02-25 17:12:07 +00:00
Fullsyncs = get_cluster_fullsyncs(Node, DestCluster),
case Fullsyncs of
2013-02-08 16:21:52 +00:00
C when C >= N ->
true;
2013-02-25 17:33:51 +00:00
_Other ->
%% keep this in for tracing
%%lager:info("Total fullsyncs = ~p", [Other]),
2013-04-29 21:40:59 +00:00
%% sleep a while so the default 3 minute time out
%% doesn't screw us
timer:sleep(20000),
2013-02-08 16:21:52 +00:00
false
end
end),
?assertEqual(ok, Res),
lager:info("Fullsync on ~p complete", [Node]).