2013-12-04 19:35:46 +00:00
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
%%
|
|
|
|
%% Copyright (c) 2013 Basho Technologies, Inc.
|
|
|
|
%%
|
|
|
|
%% This file is provided to you under the Apache License,
|
|
|
|
%% Version 2.0 (the "License"); you may not use this file
|
|
|
|
%% except in compliance with the License. You may obtain
|
|
|
|
%% a copy of the License at
|
|
|
|
%%
|
|
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
%%
|
|
|
|
%% Unless required by applicable law or agreed to in writing,
|
|
|
|
%% software distributed under the License is distributed on an
|
|
|
|
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
|
|
%% KIND, either express or implied. See the License for the
|
|
|
|
%% specific language governing permissions and limitations
|
|
|
|
%% under the License.
|
|
|
|
%%
|
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
%%
|
|
|
|
|
|
|
|
-module(replication2_rt_sink_connection).
|
|
|
|
-behaviour(riak_test).
|
|
|
|
-export([confirm/0]).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
|
|
|
|
-define(HB_TIMEOUT, 2000).
|
|
|
|
|
|
|
|
confirm() ->
|
|
|
|
NumNodes = rt_config:get(num_nodes, 6),
|
|
|
|
|
|
|
|
lager:info("Deploy ~p nodes", [NumNodes]),
|
|
|
|
Conf = [
|
|
|
|
{riak_repl,
|
|
|
|
[
|
|
|
|
%% turn off fullsync
|
|
|
|
{fullsync_on_connect, false},
|
|
|
|
{fullsync_interval, disabled},
|
|
|
|
%% override defaults for RT heartbeat so that we
|
|
|
|
%% can see faults sooner and have a quicker test.
|
|
|
|
{rt_heartbeat_interval, ?HB_TIMEOUT},
|
|
|
|
{rt_heartbeat_timeout, ?HB_TIMEOUT}
|
|
|
|
]}
|
|
|
|
],
|
|
|
|
|
2014-12-18 21:07:00 +00:00
|
|
|
Nodes = rt:deploy_nodes(NumNodes, Conf, [riak_kv, riak_repl]),
|
2013-12-04 19:35:46 +00:00
|
|
|
{ANodes, Rest} = lists:split(2, Nodes),
|
|
|
|
{BNodes, CNodes} = lists:split(2, Rest),
|
|
|
|
|
|
|
|
lager:info("Loading intercepts."),
|
|
|
|
CNode = hd(CNodes),
|
|
|
|
rt_intercept:load_code(CNode),
|
|
|
|
rt_intercept:add(CNode, {riak_repl_ring_handler,
|
|
|
|
[{{handle_event, 2}, slow_handle_event}]}),
|
|
|
|
|
|
|
|
lager:info("ANodes: ~p", [ANodes]),
|
|
|
|
lager:info("BNodes: ~p", [BNodes]),
|
|
|
|
lager:info("CNodes: ~p", [CNodes]),
|
|
|
|
|
|
|
|
lager:info("Build cluster A"),
|
|
|
|
repl_util:make_cluster(ANodes),
|
|
|
|
|
|
|
|
lager:info("Build cluster B"),
|
|
|
|
repl_util:make_cluster(BNodes),
|
|
|
|
|
|
|
|
% lager:info("Waiting for cluster A to converge"),
|
|
|
|
% rt:wait_until_ring_converged(ANodes),
|
|
|
|
|
|
|
|
% lager:info("Waiting for cluster B to converge"),
|
|
|
|
% rt:wait_until_ring_converged(BNodes),
|
|
|
|
|
|
|
|
lager:info("waiting for leader to converge on cluster A"),
|
|
|
|
?assertEqual(ok, repl_util:wait_until_leader_converge(ANodes)),
|
|
|
|
AFirst = hd(ANodes),
|
|
|
|
|
|
|
|
lager:info("waiting for leader to converge on cluster B"),
|
|
|
|
?assertEqual(ok, repl_util:wait_until_leader_converge(BNodes)),
|
|
|
|
BFirst = hd(BNodes),
|
|
|
|
|
|
|
|
lager:info("Naming A"),
|
|
|
|
repl_util:name_cluster(AFirst, "A"),
|
|
|
|
|
|
|
|
lager:info("Naming B"),
|
|
|
|
repl_util:name_cluster(BFirst, "B"),
|
|
|
|
|
|
|
|
connect_clusters(AFirst, BFirst),
|
|
|
|
|
|
|
|
enable_rt(AFirst, ANodes),
|
|
|
|
|
|
|
|
lager:info("Adding 4th node to the A cluster"),
|
|
|
|
rt:join(CNode, AFirst),
|
|
|
|
|
|
|
|
[verify_connectivity(Node) || Node <- ANodes],
|
|
|
|
|
|
|
|
verify_connectivity(CNode),
|
|
|
|
|
|
|
|
pass.
|
|
|
|
|
|
|
|
%% @doc Verify connectivity between sources and sink.
|
|
|
|
verify_connectivity(Node) ->
|
|
|
|
rt:wait_until(Node, fun(N) ->
|
|
|
|
{ok, Connections} = rpc:call(N,
|
|
|
|
riak_core_cluster_mgr,
|
|
|
|
get_connections,
|
|
|
|
[]),
|
|
|
|
lager:info("Waiting for sink connections on ~p: ~p.",
|
|
|
|
[Node, Connections]),
|
|
|
|
Connections =/= []
|
|
|
|
end).
|
|
|
|
|
|
|
|
%% @doc Connect two clusters for replication using their respective leader nodes.
|
|
|
|
connect_clusters(LeaderA, LeaderB) ->
|
|
|
|
{ok, {_IP, Port}} = rpc:call(LeaderB, application, get_env,
|
|
|
|
[riak_core, cluster_mgr]),
|
|
|
|
lager:info("Connect cluster A:~p to B on port ~p", [LeaderA, Port]),
|
|
|
|
repl_util:connect_cluster(LeaderA, "127.0.0.1", Port).
|
|
|
|
|
|
|
|
%% @doc Turn on Realtime replication on the cluster lead by LeaderA.
|
|
|
|
%% The clusters must already have been named and connected.
|
|
|
|
enable_rt(LeaderA, ANodes) ->
|
|
|
|
lager:info("Enabling RT replication: ~p ~p.", [LeaderA, ANodes]),
|
|
|
|
repl_util:enable_realtime(LeaderA, "B"),
|
|
|
|
repl_util:start_realtime(LeaderA, "B").
|