General code cleanup.

This commit is contained in:
Christopher Meiklejohn 2014-01-16 16:29:24 -05:00
parent 766b795944
commit 494cd2deb5
3 changed files with 102 additions and 100 deletions

View File

@ -40,7 +40,6 @@
capability/2, capability/2,
capability/3, capability/3,
check_singleton_node/1, check_singleton_node/1,
check_fullsync/3,
check_ibrowse/0, check_ibrowse/0,
claimant_according_to/1, claimant_according_to/1,
clean_cluster/1, clean_cluster/1,
@ -86,12 +85,8 @@
pmap/2, pmap/2,
post_result/2, post_result/2,
priv_dir/0, priv_dir/0,
read_from_cluster/5,
remove/2, remove/2,
riak/2, riak/2,
repl_get_leader/1,
repl_get_port/1,
repl_connect_cluster/3,
rpc_get_env/2, rpc_get_env/2,
set_backend/1, set_backend/1,
set_backend/2, set_backend/2,
@ -120,7 +115,6 @@
update_app_config/2, update_app_config/2,
upgrade/2, upgrade/2,
upgrade/3, upgrade/3,
validate_completed_fullsync/6,
versions/0, versions/0,
wait_for_cluster_service/2, wait_for_cluster_service/2,
wait_for_cmd/1, wait_for_cmd/1,
@ -148,8 +142,7 @@
wait_until_transfers_complete/1, wait_until_transfers_complete/1,
wait_until_unpingable/1, wait_until_unpingable/1,
wait_until_bucket_type_status/3, wait_until_bucket_type_status/3,
whats_up/0, whats_up/0
write_to_cluster/4
]). ]).
-define(HARNESS, (rt_config:get(rt_harness))). -define(HARNESS, (rt_config:get(rt_harness))).
@ -1382,83 +1375,3 @@ wait_for_control(Vsn, Node) when is_atom(Node) ->
%% @doc Wait for Riak Control to start on a series of nodes. %% @doc Wait for Riak Control to start on a series of nodes.
wait_for_control(VersionedNodes) when is_list(VersionedNodes) -> wait_for_control(VersionedNodes) when is_list(VersionedNodes) ->
[wait_for_control(Vsn, Node) || {Vsn, Node} <- VersionedNodes]. [wait_for_control(Vsn, Node) || {Vsn, Node} <- VersionedNodes].
%% @doc Connect two clusters using a given name.
repl_connect_cluster(Source, Port, Name) ->
lager:info("Connecting ~p to ~p for cluster ~p.",
[Source, Port, Name]),
repl_util:connect_cluster(Source, "127.0.0.1", Port),
?assertEqual(ok, repl_util:wait_for_connection(Source, Name)).
%% @doc Given a node, find the port that the cluster manager is
%% listening on.
repl_get_port(Node) ->
{ok, {_IP, Port}} = rpc:call(Node,
application,
get_env,
[riak_core, cluster_mgr]),
Port.
%% @doc Given a node, find out who the current replication leader in its
%% cluster is.
repl_get_leader(Node) ->
rpc:call(Node, riak_core_cluster_mgr, get_leader, []).
%% @doc Validate fullsync completed and all keys are available.
validate_completed_fullsync(ReplicationLeader,
DestinationNode,
DestinationCluster,
Start,
End,
Bucket) ->
ok = check_fullsync(ReplicationLeader, DestinationCluster, 0),
lager:info("Verify: Reading ~p keys repl'd from A(~p) to ~p(~p)",
[End - Start, ReplicationLeader,
DestinationCluster, DestinationNode]),
?assertEqual(0,
repl_util:wait_for_reads(DestinationNode,
Start,
End,
Bucket,
1)).
%% @doc Write a series of keys and ensure they are all written.
write_to_cluster(Node, Start, End, Bucket) ->
lager:info("Writing ~p keys to node ~p.", [End - Start, Node]),
?assertEqual([],
repl_util:do_write(Node, Start, End, Bucket, 1)).
%% @doc Read from cluster a series of keys, asserting a certain number
%% of errors.
read_from_cluster(Node, Start, End, Bucket, Errors) ->
lager:info("Reading ~p keys from node ~p.", [End - Start, Node]),
Res2 = rt:systest_read(Node, Start, End, Bucket, 1),
?assertEqual(Errors, length(Res2)).
%% @doc Assert we can perform one fullsync cycle, and that the number of
%% expected failures is correct.
check_fullsync(Node, Cluster, ExpectedFailures) ->
{Time, _} = timer:tc(repl_util,
start_and_wait_until_fullsync_complete,
[Node, Cluster]),
lager:info("Fullsync completed in ~p seconds", [Time/1000/1000]),
Status = rpc:call(Node, riak_repl_console, status, [quiet]),
Props = case proplists:get_value(fullsync_coordinator, Status) of
[{_Name, Props0}] ->
Props0;
Multiple ->
{_Name, Props0} = lists:keyfind(Cluster, 1, Multiple),
Props0
end,
%% check that the expected number of partitions failed to sync
?assertEqual(ExpectedFailures,
proplists:get_value(error_exits, Props)),
%% check that we retried each of them 5 times
?assert(
proplists:get_value(retry_exits, Props) >= ExpectedFailures * 5),
ok.

