Fix several bugs in the test refactor, and adjust some assumptions

parent 931abb7823
commit ab59896b24
@@ -14,7 +14,7 @@
 -define(TEST_BUCKET, <<"repl-aae-fullsync-systest_a">>).
 -define(NUM_KEYS, 1000).
 
--define(CONF, [
+-define(CONF(Retries), [
         {riak_core,
             [
              {ring_creation_size, 8},
@@ -34,7 +34,7 @@
          {fullsync_strategy, aae},
          {fullsync_on_connect, false},
          {fullsync_interval, disabled},
-         {max_fssource_retries, 5}
+         {max_fssource_retries, Retries}
         ]}
        ]).
 
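Note: the two hunks above parameterize the config macro so the fullsync retry limit is no longer hard-coded; each test now picks its own policy when deploying nodes (?CONF(5) in simple_test/0, ?CONF(infinity) in exhaustive_test/0, as the later hunks show). A minimal, self-contained sketch of the pattern, using a hypothetical module name and a trimmed-down config:

%% Sketch only: the real ?CONF also carries the riak_core and riak_repl
%% settings shown in the diff; only max_fssource_retries is kept here.
-module(conf_sketch).
-export([simple/0, exhaustive/0]).

-define(CONF(Retries),
        [{riak_repl, [{fullsync_strategy, aae},
                      {max_fssource_retries, Retries}]}]).

simple()     -> ?CONF(5).         %% bounded retries, as in simple_test/0
exhaustive() -> ?CONF(infinity).  %% unbounded retries, as in exhaustive_test/0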
@@ -45,7 +45,7 @@ confirm() ->
 
 simple_test() ->
     %% Deploy 6 nodes.
-    Nodes = deploy_nodes(6, ?CONF),
+    Nodes = deploy_nodes(6, ?CONF(5)),
 
     %% Break up the 6 nodes into three clustes.
     {ANodes, BNodes} = lists:split(3, Nodes),
@@ -107,11 +107,13 @@ simple_test() ->
     %% intercepts are removed.
     validate_completed_fullsync(LeaderA, BFirst, "B", 1, ?NUM_KEYS),
 
+    rt:clean_cluster(Nodes),
+
     pass.
 
 exhaustive_test() ->
     %% Deploy 6 nodes.
-    Nodes = deploy_nodes(6, ?CONF),
+    Nodes = deploy_nodes(6, ?CONF(infinity)),
 
     %% Break up the 6 nodes into three clustes.
     {ANodes, Rest} = lists:split(2, Nodes),
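Note: besides switching to ?CONF(5)/?CONF(infinity), the hunk above makes simple_test/0 tear its clusters down with rt:clean_cluster/1 before returning. A sketch of why that matters, assuming confirm/0 (whose body is not part of this diff) runs both tests in a single riak_test invocation:

%% Hypothetical layout, for illustration only: if confirm/0 chains the tests,
%% cleaning up at the end of simple_test/0 lets exhaustive_test/0 deploy a
%% fresh set of six nodes.
confirm() ->
    pass = simple_test(),    %% now ends with rt:clean_cluster(Nodes)
    exhaustive_test().       %% returns pass on success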
@@ -172,24 +174,42 @@ exhaustive_test() ->
     repl_util:enable_fullsync(LeaderA, "C"),
     rt:wait_until_ring_converged(ANodes),
 
+    %% Wait for trees to compute.
+    repl_util:wait_until_aae_trees_built(ANodes),
+    repl_util:wait_until_aae_trees_built(BNodes),
+    repl_util:wait_until_aae_trees_built(CNodes),
+
     %% Flush AAE trees to disk.
     perform_sacrifice(AFirst),
 
-    %% Verify data is replicated from A -> B successfully once the
-    %% intercepts are removed.
+    %% Verify data is replicated from A -> B successfully
     validate_completed_fullsync(LeaderA, BFirst, "B", 1, ?NUM_KEYS),
 
-    %% Verify data is replicated from A -> B successfully once the
-    %% intercepts are removed.
+    %% Verify data is replicated from A -> C successfully
     validate_completed_fullsync(LeaderA, CFirst, "C", 1, ?NUM_KEYS),
 
+
+    write_to_cluster(AFirst, ?NUM_KEYS, ?NUM_KEYS + ?NUM_KEYS),
+    read_from_cluster(BFirst, ?NUM_KEYS, ?NUM_KEYS + ?NUM_KEYS, ?NUM_KEYS),
+    read_from_cluster(CFirst, ?NUM_KEYS, ?NUM_KEYS + ?NUM_KEYS, ?NUM_KEYS),
+
+
+    %% Verify that duelling fullsyncs eventually complete
+    {Time, _} = timer:tc(repl_util,
+                         start_and_wait_until_fullsync_complete,
+                         [LeaderA]),
+
+    read_from_cluster(BFirst, ?NUM_KEYS, ?NUM_KEYS + ?NUM_KEYS, 0),
+    read_from_cluster(CFirst, ?NUM_KEYS, ?NUM_KEYS + ?NUM_KEYS, 0),
+    lager:info("Fullsync A->B and A->C completed in ~p seconds", [Time/1000/1000]),
+
     pass.
 
 %% @doc Required for 1.4+ Riak, write sacrificial keys to force AAE
 %% trees to flush to disk.
 perform_sacrifice(Node) ->
     ?assertEqual([], repl_util:do_write(Node, 1, 2000,
-                                        <<"scarificial">>, 1)).
+                                        <<"sacrificial">>, 1)).
 
 %% @doc Validate fullsync completed and all keys are available.
 validate_completed_fullsync(ReplicationLeader,
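Note: the dueling-fullsync block added above times the A->B and A->C syncs with timer:tc/3, which reports elapsed time in microseconds; that is why the log line divides by 1000/1000 to print seconds. A small self-contained sketch of the idiom (hypothetical module name, standard library calls only):

-module(timing_sketch).
-export([demo/0]).

demo() ->
    %% timer:tc/3 returns {ElapsedMicroseconds, Result} of the applied call.
    {ElapsedUs, ok} = timer:tc(timer, sleep, [1500]),
    io:format("call completed in ~p seconds~n", [ElapsedUs / 1000 / 1000]).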
@@ -198,8 +218,9 @@ validate_completed_fullsync(ReplicationLeader,
                             Start,
                             End) ->
     ok = check_fullsync(ReplicationLeader, DestinationCluster, 0),
-    lager:info("Verify: Reading ~p keys repl'd from A(~p) to B(~p)",
-               [?NUM_KEYS, ReplicationLeader, DestinationNode]),
+    lager:info("Verify: Reading ~p keys repl'd from A(~p) to ~p(~p)",
+               [?NUM_KEYS, ReplicationLeader,
+                DestinationCluster, DestinationNode]),
     ?assertEqual(0,
                  repl_util:wait_for_reads(DestinationNode,
                                           Start,
@@ -212,7 +233,7 @@ validate_completed_fullsync(ReplicationLeader,
 check_fullsync(Node, Cluster, ExpectedFailures) ->
     {Time, _} = timer:tc(repl_util,
                          start_and_wait_until_fullsync_complete,
-                         [Node]),
+                         [Node, Cluster]),
     lager:info("Fullsync completed in ~p seconds", [Time/1000/1000]),
 
     Status = rpc:call(Node, riak_repl_console, status, [quiet]),
@@ -334,5 +355,5 @@ write_to_cluster(Node, Start, End) ->
 %% of errors.
 read_from_cluster(Node, Start, End, Errors) ->
     lager:info("Reading ~p keys from node ~p.", [End - Start, Node]),
-    Res2 = rt:systest_read(Node, Start, ?NUM_KEYS, ?TEST_BUCKET, 1),
+    Res2 = rt:systest_read(Node, Start, End, ?TEST_BUCKET, 1),
     ?assertEqual(Errors, length(Res2)).
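Note: the hunk above fixes a range bug in read_from_cluster/4: the upper bound passed to rt:systest_read/5 was hard-coded to ?NUM_KEYS, so a read of the second key batch (?NUM_KEYS to 2 * ?NUM_KEYS) never covered the requested range and the expected error count could not be met. With the fix the helper reads exactly Start..End, assuming rt:systest_read(Node, Start, End, Bucket, R) returns the list of failed reads (which is what the ?assertEqual on its length relies on). How the exhaustive test uses the fixed helper (calls taken from the hunks above):

%% before the dueling fullsync: all ?NUM_KEYS second-batch keys are missing on B
read_from_cluster(BFirst, ?NUM_KEYS, ?NUM_KEYS + ?NUM_KEYS, ?NUM_KEYS),
%% after it completes: none are missing
read_from_cluster(BFirst, ?NUM_KEYS, ?NUM_KEYS + ?NUM_KEYS, 0),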
@@ -15,6 +15,7 @@
          wait_until_aae_trees_built/1,
          wait_for_reads/5,
          start_and_wait_until_fullsync_complete/1,
+         start_and_wait_until_fullsync_complete/2,
          connect_cluster/3,
          disconnect_cluster/2,
          wait_for_connection/2,
@@ -152,13 +153,30 @@ get_fs_coord_status_item(Node, SinkName, ItemName) ->
     proplists:get_value(ItemName, ClusterProps).
 
 start_and_wait_until_fullsync_complete(Node) ->
+    start_and_wait_until_fullsync_complete(Node, undefined).
+
+start_and_wait_until_fullsync_complete(Node, Cluster) ->
     Status0 = rpc:call(Node, riak_repl_console, status, [quiet]),
-    Count = proplists:get_value(server_fullsyncs, Status0) + 1,
+    Count0 = proplists:get_value(server_fullsyncs, Status0),
+    Count = case Cluster of
+        undefined ->
+            %% count the # of fullsync enabled clusters
+            Count0 + length(string:tokens(proplists:get_value(fullsync_enabled,
+                                                              Status0), ", "));
+        _ ->
+            Count0 + 1
+    end,
     lager:info("waiting for fullsync count to be ~p", [Count]),
 
     lager:info("Starting fullsync on ~p (~p)", [Node,
                                                 rtdev:node_version(rtdev:node_id(Node))]),
-    rpc:call(Node, riak_repl_console, fullsync, [["start"]]),
+    Args = case Cluster of
+        undefined ->
+            ["start"];
+        _ ->
+            ["start", Cluster]
+    end,
+    rpc:call(Node, riak_repl_console, fullsync, [Args]),
     %% sleep because of the old bug where stats will crash if you call it too
     %% soon after starting a fullsync
     timer:sleep(500),
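Note: the repl_util changes above keep start_and_wait_until_fullsync_complete/1 as a wrapper that targets every fullsync-enabled sink, while the new /2 arity syncs a single named sink (check_fullsync/3 passes "B" or "C"). The expected server_fullsyncs count differs accordingly. A self-contained sketch of that derivation, under the assumption that fullsync_enabled in the status proplist is a comma-separated string of sink names (e.g. "B, C"):

-module(count_sketch).
-export([expected_count/2]).

%% Expected value of server_fullsyncs once the started fullsync(s) finish.
expected_count(Status, undefined) ->
    Enabled = proplists:get_value(fullsync_enabled, Status),      %% e.g. "B, C"
    proplists:get_value(server_fullsyncs, Status)
        + length(string:tokens(Enabled, ", "));
expected_count(Status, _Cluster) ->
    proplists:get_value(server_fullsyncs, Status) + 1.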