diff --git a/tests/repl_rt_pending.erl b/tests/repl_rt_pending.erl
new file mode 100644
index 00000000..2c3c6346
--- /dev/null
+++ b/tests/repl_rt_pending.erl
@@ -0,0 +1,249 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2013 Basho Technologies, Inc.
+%%
+%% -------------------------------------------------------------------
+-module(repl_rt_pending).
+-behaviour(riak_test).
+-export([confirm/0]).
+-include_lib("eunit/include/eunit.hrl").
+
+-define(RPC_TIMEOUT, 5000).
+-define(HB_TIMEOUT, 2000).
+
+%% Replication Realtime Heartbeat test
+%% Valid for EE version 1.3.2 and up
+%%
+%% If both sides of an RT replication connection support it, a heartbeat
+%% message is sent from the RT Source to the RT Sink every
+%% {riak_repl, rt_heartbeat_interval}, which defaults to 15s. If
+%% a response is not received within {riak_repl, rt_heartbeat_timeout}, which
+%% also defaults to 15s, then the source connection exits and will be
+%% re-established by the supervisor.
+%%
+%% RT Heartbeat messages are supported in EE releases 1.3.2 and up.
+%%
+%% Test:
+%% -----
+%% Change the heartbeat_interval and heartbeat_timeout to 2 seconds,
+%% Start up two >1.3.2 clusters and connect them,
+%% Enable RT replication,
+%% Write some objects to the source cluster (A),
+%% Verify they got to the sink cluster (B),
+%% Verify that heartbeats are being acknowledged by the sink (B) back to source (A),
+%% Interrupt the connection so that packets cannot flow from A -> B,
+%% Verify that the connection is restarted after the heartbeat_timeout period,
+%% Verify that heartbeats are being acknowledged by the sink (B) back to source (A),
+%% Write some objects to the source cluster (A),
+%% Verify they got to the sink cluster (B),
+%% Have a cold beverage.
+
+%% @doc riak_test entry point
+confirm() ->
+    %% Start up two >1.3.2 clusters and connect them,
+    {LeaderA, LeaderB, ANodes, BNodes} = make_connected_clusters(),
+
+    %% load intercepts. See ../intercepts/riak_repl_rt_intercepts.erl
+    load_intercepts(LeaderA),
+
+    %% Enable RT replication from cluster "A" to cluster "B"
+    enable_bi_rt(LeaderA, ANodes, LeaderB, BNodes),
+
+    %% Verify that heartbeats are being acknowledged by the sink (B) back to source (A)
+    %%?assertEqual(verify_heartbeat_messages(LeaderA), true),
+
+    %% Verify RT repl of objects
+    write_n_keys(LeaderA, LeaderB, 1, 10000),
+
+    write_n_keys(LeaderB, LeaderA, 10001, 20000),
+
+    RTQStatus = rpc:call(LeaderA, riak_repl2_rtq, status, []),
+
+    Consumers = proplists:get_value(consumers, RTQStatus),
+    case proplists:get_value("B", Consumers) of
+        undefined ->
+            [];
+        Consumer ->
+            Unacked = proplists:get_value(unacked, Consumer, 0),
+            lager:info("unacked: ~p", [Unacked]),
+            ?assertEqual(0, Unacked)
+    end,
+    %% Cause heartbeat messages to not be delivered, but remember the current
+    %% Pid of the RT connection. It should change after we stop heartbeats
+    %% because the RT connection will restart if all goes well.
+    %RTConnPid1 = get_rt_conn_pid(LeaderA),
+    %lager:info("Suspending HB"),
+    %suspend_heartbeat_messages(LeaderA),
+
+    %% sleep longer than the HB timeout interval to force re-connection;
+    %% and give it time to restart the RT connection. Wait an extra 2 seconds.
+    %timer:sleep(?HB_TIMEOUT + 2000),
+
+    %% Verify that the RT connection has restarted by noting that its Pid has changed
+    %RTConnPid2 = get_rt_conn_pid(LeaderA),
+    %?assertNotEqual(RTConnPid1, RTConnPid2),
+
+    %% Verify that heartbeats are not being ack'd
+    %rt:log_to_nodes([LeaderA], "Verify suspended HB"),
+    %?assertEqual(verify_heartbeat_messages(LeaderA), false),
+
+    %% Resume heartbeat messages from source and allow some time to ack back.
+    %% Wait one second longer than the timeout
+    %rt:log_to_nodes([LeaderA], "Resuming HB"),
+    %resume_heartbeat_messages(LeaderA),
+    %timer:sleep(?HB_TIMEOUT + 1000),
+
+    %% Verify that heartbeats are being acknowledged by the sink (B) back to source (A)
+    %rt:log_to_nodes([LeaderA], "Verify resumed HB"),
+    %?assertEqual(verify_heartbeat_messages(LeaderA), true),
+
+    %% Verify RT repl of objects
+    %verify_rt(LeaderA, LeaderB),
+
+    pass.
+
+%% @doc Turn on bidirectional Realtime replication between the clusters led by
+%% LeaderA and LeaderB. The clusters must already have been named and connected.
+enable_bi_rt(LeaderA, ANodes, LeaderB, BNodes) ->
+    repl_util:enable_realtime(LeaderA, "B"),
+    rt:wait_until_ring_converged(ANodes),
+
+    repl_util:start_realtime(LeaderA, "B"),
+    rt:wait_until_ring_converged(ANodes),
+
+    repl_util:enable_realtime(LeaderB, "A"),
+    rt:wait_until_ring_converged(BNodes),
+
+    repl_util:start_realtime(LeaderB, "A"),
+    rt:wait_until_ring_converged(BNodes).
+
+
+%% @doc Verify that RealTime replication is functioning correctly by
+%% writing some objects to cluster A and checking they can be
+%% read from cluster B. Each call creates a new bucket so that
+%% verification can be tested multiple times independently.
+write_n_keys(Source, Destination, M, N) ->
+    TestHash = list_to_binary([io_lib:format("~2.16.0b", [X]) ||
+        <<X>> <= erlang:md5(term_to_binary(os:timestamp()))]),
+    TestBucket = <<TestHash/binary>>,
+    First = M,
+    Last = N,
+
+    %% Write some objects to the source cluster (A),
+    lager:info("Writing ~p keys to ~p, which should RT repl to ~p",
+               [Last-First+1, Source, Destination]),
+    ?assertEqual([], repl_util:do_write(Source, First, Last, TestBucket, 2)),
+
+    %% verify data is replicated to B
+    lager:info("Reading ~p keys written from ~p", [Last-First+1, Destination]),
+    ?assertEqual(0, repl_util:wait_for_reads(Destination, First, Last, TestBucket, 2)).
+
+%% @doc Connect two clusters for replication using their respective leader nodes.
+connect_clusters(LeaderA, LeaderB) ->
+    {ok, {_IP, PortB}} = rpc:call(LeaderB, application, get_env,
+                                  [riak_core, cluster_mgr]),
+    lager:info("connect cluster A:~p to B on port ~p", [LeaderA, PortB]),
+    repl_util:connect_cluster(LeaderA, "127.0.0.1", PortB),
+    ?assertEqual(ok, repl_util:wait_for_connection(LeaderA, "B")),
+
+    {ok, {_IP, PortA}} = rpc:call(LeaderA, application, get_env,
+                                  [riak_core, cluster_mgr]),
+    lager:info("connect cluster B:~p to A on port ~p", [LeaderB, PortA]),
+    repl_util:connect_cluster(LeaderB, "127.0.0.1", PortA),
+    ?assertEqual(ok, repl_util:wait_for_connection(LeaderB, "A")).
+
+
+
+%% @doc Create two clusters of 3 nodes each and connect them for replication:
+%% Cluster "A" -> cluster "B"
+make_connected_clusters() ->
+    NumNodes = rt_config:get(num_nodes, 6),
+    ClusterASize = rt_config:get(cluster_a_size, 3),
+
+    lager:info("Deploy ~p nodes", [NumNodes]),
+    Conf = [
+            {riak_repl,
+             [
+              %% turn off fullsync
+              {fullsync_on_connect, false},
+              {fullsync_interval, disabled},
+              %% override defaults for RT heartbeat so that we
+              %% can see faults sooner and have a quicker test.
+              {rt_heartbeat_interval, ?HB_TIMEOUT},
+              {rt_heartbeat_timeout, ?HB_TIMEOUT}
+             ]}
+           ],
+
+    Nodes = rt:deploy_nodes(NumNodes, Conf),
+
+    {ANodes, BNodes} = lists:split(ClusterASize, Nodes),
+    lager:info("ANodes: ~p", [ANodes]),
+    lager:info("BNodes: ~p", [BNodes]),
+
+    lager:info("Build cluster A"),
+    repl_util:make_cluster(ANodes),
+
+    lager:info("Build cluster B"),
+    repl_util:make_cluster(BNodes),
+
+    %% get the leader for the first cluster
+    lager:info("waiting for leader to converge on cluster A"),
+    ?assertEqual(ok, repl_util:wait_until_leader_converge(ANodes)),
+    AFirst = hd(ANodes),
+
+    %% get the leader for the second cluster
+    lager:info("waiting for leader to converge on cluster B"),
+    ?assertEqual(ok, repl_util:wait_until_leader_converge(BNodes)),
+    BFirst = hd(BNodes),
+
+    %% Name the clusters
+    repl_util:name_cluster(AFirst, "A"),
+    rt:wait_until_ring_converged(ANodes),
+
+    repl_util:name_cluster(BFirst, "B"),
+    rt:wait_until_ring_converged(BNodes),
+
+    %% Connect for replication
+    connect_clusters(AFirst, BFirst),
+
+    {AFirst, BFirst, ANodes, BNodes}.
+
+%% @doc Load intercepts file from ../intercepts/riak_repl2_rtsource_helper_intercepts.erl
+load_intercepts(Node) ->
+    rt_intercept:load_code(Node).
+
+%% @doc Suspend heartbeats from the source node
+%suspend_heartbeat_messages(Node) ->
+    %% disable forwarding of the heartbeat function call
+%    lager:info("Suspend sending of heartbeats from node ~p", [Node]),
+%    rt_intercept:add(Node, {riak_repl2_rtsource_helper,
+%                            [{{send_heartbeat, 1}, drop_send_heartbeat}]}).
+
+%% @doc Resume heartbeats from the source node
+%resume_heartbeat_messages(Node) ->
+    %% enable forwarding of the heartbeat function call
+%    lager:info("Resume sending of heartbeats from node ~p", [Node]),
+%    rt_intercept:add(Node, {riak_repl2_rtsource_helper,
+%                            [{{send_heartbeat, 1}, forward_send_heartbeat}]}).
+
+%% @doc Get the Pid of the first RT source connection on Node
+%get_rt_conn_pid(Node) ->
+%    [{_Remote, Pid}|Rest] = rpc:call(Node, riak_repl2_rtsource_conn_sup, enabled, []),
+%    case Rest of
+%        [] -> ok;
+%        RR -> lager:info("Other connections: ~p", [RR])
+%    end,
+%    Pid.
+
+%% @doc Verify that heartbeat messages are being ack'd from the RT sink back to source Node
+%verify_heartbeat_messages(Node) ->
+%    lager:info("Verify heartbeats"),
+%    Pid = get_rt_conn_pid(Node),
+%    Status = rpc:call(Node, riak_repl2_rtsource_conn, status, [Pid], ?RPC_TIMEOUT),
+%    HBRTT = proplists:get_value(hb_rtt, Status),
+%    case HBRTT of
+%        undefined ->
+%            false;
+%        RTT ->
+%            is_integer(RTT)
+%    end.
diff --git a/tests/rt_cascading.erl b/tests/rt_cascading.erl
index ee9878c1..63abe2b2 100644
--- a/tests/rt_cascading.erl
+++ b/tests/rt_cascading.erl
@@ -829,11 +829,168 @@ ensure_ack_test_() ->
         end}
     ] end}}.