View File

@ -29,7 +29,14 @@
num_partitions/1, num_partitions/1,
get_cluster_mgr_port/1, get_cluster_mgr_port/1,
maybe_reconnect_rt/3, maybe_reconnect_rt/3,
connect_rt/3 connect_rt/3,
connect_cluster_by_name/3,
get_port/1,
get_leader/1,
write_to_cluster/4,
read_from_cluster/5,
check_fullsync/3,
validate_completed_fullsync/6
]). ]).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
@ -140,7 +147,8 @@ wait_until_no_connection(Node) ->
wait_for_reads(Node, Start, End, Bucket, R) -> wait_for_reads(Node, Start, End, Bucket, R) ->
rt:wait_until(Node, rt:wait_until(Node,
fun(_) -> fun(_) ->
rt:systest_read(Node, Start, End, Bucket, R) == [] Reads = rt:systest_read(Node, Start, End, Bucket, R),
Reads == []
end), end),
Reads = rt:systest_read(Node, Start, End, Bucket, R), Reads = rt:systest_read(Node, Start, End, Bucket, R),
lager:info("Reads: ~p", [Reads]), lager:info("Reads: ~p", [Reads]),
@ -315,3 +323,85 @@ connect_rt(SourceNode, SinkPort, SinkName) ->
repl_util:wait_for_connection(SourceNode, SinkName), repl_util:wait_for_connection(SourceNode, SinkName),
repl_util:enable_realtime(SourceNode, SinkName), repl_util:enable_realtime(SourceNode, SinkName),
repl_util:start_realtime(SourceNode, SinkName). repl_util:start_realtime(SourceNode, SinkName).
%% @doc Connect two clusters using a given name.
connect_cluster_by_name(Source, Port, Name) ->
lager:info("Connecting ~p to ~p for cluster ~p.",
[Source, Port, Name]),
repl_util:connect_cluster(Source, "127.0.0.1", Port),
?assertEqual(ok, repl_util:wait_for_connection(Source, Name)).
%% @doc Given a node, find the port that the cluster manager is
%% listening on.
get_port(Node) ->
{ok, {_IP, Port}} = rpc:call(Node,
application,
get_env,
[riak_core, cluster_mgr]),
Port.
%% @doc Given a node, find out who the current replication leader in its
%% cluster is.
get_leader(Node) ->
rpc:call(Node, riak_core_cluster_mgr, get_leader, []).
%% @doc Validate fullsync completed and all keys are available.
validate_completed_fullsync(ReplicationLeader,
DestinationNode,
DestinationCluster,
Start,
End,
Bucket) ->
ok = check_fullsync(ReplicationLeader, DestinationCluster, 0),
lager:info("Verify: Reading ~p keys repl'd from A(~p) to ~p(~p)",
[End - Start, ReplicationLeader,
DestinationCluster, DestinationNode]),
?assertEqual(0,
repl_util:wait_for_reads(DestinationNode,
Start,
End,
Bucket,
1)).
%% @doc Write a series of keys and ensure they are all written.
write_to_cluster(Node, Start, End, Bucket) ->
lager:info("Writing ~p keys to node ~p.", [End - Start, Node]),
?assertEqual([],
repl_util:do_write(Node, Start, End, Bucket, 1)).
%% @doc Read from cluster a series of keys, asserting a certain number
%% of errors.
read_from_cluster(Node, Start, End, Bucket, Errors) ->
lager:info("Reading ~p keys from node ~p.", [End - Start, Node]),
Res2 = rt:systest_read(Node, Start, End, Bucket, 1),
?assertEqual(Errors, length(Res2)).
%% @doc Assert we can perform one fullsync cycle, and that the number of
%% expected failures is correct.
check_fullsync(Node, Cluster, ExpectedFailures) ->
{Time, _} = timer:tc(repl_util,
start_and_wait_until_fullsync_complete,
[Node, Cluster]),
lager:info("Fullsync completed in ~p seconds", [Time/1000/1000]),
Status = rpc:call(Node, riak_repl_console, status, [quiet]),
Props = case proplists:get_value(fullsync_coordinator, Status) of
[{_Name, Props0}] ->
Props0;
Multiple ->
{_Name, Props0} = lists:keyfind(Cluster, 1, Multiple),
Props0
end,
%% check that the expected number of partitions failed to sync
ErrorExits = proplists:get_value(error_exits, Props),
lager:info("Error exits: ~p", [ErrorExits]),
?assertEqual(ExpectedFailures, ErrorExits),
%% check that we retried each of them 5 times
RetryExits = proplists:get_value(retry_exits, Props),
lager:info("Retry exits: ~p", [RetryExits]),
?assert(RetryExits >= ExpectedFailures * 5),
ok.

