diff --git a/src/rt.erl b/src/rt.erl index 763cee3d..444fbdeb 100644 --- a/src/rt.erl +++ b/src/rt.erl @@ -40,7 +40,6 @@ capability/2, capability/3, check_singleton_node/1, - check_fullsync/3, check_ibrowse/0, claimant_according_to/1, clean_cluster/1, @@ -86,12 +85,8 @@ pmap/2, post_result/2, priv_dir/0, - read_from_cluster/5, remove/2, riak/2, - repl_get_leader/1, - repl_get_port/1, - repl_connect_cluster/3, rpc_get_env/2, set_backend/1, set_backend/2, @@ -120,7 +115,6 @@ update_app_config/2, upgrade/2, upgrade/3, - validate_completed_fullsync/6, versions/0, wait_for_cluster_service/2, wait_for_cmd/1, @@ -148,8 +142,7 @@ wait_until_transfers_complete/1, wait_until_unpingable/1, wait_until_bucket_type_status/3, - whats_up/0, - write_to_cluster/4 + whats_up/0 ]). -define(HARNESS, (rt_config:get(rt_harness))). @@ -1382,83 +1375,3 @@ wait_for_control(Vsn, Node) when is_atom(Node) -> %% @doc Wait for Riak Control to start on a series of nodes. wait_for_control(VersionedNodes) when is_list(VersionedNodes) -> [wait_for_control(Vsn, Node) || {Vsn, Node} <- VersionedNodes]. - -%% @doc Connect two clusters using a given name. -repl_connect_cluster(Source, Port, Name) -> - lager:info("Connecting ~p to ~p for cluster ~p.", - [Source, Port, Name]), - repl_util:connect_cluster(Source, "127.0.0.1", Port), - ?assertEqual(ok, repl_util:wait_for_connection(Source, Name)). - -%% @doc Given a node, find the port that the cluster manager is -%% listening on. -repl_get_port(Node) -> - {ok, {_IP, Port}} = rpc:call(Node, - application, - get_env, - [riak_core, cluster_mgr]), - Port. - -%% @doc Given a node, find out who the current replication leader in its -%% cluster is. -repl_get_leader(Node) -> - rpc:call(Node, riak_core_cluster_mgr, get_leader, []). - -%% @doc Validate fullsync completed and all keys are available. -validate_completed_fullsync(ReplicationLeader, - DestinationNode, - DestinationCluster, - Start, - End, - Bucket) -> - ok = check_fullsync(ReplicationLeader, DestinationCluster, 0), - lager:info("Verify: Reading ~p keys repl'd from A(~p) to ~p(~p)", - [End - Start, ReplicationLeader, - DestinationCluster, DestinationNode]), - ?assertEqual(0, - repl_util:wait_for_reads(DestinationNode, - Start, - End, - Bucket, - 1)). - -%% @doc Write a series of keys and ensure they are all written. -write_to_cluster(Node, Start, End, Bucket) -> - lager:info("Writing ~p keys to node ~p.", [End - Start, Node]), - ?assertEqual([], - repl_util:do_write(Node, Start, End, Bucket, 1)). - -%% @doc Read from cluster a series of keys, asserting a certain number -%% of errors. -read_from_cluster(Node, Start, End, Bucket, Errors) -> - lager:info("Reading ~p keys from node ~p.", [End - Start, Node]), - Res2 = rt:systest_read(Node, Start, End, Bucket, 1), - ?assertEqual(Errors, length(Res2)). - -%% @doc Assert we can perform one fullsync cycle, and that the number of -%% expected failures is correct. -check_fullsync(Node, Cluster, ExpectedFailures) -> - {Time, _} = timer:tc(repl_util, - start_and_wait_until_fullsync_complete, - [Node, Cluster]), - lager:info("Fullsync completed in ~p seconds", [Time/1000/1000]), - - Status = rpc:call(Node, riak_repl_console, status, [quiet]), - - Props = case proplists:get_value(fullsync_coordinator, Status) of - [{_Name, Props0}] -> - Props0; - Multiple -> - {_Name, Props0} = lists:keyfind(Cluster, 1, Multiple), - Props0 - end, - - %% check that the expected number of partitions failed to sync - ?assertEqual(ExpectedFailures, - proplists:get_value(error_exits, Props)), - - %% check that we retried each of them 5 times - ?assert( - proplists:get_value(retry_exits, Props) >= ExpectedFailures * 5), - - ok. diff --git a/tests/repl_util.erl b/tests/repl_util.erl index 3501c035..9c0f4f16 100644 --- a/tests/repl_util.erl +++ b/tests/repl_util.erl @@ -29,7 +29,14 @@ num_partitions/1, get_cluster_mgr_port/1, maybe_reconnect_rt/3, - connect_rt/3 + connect_rt/3, + connect_cluster_by_name/3, + get_port/1, + get_leader/1, + write_to_cluster/4, + read_from_cluster/5, + check_fullsync/3, + validate_completed_fullsync/6 ]). -include_lib("eunit/include/eunit.hrl"). @@ -140,7 +147,8 @@ wait_until_no_connection(Node) -> wait_for_reads(Node, Start, End, Bucket, R) -> rt:wait_until(Node, fun(_) -> - rt:systest_read(Node, Start, End, Bucket, R) == [] + Reads = rt:systest_read(Node, Start, End, Bucket, R), + Reads == [] end), Reads = rt:systest_read(Node, Start, End, Bucket, R), lager:info("Reads: ~p", [Reads]), @@ -315,3 +323,85 @@ connect_rt(SourceNode, SinkPort, SinkName) -> repl_util:wait_for_connection(SourceNode, SinkName), repl_util:enable_realtime(SourceNode, SinkName), repl_util:start_realtime(SourceNode, SinkName). + +%% @doc Connect two clusters using a given name. +connect_cluster_by_name(Source, Port, Name) -> + lager:info("Connecting ~p to ~p for cluster ~p.", + [Source, Port, Name]), + repl_util:connect_cluster(Source, "127.0.0.1", Port), + ?assertEqual(ok, repl_util:wait_for_connection(Source, Name)). + +%% @doc Given a node, find the port that the cluster manager is +%% listening on. +get_port(Node) -> + {ok, {_IP, Port}} = rpc:call(Node, + application, + get_env, + [riak_core, cluster_mgr]), + Port. + +%% @doc Given a node, find out who the current replication leader in its +%% cluster is. +get_leader(Node) -> + rpc:call(Node, riak_core_cluster_mgr, get_leader, []). + +%% @doc Validate fullsync completed and all keys are available. +validate_completed_fullsync(ReplicationLeader, + DestinationNode, + DestinationCluster, + Start, + End, + Bucket) -> + ok = check_fullsync(ReplicationLeader, DestinationCluster, 0), + lager:info("Verify: Reading ~p keys repl'd from A(~p) to ~p(~p)", + [End - Start, ReplicationLeader, + DestinationCluster, DestinationNode]), + ?assertEqual(0, + repl_util:wait_for_reads(DestinationNode, + Start, + End, + Bucket, + 1)). + +%% @doc Write a series of keys and ensure they are all written. +write_to_cluster(Node, Start, End, Bucket) -> + lager:info("Writing ~p keys to node ~p.", [End - Start, Node]), + ?assertEqual([], + repl_util:do_write(Node, Start, End, Bucket, 1)). + +%% @doc Read from cluster a series of keys, asserting a certain number +%% of errors. +read_from_cluster(Node, Start, End, Bucket, Errors) -> + lager:info("Reading ~p keys from node ~p.", [End - Start, Node]), + Res2 = rt:systest_read(Node, Start, End, Bucket, 1), + ?assertEqual(Errors, length(Res2)). + +%% @doc Assert we can perform one fullsync cycle, and that the number of +%% expected failures is correct. +check_fullsync(Node, Cluster, ExpectedFailures) -> + {Time, _} = timer:tc(repl_util, + start_and_wait_until_fullsync_complete, + [Node, Cluster]), + lager:info("Fullsync completed in ~p seconds", [Time/1000/1000]), + + Status = rpc:call(Node, riak_repl_console, status, [quiet]), + + Props = case proplists:get_value(fullsync_coordinator, Status) of + [{_Name, Props0}] -> + Props0; + Multiple -> + {_Name, Props0} = lists:keyfind(Cluster, 1, Multiple), + Props0 + end, + + %% check that the expected number of partitions failed to sync + ErrorExits = proplists:get_value(error_exits, Props), + lager:info("Error exits: ~p", [ErrorExits]), + ?assertEqual(ExpectedFailures, ErrorExits), + + %% check that we retried each of them 5 times + RetryExits = proplists:get_value(retry_exits, Props), + lager:info("Retry exits: ~p", [RetryExits]), + ?assert(RetryExits >= ExpectedFailures * 5), + + ok. diff --git a/tests/replication_object_reformat.erl b/tests/replication_object_reformat.erl index 37364755..18611fcf 100644 --- a/tests/replication_object_reformat.erl +++ b/tests/replication_object_reformat.erl @@ -74,14 +74,14 @@ verify_replication(AVersion, BVersion, Start, End) -> rt:wait_until_transfers_complete(BNodes), lager:info("Get leaders."), - LeaderA = rt:repl_get_leader(AFirst), - LeaderB = rt:repl_get_leader(BFirst), + LeaderA = repl_util:get_leader(AFirst), + LeaderB = repl_util:get_leader(BFirst), lager:info("Finding connection manager ports."), - BPort = rt:repl_get_port(LeaderB), + BPort = repl_util:get_port(LeaderB), lager:info("Connecting cluster A to B"), - rt:repl_connect_cluster(LeaderA, BPort, "B"), + repl_util:connect_cluster_by_name(LeaderA, BPort, "B"), lager:info("Enabling fullsync from A to B"), repl_util:enable_fullsync(LeaderA, "B"), @@ -97,22 +97,21 @@ verify_replication(AVersion, BVersion, Start, End) -> || N <- BNodes], lager:info("Ensuring connection from cluster A to B"), - rt:repl_connect_cluster(LeaderA, BPort, "B"), + repl_util:connect_cluster_by_name(LeaderA, BPort, "B"), lager:info("Write keys, assert they are not available yet."), - rt:write_to_cluster(AFirst, Start, End, ?TEST_BUCKET), + repl_util:write_to_cluster(AFirst, Start, End, ?TEST_BUCKET), lager:info("Verify we can not read the keys on the sink."), - rt:read_from_cluster(BFirst, Start, End, ?TEST_BUCKET, ?NUM_KEYS), + repl_util:read_from_cluster(BFirst, Start, End, ?TEST_BUCKET, ?NUM_KEYS), lager:info("Verify we can read the keys on the source."), - rt:read_from_cluster(AFirst, Start, End, ?TEST_BUCKET, 0), + repl_util:read_from_cluster(AFirst, Start, End, ?TEST_BUCKET, 0), lager:info("Performing sacrifice."), perform_sacrifice(AFirst, Start), - rt:validate_completed_fullsync(LeaderA, BFirst, "B", - Start, End, ?TEST_BUCKET), + repl_util:validate_completed_fullsync(LeaderA, BFirst, "B", Start, End, ?TEST_BUCKET), rt:clean_cluster(Nodes).