make repl_aae_fullsync work in rtcloud

This commit is contained in:
Mikael Lixenstrand 2016-02-03 14:39:32 +01:00 committed by Nick Marino
parent 7e2b3da389
commit f765417fe2
2 changed files with 42 additions and 39 deletions

View File

@ -6,6 +6,7 @@
-module(repl_aae_fullsync). -module(repl_aae_fullsync).
-behavior(riak_test). -behavior(riak_test).
-compile([export_all]).
-export([confirm/0]). -export([confirm/0]).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
@ -28,7 +29,7 @@
[ [
%% Specify fast building of AAE trees %% Specify fast building of AAE trees
{sweep_tick, 10000}, {sweep_tick, 10000},
{anti_entropy, {on, []}}, {anti_entropy, {on, [debug]}},
{anti_entropy_build_limit, {100, 1000}}, {anti_entropy_build_limit, {100, 1000}},
{anti_entropy_concurrency, 100} {anti_entropy_concurrency, 100}
] ]
@ -86,10 +87,10 @@ simple_test() ->
LeaderB = get_leader(BFirst), LeaderB = get_leader(BFirst),
lager:info("Finding connection manager ports."), lager:info("Finding connection manager ports."),
BPort = get_port(LeaderB), Endpint = repl_util:get_endpoint(LeaderB),
lager:info("Connecting cluster A to B"), lager:info("Connecting cluster A to B"),
connect_cluster(LeaderA, BPort, "B"), connect_cluster(LeaderA, Endpint, "B"),
%% Write keys prior to fullsync. %% Write keys prior to fullsync.
write_to_cluster(AFirst, 1, ?NUM_KEYS), write_to_cluster(AFirst, 1, ?NUM_KEYS),
@ -167,17 +168,17 @@ dual_test() ->
LeaderC = get_leader(CFirst), LeaderC = get_leader(CFirst),
lager:info("Finding connection manager ports."), lager:info("Finding connection manager ports."),
APort = get_port(LeaderA), AEndpint = repl_util:get_endpoint(LeaderA),
BPort = get_port(LeaderB), BEndpint = repl_util:get_endpoint(LeaderB),
CPort = get_port(LeaderC), CEndpint = repl_util:get_endpoint(LeaderC),
lager:info("Connecting all clusters into fully connected topology."), lager:info("Connecting all clusters into fully connected topology."),
connect_cluster(LeaderA, BPort, "B"), connect_cluster(LeaderA, BEndpint, "B"),
connect_cluster(LeaderA, CPort, "C"), connect_cluster(LeaderA, CEndpint, "C"),
connect_cluster(LeaderB, APort, "A"), connect_cluster(LeaderB, AEndpint, "A"),
connect_cluster(LeaderB, CPort, "C"), connect_cluster(LeaderB, CEndpint, "C"),
connect_cluster(LeaderC, APort, "A"), connect_cluster(LeaderC, AEndpint, "A"),
connect_cluster(LeaderC, BPort, "B"), connect_cluster(LeaderC, BEndpint, "B"),
%% Write keys to cluster A, verify B and C do not have them. %% Write keys to cluster A, verify B and C do not have them.
write_to_cluster(AFirst, 1, ?NUM_KEYS), write_to_cluster(AFirst, 1, ?NUM_KEYS),
@ -262,14 +263,14 @@ bidirectional_test() ->
LeaderB = get_leader(BFirst), LeaderB = get_leader(BFirst),
lager:info("Finding connection manager ports."), lager:info("Finding connection manager ports."),
APort = get_port(LeaderA), AEndpint = repl_util:get_endpoint(LeaderA),
BPort = get_port(LeaderB), BEndpint = repl_util:get_endpoint(LeaderB),
lager:info("Connecting cluster A to B"), lager:info("Connecting cluster A to B"),
connect_cluster(LeaderA, BPort, "B"), connect_cluster(LeaderA, BEndpint, "B"),
lager:info("Connecting cluster B to A"), lager:info("Connecting cluster B to A"),
connect_cluster(LeaderB, APort, "A"), connect_cluster(LeaderB, AEndpint, "A"),
%% Write keys to cluster A, verify B does not have them. %% Write keys to cluster A, verify B does not have them.
write_to_cluster(AFirst, 1, ?NUM_KEYS), write_to_cluster(AFirst, 1, ?NUM_KEYS),
@ -347,10 +348,10 @@ difference_test() ->
LeaderB = get_leader(BFirst), LeaderB = get_leader(BFirst),
lager:info("Finding connection manager ports."), lager:info("Finding connection manager ports."),
BPort = get_port(LeaderB), BEndpint = repl_util:get_endpoint(LeaderB),
lager:info("Connecting cluster A to B"), lager:info("Connecting cluster A to B"),
connect_cluster(LeaderA, BPort, "B"), connect_cluster(LeaderA, BEndpint, "B"),
%% Get PBC connections. %% Get PBC connections.
APBC = rt:pbc(LeaderA), APBC = rt:pbc(LeaderA),
@ -454,10 +455,10 @@ deadlock_test() ->
LeaderB = get_leader(BFirst), LeaderB = get_leader(BFirst),
lager:info("Finding connection manager ports."), lager:info("Finding connection manager ports."),
BPort = get_port(LeaderB), Endpint = repl_util:get_endpoint(LeaderB),
lager:info("Connecting cluster A to B"), lager:info("Connecting cluster A to B"),
connect_cluster(LeaderA, BPort, "B"), connect_cluster(LeaderA, Endpint, "B"),
%% Add intercept for delayed comparison of hashtrees. %% Add intercept for delayed comparison of hashtrees.
Intercept = {riak_kv_index_hashtree, [{{compare, 4}, delayed_compare}]}, Intercept = {riak_kv_index_hashtree, [{{compare, 4}, delayed_compare}]},
@ -614,25 +615,16 @@ validate_intercepted_fullsync(InterceptTarget,
%% Wait until AAE trees are compueted on the rebooted node. %% Wait until AAE trees are compueted on the rebooted node.
rt:wait_until_aae_trees_built([InterceptTarget]). rt:wait_until_aae_trees_built([InterceptTarget]).
%% @doc Given a node, find the port that the cluster manager is
%% listening on.
get_port(Node) ->
{ok, {_IP, Port}} = rpc:call(Node,
application,
get_env,
[riak_core, cluster_mgr]),
Port.
%% @doc Given a node, find out who the current replication leader in its %% @doc Given a node, find out who the current replication leader in its
%% cluster is. %% cluster is.
get_leader(Node) -> get_leader(Node) ->
rpc:call(Node, riak_core_cluster_mgr, get_leader, []). rpc:call(Node, riak_core_cluster_mgr, get_leader, []).
%% @doc Connect two clusters using a given name. %% @doc Connect two clusters using a given name.
connect_cluster(Source, Port, Name) -> connect_cluster(Source, {Ip, Port}, Name) ->
lager:info("Connecting ~p to ~p for cluster ~p.", lager:info("Connecting ~p to ~p:~p for cluster ~p.",
[Source, Port, Name]), [Source, Ip, Port, Name]),
repl_util:connect_cluster(Source, "127.0.0.1", Port), repl_util:connect_cluster(Source, Ip, Port),
?assertEqual(ok, repl_util:wait_for_connection(Source, Name)). ?assertEqual(ok, repl_util:wait_for_connection(Source, Name)).
%% @doc Write a series of keys and ensure they are all written. %% @doc Write a series of keys and ensure they are all written.

View File

@ -20,6 +20,7 @@
start_and_wait_until_fullsync_complete/2, start_and_wait_until_fullsync_complete/2,
start_and_wait_until_fullsync_complete/3, start_and_wait_until_fullsync_complete/3,
start_and_wait_until_fullsync_complete/4, start_and_wait_until_fullsync_complete/4,
connect_cluster/2,
connect_cluster/3, connect_cluster/3,
disconnect_cluster/2, disconnect_cluster/2,
wait_for_connection/2, wait_for_connection/2,
@ -43,6 +44,7 @@
connect_cluster_by_name/3, connect_cluster_by_name/3,
connect_cluster_by_name/4, connect_cluster_by_name/4,
get_port/1, get_port/1,
get_endpoint/1,
get_leader/1, get_leader/1,
write_to_cluster/4, write_to_cluster/4,
write_to_cluster/5, write_to_cluster/5,
@ -304,6 +306,9 @@ make_fullsync_wait_fun2([Node|Tail], Count) when is_atom(Node) ->
end end
end. end.
connect_cluster(Node, {IP, Port}) ->
connect_cluster(Node, IP, Port).
connect_cluster(Node, IP, Port) -> connect_cluster(Node, IP, Port) ->
Res = rpc:call(Node, riak_repl_console, connect, Res = rpc:call(Node, riak_repl_console, connect,
[[IP, integer_to_list(Port)]]), [[IP, integer_to_list(Port)]]),
@ -540,19 +545,25 @@ connect_cluster_by_name(Source, Port, Name) ->
%% @doc Connect two clusters using a given name. %% @doc Connect two clusters using a given name.
connect_cluster_by_name(Source, Destination, Port, Name) -> connect_cluster_by_name(Source, Destination, Port, Name) ->
lager:info("Connecting ~p to ~p for cluster ~p.", lager:info("Connecting ~p to ~p:~p for cluster ~p.",
[Source, Port, Name]), [Source, Destination, Port, Name]),
repl_util:connect_cluster(Source, Destination, Port), repl_util:connect_cluster(Source, Destination, Port),
?assertEqual(ok, repl_util:wait_for_connection(Source, Name)). ?assertEqual(ok, repl_util:wait_for_connection(Source, Name)).
%% @doc Given a node, find the port that the cluster manager is %% @doc Given a node, find the port that the cluster manager is
%% listening on. %% listening on.
get_port(Node) -> get_port(Node) ->
{ok, {_IP, Port}} = rpc:call(Node, {_IP, Port} = get_endpoint(Node),
Port.
%% @doc Given a node, find the port that the cluster manager is
%% listening on.
get_endpoint(Node) ->
{ok, {IP, Port}} = rpc:call(Node,
application, application,
get_env, get_env,
[riak_core, cluster_mgr]), [riak_core, cluster_mgr]),
Port. {IP, Port}.
%% @doc Given a node, find out who the current replication leader in its %% @doc Given a node, find out who the current replication leader in its
%% cluster is. %% cluster is.