-
+
+ensure_unacked_and_queue() ->
+    eunit(ensure_unacked_and_queue_test_()).
+
+ensure_unacked_and_queue_test_() ->
+    {timeout, timeout(2300), {setup, fun() ->
+        Nodes = rt:deploy_nodes(6, conf()),
+        {N123, N456} = lists:split(3, Nodes),
+        repl_util:make_cluster(N123),
+        repl_util:make_cluster(N456),
+        repl_util:wait_until_leader_converge(N123),
+        repl_util:wait_until_leader_converge(N456),
+        repl_util:name_cluster(hd(N123), "n123"),
+        repl_util:name_cluster(hd(N456), "n456"),
+        N456Port = get_cluster_mgr_port(hd(N456)),
+        connect_rt(hd(N123), N456Port, "n456"),
+        N123Port = get_cluster_mgr_port(hd(N123)),
+        connect_rt(hd(N456), N123Port, "n123"),
+        {N123, N456}
+    end,
+    maybe_skip_teardown(fun({N123, N456}) ->
+        rt:clean_cluster(N123),
+        rt:clean_cluster(N456)
+    end),
+    fun({N123, N456}) -> [
+
+        {"unacked does not increase when there are skips", timeout, timeout(100), fun() ->
+            N123Leader = hd(N123),
+            N456Leader = hd(N456),
+
+            write_n_keys(N123Leader, N456Leader, 1, 10000),
+
+            write_n_keys(N456Leader, N123Leader, 10001, 20000),
+
+            Res = rt:wait_until(fun() ->
+                RTQStatus = rpc:call(N123Leader, riak_repl2_rtq, status, []),
+
+                Consumers = proplists:get_value(consumers, RTQStatus),
+                Data = proplists:get_value("n456", Consumers),
+                Unacked = proplists:get_value(unacked, Data),
+                ?debugFmt("unacked: ~p", [Unacked]),
+                0 == Unacked
+            end),
+            ?assertEqual(ok, Res)
+        end},
+
+        {"after acks, queues are empty", fun() ->
+            Nodes = N123 ++ N456,
+            Got = lists:map(fun(Node) ->
+                rpc:call(Node, riak_repl2_rtq, all_queues_empty, [])
+            end, Nodes),
+            Expected = [true || _ <- lists:seq(1, length(Nodes))],
+            ?assertEqual(Expected, Got)
+        end},
+
+        {"after acks, queues truly are empty. Truly", fun() ->
+            Nodes = N123 ++ N456,
+            Gots = lists:map(fun(Node) ->
+                {Node, rpc:call(Node, riak_repl2_rtq, dumpq, [])}
+            end, Nodes),
+            lists:map(fun({Node, Got}) ->
+                ?debugFmt("Checking data from ~p", [Node]),
+                ?assertEqual([], Got)
+            end, Gots)
+        end},
+
+        {"dual loads keeps unacked satisfied", timeout, timeout(100), fun() ->
+            N123Leader = hd(N123),
+            N456Leader = hd(N456),
+            LoadN123Pid = spawn(fun() ->
+                {Time, Val} = timer:tc(fun write_n_keys/4, [N123Leader, N456Leader, 20001, 30000]),
+                ?debugFmt("loading 123 to 456 took ~p to get ~p", [Time, Val]),
+                Val
+            end),
+            LoadN456Pid = spawn(fun() ->
+                {Time, Val} = timer:tc(fun write_n_keys/4, [N456Leader, N123Leader, 30001, 40000]),
+                ?debugFmt("loading 456 to 123 took ~p to get ~p", [Time, Val]),
+                Val
+            end),
+            Exits = wait_exit([LoadN123Pid, LoadN456Pid], infinity),
+            ?assert(lists:all(fun(E) -> E == normal end, Exits)),
+
+            StatusDig = fun(SinkName, Node) ->
+                Status = rpc:call(Node, riak_repl2_rtq, status, []),
+                Consumers = proplists:get_value(consumers, Status, []),
+                ConsumerStats = proplists:get_value(SinkName, Consumers, []),
+                proplists:get_value(unacked, ConsumerStats)
+            end,
+
+            N123UnackedRes = rt:wait_until(fun() ->
+                Unacked = StatusDig("n456", N123Leader),
+                ?debugFmt("Unacked: ~p", [Unacked]),
+                0 == Unacked
+            end),
+            ?assertEqual(ok, N123UnackedRes),
+
+            N456Unacked = StatusDig("n123", N456Leader),
+            case N456Unacked of
+                0 ->
+                    ?assert(true);
+                _ ->
+                    N456Unacked2 = StatusDig("n123", N456Leader),
+                    ?debugFmt("Not 0, are they at least decreasing?~n"
+                        "    ~p, ~p", [N456Unacked2, N456Unacked]),
+                    ?assert(N456Unacked2 < N456Unacked)
+            end
+        end},
+
+        {"after dual load acks, queues are empty", fun() ->
+            Nodes = N123 ++ N456,
+            Got = lists:map(fun(Node) ->
+                rpc:call(Node, riak_repl2_rtq, all_queues_empty, [])
+            end, Nodes),
+            Expected = [true || _ <- lists:seq(1, length(Nodes))],
+            ?assertEqual(Expected, Got)
+        end},
+
+ {"after dual load acks, queues truly are empty. Truly", fun() -> + Nodes = N123 ++ N456, + Gots = lists:map(fun(Node) -> + {Node, rpc:call(Node, riak_repl2_rtq, dumpq, [])} + end, Nodes), + lists:map(fun({Node, Got}) -> + ?debugFmt("Checking data from ~p", [Node]), + ?assertEqual([], Got) + end, Gots) + end}, + + {"no negative pendings", fun() -> + Nodes = N123 ++ N456, + GetPending = fun({sink_stats, SinkStats}) -> + ConnTo = proplists:get_value(rt_sink_connected_to, SinkStats), + proplists:get_value(pending, ConnTo) + end, + lists:map(fun(Node) -> + ?debugFmt("Checking node ~p", [Node]), + Status = rpc:call(Node, riak_repl_console, status, [quiet]), + Sinks = proplists:get_value(sinks, Status), + lists:map(fun(SStats) -> + Pending = GetPending(SStats), + ?assertEqual(0, Pending) + end, Sinks) + end, Nodes) + end} + + ] end}}. + %% ===== %% utility functions for teh happy %% ==== +wait_exit(Pids, Timeout) -> + Mons = [{erlang:monitor(process, Pid), Pid} || Pid <- Pids], + lists:map(fun({Mon, Pid}) -> + receive + {'DOWN', Mon, process, Pid, Cause} -> + Cause + after Timeout -> + timeout + end + end, Mons). + conf() -> [{lager, [ {handlers, [ @@ -854,7 +1011,8 @@ conf() -> {riak_repl, [ {fullsync_on_connect, false}, {fullsync_interval, disabled}, - {diff_batch_size, 10} + {diff_batch_size, 10}, + {rt_heartbeat_interval, undefined} ]}]. get_cluster_mgr_port(Node) -> @@ -929,6 +1087,22 @@ wait_for_rt_started(Node, ToName) -> end, rt:wait_until(Node, Fun). +write_n_keys(Source, Destination, M, N) -> + TestHash = list_to_binary([io_lib:format("~2.16.0b", [X]) || + <> <= erlang:md5(term_to_binary(os:timestamp()))]), + TestBucket = <>, + First = M, + Last = N, + + %% Write some objects to the source cluster (A), + lager:info("Writing ~p keys to ~p, which should RT repl to ~p", + [Last-First+1, Source, Destination]), + ?assertEqual([], repl_util:do_write(Source, First, Last, TestBucket, 2)), + + %% verify data is replicated to B + lager:info("Reading ~p keys written from ~p", [Last-First+1, Destination]), + ?assertEqual(0, repl_util:wait_for_reads(Destination, First, Last, TestBucket, 2)). + timeout(MultiplyBy) -> case rt_config:get(default_timeout, 1000) of infinity -> @@ -937,3 +1111,22 @@ timeout(MultiplyBy) -> N * MultiplyBy end. +eunit(TestDef) -> + case eunit:test(TestDef, [verbose]) of + ok -> + pass; + error -> + exit(error), + fail + end. + +maybe_skip_teardown(TearDownFun) -> + fun(Arg) -> + case rt_config:config_or_os_env(skip_teardowns, undefined) of + undefined -> + TearDownFun(Arg); + _ -> + ok + end + end. +