%% ------------------------------------------------------------------- %% %% Copyright (c) 2013 Basho Technologies, Inc. %% %% ------------------------------------------------------------------- -module(repl_rt_heartbeat). -behaviour(riak_test). -export([confirm/0]). -include_lib("eunit/include/eunit.hrl"). -define(RPC_TIMEOUT, 5000). -define(HB_TIMEOUT, 2). -define(HB_INTERVAL, 1). %% Replication Realtime Heartbeat test %% Valid for EE version 1.3.2 and up %% %% If both sides of an RT replication connection support it, a heartbeat %% message is sent from the RT Source to the RT Sink every %% {riak_repl, rt_heartbeat_interval} which default to 15s. If %% a response is not received in {riak_repl, rt_heartbeat_timeout}, also %% default to 15s then the source connection exits and will be re-established %% by the supervisor. %% %% RT Heartbeat messages are supported between EE releases 1.3.2 and up. %% %% Test: %% ----- %% Change the heartbeat_interval and heartbeat_timeout to 2 seconds, %% Start up two >1.3.2 clusters and connect them, %% Enable RT replication, %% Write some objects to the source cluster (A), %% Verify they got to the sink cluster (B), %% Verify that heartbeats are being acknowledged by the sink (B) back to source (A), %% Interupt the connection so that packets can not flow from A -> B, %% Verify that the connection is restarted after the heartbeat_timeout period, %% Verify that heartbeats are being acknowledged by the sink (B) back to source (A), %% Write some objects to the source cluster (A), %% Verify they got to the sink cluster (B), %% Have a cold beverage. %% @doc riak_test entry point confirm() -> %% Start up two >1.3.2 clusters and connect them, {LeaderA, LeaderB, ANodes, _BNodes} = make_connected_clusters(), %% load intercepts. See ../intercepts/riak_repl_rt_intercepts.erl load_intercepts(LeaderA), load_intercepts(LeaderB), %% Enable RT replication from cluster "A" to cluster "B" enable_rt(LeaderA, ANodes), %% Verify that heartbeats are being acknowledged by the sink (B) back to source (A) ?assertEqual(verify_heartbeat_messages(LeaderA), true), %% Verify RT repl of objects verify_rt(LeaderA, LeaderB), %% Cause heartbeat messages to not be delivered, but remember the current %% Pid of the RT connection. It should change after we stop heartbeats %% because the RT connection will restart if all goes well. RTConnPid1 = get_rt_conn_pid(LeaderA), lager:info("Suspending HB"), suspend_heartbeat_messages(LeaderA), %% sleep longer than the HB timeout interval to force re-connection; %% and give it time to restart the RT connection. %% Since it's possible we may disable heartbeats right after a heartbeat has been fired, %% it can take up to 2*?HB_TIMEOUT seconds to detect a missed heartbeat. The extra second %% is to avoid rare race conditions due to the timeouts lining up exactly. Not the prettiest %% solution, but it failed so rarely at 2*HB_TIMEOUT, that this should be good enough %% in practice, and it beats having to write a bunch of fancy intercepts to verify that %% the timeout has been hit internally. timer:sleep(timer:seconds(?HB_TIMEOUT*2) + 1000), %% Verify that RT connection has restarted by noting that it's Pid has changed RTConnPid2 = get_rt_conn_pid(LeaderA), ?assertNotEqual(RTConnPid1, RTConnPid2), %% Verify that heart beats are not being ack'd rt:log_to_nodes([LeaderA], "Verify suspended HB"), ?assertEqual(verify_heartbeat_messages(LeaderA), false), %% Resume heartbeat messages from source and allow some time to ack back. %% Wait one second longer than the timeout rt:log_to_nodes([LeaderA], "Resuming HB"), resume_heartbeat_messages(LeaderA), timer:sleep(timer:seconds(?HB_TIMEOUT) + 1000), %% Verify that heartbeats are being acknowledged by the sink (B) back to source (A) rt:log_to_nodes([LeaderA], "Verify resumed HB"), ?assertEqual(verify_heartbeat_messages(LeaderA), true), %% Verify RT repl of objects verify_rt(LeaderA, LeaderB), verify_hb_noresponse(LeaderA, LeaderB), pass. %% @doc Turn on Realtime replication on the cluster lead by LeaderA. %% The clusters must already have been named and connected. enable_rt(LeaderA, ANodes) -> repl_util:enable_realtime(LeaderA, "B"), rt:wait_until_ring_converged(ANodes), repl_util:start_realtime(LeaderA, "B"), rt:wait_until_ring_converged(ANodes). %% @doc Verify that RealTime replication is functioning correctly by %% writing some objects to cluster A and checking they can be %% read from cluster B. Each call creates a new bucket so that %% verification can be tested multiple times independently. verify_rt(LeaderA, LeaderB) -> TestHash = list_to_binary([io_lib:format("~2.16.0b", [X]) || <> <= erlang:md5(term_to_binary(os:timestamp()))]), TestBucket = <>, First = 101, Last = 200, %% Write some objects to the source cluster (A), lager:info("Writing ~p keys to ~p, which should RT repl to ~p", [Last-First+1, LeaderA, LeaderB]), ?assertEqual([], repl_util:do_write(LeaderA, First, Last, TestBucket, 2)), %% verify data is replicated to B lager:info("Reading ~p keys written from ~p", [Last-First+1, LeaderB]), ?assertEqual(0, repl_util:wait_for_reads(LeaderB, First, Last, TestBucket, 2)). verify_hb_noresponse(LeaderA, LeaderB) -> lager:info("Testing heartbeats with no responses, should not crash"), TestHash = list_to_binary([io_lib:format("~2.16.0b", [X]) || <> <= erlang:md5(term_to_binary(os:timestamp()))]), TestBucket = <>, First = 1, Last = 20000, %% suspend HB responses from the sink, write some data, then stop writing %% data, hb timeout should not crash the node suspend_heartbeat_responses(LeaderB), %% Write some objects to the source cluster (A), lager:info("Writing ~p keys to ~p, which should RT repl to ~p", [Last-First+1, LeaderA, LeaderB]), ?assertEqual([], repl_util:do_write(LeaderA, First, Last, TestBucket, 2)), %% verify data is replicated to B lager:info("Reading ~p keys written from ~p", [Last-First+1, LeaderB]), timer:sleep(?HB_TIMEOUT + 1000), ?assertEqual(0, repl_util:wait_for_reads(LeaderB, First, Last, TestBucket, 2)). %% @doc Connect two clusters for replication using their respective leader nodes. connect_clusters(LeaderA, LeaderB) -> {ok, {_IP, Port}} = rpc:call(LeaderB, application, get_env, [riak_core, cluster_mgr]), lager:info("connect cluster A:~p to B on port ~p", [LeaderA, Port]), repl_util:connect_cluster(LeaderA, "127.0.0.1", Port), ?assertEqual(ok, repl_util:wait_for_connection(LeaderA, "B")). %% @doc Create two clusters of 3 nodes each and connect them for replication: %% Cluster "A" -> cluster "B" make_connected_clusters() -> NumNodes = rt_config:get(num_nodes, 6), ClusterASize = rt_config:get(cluster_a_size, 3), lager:info("Deploy ~p nodes", [NumNodes]), Conf = [ {riak_repl, [ %% turn off fullsync {fullsync_on_connect, false}, {fullsync_interval, disabled}, %% override defaults for RT heartbeat so that we %% can see faults sooner and have a quicker test. {rt_heartbeat_interval, ?HB_TIMEOUT}, {rt_heartbeat_timeout, ?HB_TIMEOUT} ]} ], Nodes = rt:deploy_nodes(NumNodes, Conf, [riak_kv, riak_repl]), {ANodes, BNodes} = lists:split(ClusterASize, Nodes), lager:info("ANodes: ~p", [ANodes]), lager:info("BNodes: ~p", [BNodes]), lager:info("Build cluster A"), repl_util:make_cluster(ANodes), lager:info("Build cluster B"), repl_util:make_cluster(BNodes), %% get the leader for the first cluster lager:info("waiting for leader to converge on cluster A"), ?assertEqual(ok, repl_util:wait_until_leader_converge(ANodes)), AFirst = hd(ANodes), %% get the leader for the second cluster lager:info("waiting for leader to converge on cluster B"), ?assertEqual(ok, repl_util:wait_until_leader_converge(BNodes)), BFirst = hd(BNodes), %% Name the clusters repl_util:name_cluster(AFirst, "A"), rt:wait_until_ring_converged(ANodes), repl_util:name_cluster(BFirst, "B"), rt:wait_until_ring_converged(BNodes), %% Connect for replication connect_clusters(AFirst, BFirst), {AFirst, BFirst, ANodes, BNodes}. %% @doc Load intercepts file from ../intercepts/riak_repl2_rtsource_helper_intercepts.erl load_intercepts(Node) -> rt_intercept:load_code(Node). %% @doc Suspend heartbeats from the source node suspend_heartbeat_messages(Node) -> %% disable forwarding of the heartbeat function call lager:info("Suspend sending of heartbeats from node ~p", [Node]), rt_intercept:add(Node, {riak_repl2_rtsource_helper, [{{send_heartbeat, 1}, drop_send_heartbeat}]}). %% @doc Resume heartbeats from the sink node resume_heartbeat_messages(Node) -> %% enable forwarding of the heartbeat function call lager:info("Resume sending of heartbeats from node ~p", [Node]), rt_intercept:add(Node, {riak_repl2_rtsource_helper, [{{send_heartbeat, 1}, forward_send_heartbeat}]}). suspend_heartbeat_responses(Node) -> lager:info("Suspending sending of heartbeat responses from node ~p", [Node]), rt_intercept:add(Node, {riak_repl2_rtsink_conn, [{{send_heartbeat, 2}, drop_send_heartbeat_resp}]}). %% @doc Get the Pid of the first RT source connection on Node get_rt_conn_pid(Node) -> [{_Remote, Pid}|Rest] = rpc:call(Node, riak_repl2_rtsource_conn_sup, enabled, []), case Rest of [] -> ok; RR -> lager:info("Other connections: ~p", [RR]) end, Pid. %% @doc Verify that heartbeat messages are being ack'd from the RT sink back to source Node verify_heartbeat_messages(Node) -> lager:info("Verify heartbeats"), Pid = get_rt_conn_pid(Node), Status = rpc:call(Node, riak_repl2_rtsource_conn, status, [Pid], ?RPC_TIMEOUT), HBRTT = proplists:get_value(hb_rtt, Status), case HBRTT of undefined -> false; RTT -> is_integer(RTT) end.