mirror of
https://github.com/valitydev/riak_test.git
synced 2024-11-06 16:45:29 +00:00
Merge branch 'bugfix/replication-test-failures'
This commit is contained in:
commit
abad8c44ff
@ -13,10 +13,6 @@
|
||||
-export([make_bucket/3]).
|
||||
|
||||
confirm() ->
|
||||
NumNodes = rt_config:get(num_nodes, 6),
|
||||
ClusterASize = rt_config:get(cluster_a_size, 3),
|
||||
|
||||
lager:info("Deploy ~p nodes", [NumNodes]),
|
||||
Conf = [
|
||||
{riak_repl,
|
||||
[
|
||||
@ -25,21 +21,8 @@ confirm() ->
|
||||
{diff_batch_size, 10}
|
||||
]}
|
||||
],
|
||||
|
||||
Nodes = deploy_nodes(NumNodes, Conf),
|
||||
|
||||
{ANodes, BNodes} = lists:split(ClusterASize, Nodes),
|
||||
lager:info("ANodes: ~p", [ANodes]),
|
||||
lager:info("BNodes: ~p", [BNodes]),
|
||||
|
||||
lager:info("Build cluster A"),
|
||||
rt:log_to_nodes(Nodes, "Build cluster A"),
|
||||
repl_util:make_cluster(ANodes),
|
||||
|
||||
lager:info("Build cluster B"),
|
||||
rt:log_to_nodes(Nodes, "Build cluster B"),
|
||||
repl_util:make_cluster(BNodes),
|
||||
|
||||
rt:set_advanced_conf(all, Conf),
|
||||
[ANodes, BNodes] = rt:build_clusters([3, 3]),
|
||||
replication(ANodes, BNodes, false),
|
||||
pass.
|
||||
|
||||
@ -221,6 +204,8 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
|
||||
lager:info("Restarting down node ~p", [LeaderA]),
|
||||
rt:start(LeaderA),
|
||||
rt:wait_until_pingable(LeaderA),
|
||||
wait_until_no_pending_changes(ANodes),
|
||||
wait_until_leader_converge(ANodes),
|
||||
start_and_wait_until_fullsync_complete(LeaderA2),
|
||||
|
||||
case nodes_all_have_version(ANodes, "1.2.2") of
|
||||
@ -522,6 +507,9 @@ make_bucket([Node|_]=Nodes, Name, Args) ->
|
||||
?assertEqual(ok, Res).
|
||||
|
||||
start_and_wait_until_fullsync_complete(Node) ->
|
||||
start_and_wait_until_fullsync_complete(Node, 20).
|
||||
|
||||
start_and_wait_until_fullsync_complete(Node, Retries) ->
|
||||
Status0 = rpc:call(Node, riak_repl_console, status, [quiet]),
|
||||
Count = proplists:get_value(server_fullsyncs, Status0) + 1,
|
||||
lager:info("waiting for fullsync count to be ~p", [Count]),
|
||||
@ -529,36 +517,38 @@ start_and_wait_until_fullsync_complete(Node) ->
|
||||
lager:info("Starting fullsync on ~p (~p)", [Node,
|
||||
rtdev:node_version(rtdev:node_id(Node))]),
|
||||
rpc:call(Node, riak_repl_console, start_fullsync, [[]]),
|
||||
|
||||
%% sleep because of the old bug where stats will crash if you call it too
|
||||
%% soon after starting a fullsync
|
||||
timer:sleep(500),
|
||||
|
||||
Res = rt:wait_until(Node,
|
||||
fun(_) ->
|
||||
Status = rpc:call(Node, riak_repl_console, status, [quiet]),
|
||||
case proplists:get_value(server_fullsyncs, Status) of
|
||||
C when C >= Count ->
|
||||
true;
|
||||
_ ->
|
||||
false
|
||||
end
|
||||
end),
|
||||
case node_has_version(Node, "1.2.0") of
|
||||
true ->
|
||||
?assertEqual(ok, Res);
|
||||
_ ->
|
||||
case Res of
|
||||
ok ->
|
||||
ok;
|
||||
_ ->
|
||||
?assertEqual(ok, wait_until_connection(Node)),
|
||||
lager:warning("Pre 1.2.0 node failed to fullsync, retrying"),
|
||||
start_and_wait_until_fullsync_complete(Node)
|
||||
end
|
||||
case rt:wait_until(make_fullsync_wait_fun(Node, Count), 100, 1000) of
|
||||
ok ->
|
||||
ok;
|
||||
_ when Retries > 0 ->
|
||||
?assertEqual(ok, wait_until_connection(Node)),
|
||||
lager:warning("Node failed to fullsync, retrying"),
|
||||
rpc:call(Node, riak_repl_console, cancel_fullsync, [[]]),
|
||||
start_and_wait_until_fullsync_complete(Node, Retries-1)
|
||||
end,
|
||||
|
||||
lager:info("Fullsync on ~p complete", [Node]).
|
||||
|
||||
make_fullsync_wait_fun(Node, Count) ->
|
||||
fun() ->
|
||||
Status = rpc:call(Node, riak_repl_console, status, [quiet]),
|
||||
case Status of
|
||||
{badrpc, _} ->
|
||||
false;
|
||||
_ ->
|
||||
case proplists:get_value(server_fullsyncs, Status) of
|
||||
C when C >= Count ->
|
||||
true;
|
||||
_ ->
|
||||
false
|
||||
end
|
||||
end
|
||||
end.
|
||||
|
||||
wait_until_is_leader(Node) ->
|
||||
lager:info("wait_until_is_leader(~p)", [Node]),
|
||||
rt:wait_until(Node, fun is_leader/1).
|
||||
@ -613,23 +603,39 @@ wait_until_new_leader(Node, OldLeader) ->
|
||||
wait_until_leader_converge([Node|_] = Nodes) ->
|
||||
rt:wait_until(Node,
|
||||
fun(_) ->
|
||||
length(lists:usort([begin
|
||||
case rpc:call(N, riak_repl_console, status, [quiet]) of
|
||||
{badrpc, _} ->
|
||||
false;
|
||||
Status ->
|
||||
case proplists:get_value(leader, Status) of
|
||||
undefined ->
|
||||
false;
|
||||
L ->
|
||||
%lager:info("Leader for ~p is ~p",
|
||||
%[N,L]),
|
||||
L
|
||||
end
|
||||
end
|
||||
end || N <- Nodes])) == 1
|
||||
LeaderResults =
|
||||
[get_leader(rpc:call(N, riak_repl_console, status, [quiet])) ||
|
||||
N <- Nodes],
|
||||
{Leaders, Errors} =
|
||||
lists:partition(leader_result_filter_fun(), LeaderResults),
|
||||
UniqueLeaders = lists:usort(Leaders),
|
||||
Errors == [] andalso length(UniqueLeaders) == 1
|
||||
end).
|
||||
|
||||
get_leader({badrpc, _}=Err) ->
|
||||
Err;
|
||||
get_leader(Status) ->
|
||||
case proplists:get_value(leader, Status) of
|
||||
undefined ->
|
||||
false;
|
||||
L ->
|
||||
%%lager:info("Leader for ~p is ~p",
|
||||
%%[N,L]),
|
||||
L
|
||||
end.
|
||||
|
||||
leader_result_filter_fun() ->
|
||||
fun(L) ->
|
||||
case L of
|
||||
undefined ->
|
||||
false;
|
||||
{badrpc, _} ->
|
||||
false;
|
||||
_ ->
|
||||
true
|
||||
end
|
||||
end.
|
||||
|
||||
wait_until_connection(Node) ->
|
||||
rt:wait_until(Node,
|
||||
fun(_) ->
|
||||
@ -695,4 +701,3 @@ do_write(Node, Start, End, Bucket, W) ->
|
||||
lists:flatten([rt:systest_write(Node, S, S, Bucket, W) ||
|
||||
{S, _Error} <- Errors])
|
||||
end.
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user