View File

@ -74,14 +74,14 @@ verify_replication(AVersion, BVersion, Start, End) ->
rt:wait_until_transfers_complete(BNodes), rt:wait_until_transfers_complete(BNodes),
lager:info("Get leaders."), lager:info("Get leaders."),
LeaderA = rt:repl_get_leader(AFirst), LeaderA = repl_util:get_leader(AFirst),
LeaderB = rt:repl_get_leader(BFirst), LeaderB = repl_util:get_leader(BFirst),
lager:info("Finding connection manager ports."), lager:info("Finding connection manager ports."),
BPort = rt:repl_get_port(LeaderB), BPort = repl_util:get_port(LeaderB),
lager:info("Connecting cluster A to B"), lager:info("Connecting cluster A to B"),
rt:repl_connect_cluster(LeaderA, BPort, "B"), repl_util:connect_cluster_by_name(LeaderA, BPort, "B"),
lager:info("Enabling fullsync from A to B"), lager:info("Enabling fullsync from A to B"),
repl_util:enable_fullsync(LeaderA, "B"), repl_util:enable_fullsync(LeaderA, "B"),
@ -97,22 +97,21 @@ verify_replication(AVersion, BVersion, Start, End) ->
|| N <- BNodes], || N <- BNodes],
lager:info("Ensuring connection from cluster A to B"), lager:info("Ensuring connection from cluster A to B"),
rt:repl_connect_cluster(LeaderA, BPort, "B"), repl_util:connect_cluster_by_name(LeaderA, BPort, "B"),
lager:info("Write keys, assert they are not available yet."), lager:info("Write keys, assert they are not available yet."),
rt:write_to_cluster(AFirst, Start, End, ?TEST_BUCKET), repl_util:write_to_cluster(AFirst, Start, End, ?TEST_BUCKET),
lager:info("Verify we can not read the keys on the sink."), lager:info("Verify we can not read the keys on the sink."),
rt:read_from_cluster(BFirst, Start, End, ?TEST_BUCKET, ?NUM_KEYS), repl_util:read_from_cluster(BFirst, Start, End, ?TEST_BUCKET, ?NUM_KEYS),
lager:info("Verify we can read the keys on the source."), lager:info("Verify we can read the keys on the source."),
rt:read_from_cluster(AFirst, Start, End, ?TEST_BUCKET, 0), repl_util:read_from_cluster(AFirst, Start, End, ?TEST_BUCKET, 0),
lager:info("Performing sacrifice."), lager:info("Performing sacrifice."),
perform_sacrifice(AFirst, Start), perform_sacrifice(AFirst, Start),
rt:validate_completed_fullsync(LeaderA, BFirst, "B", repl_util:validate_completed_fullsync(LeaderA, BFirst, "B", Start, End, ?TEST_BUCKET),
Start, End, ?TEST_BUCKET),
rt:clean_cluster(Nodes). rt:clean_cluster(Nodes).