riak_test/tests/verify_dvv_repl.erl
Engel A. Sanchez 0328b4e7d7 Fix failure on slow replication
This changes the test assertion so that it retries fetching the value
from the second cluster until it is the expected value, at which point
the test will either pass if the sibling count is reasonable or fail if
it is too damn high.
2014-12-23 16:41:59 -05:00

171 lines
5.6 KiB
Erlang

%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%%% @copyright (C) 2014, Basho Technologies
%%% @doc
%%% riak_test for a replication-caused sibling explosion. Encodes the
%%% scenario as described to me in HipChat: write something to cluster B,
%%% enable realtime repl from A to B, then read, resolve, and write the
%%% object on A 100 times. Without DVV you get 100 siblings on B; with DVV
%%% you get 2 (the original B write and the converged A writes).
%%% @end
-module(verify_dvv_repl).
-behavior(riak_test).
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
-define(BUCKET, <<"dvv-repl-bucket">>).
-define(KEY, <<"dvv-repl-key">>).
-define(KEY2, <<"dvv-repl-key2">>).
%% @doc Reproduce the repl-driven sibling explosion and check that DVV keeps
%% the sibling count bounded.
%%
%% Steps:
%% <ol>
%%   <li>Seed the key on cluster B.</li>
%%   <li>Start realtime repl A->B.</li>
%%   <li>Wait until a probe key written on A is readable on B.</li>
%%   <li>Do 100 fetch/resolve/write cycles on A.</li>
%% </ol>
%% A's copy may hold at most 3 siblings. Once B's copy resolves to the
%% full value 1..100, it may hold at most 6. Returns `pass'.
confirm() ->
    inets:start(),
    {{ClientA, ClusterA}, {ClientB, ClusterB}} = make_clusters(),
    %% Write data to B before replication is enabled, so B has its own
    %% pre-existing version of the key.
    write_object(ClientB),
    %% Connect for real time repl A->B
    connect_realtime(ClusterA, ClusterB),
    IsReplicating = make_replicate_test_fun(ClientA, ClientB),
    %% Check the outcome just like the final wait below does. Otherwise a
    %% repl link that never comes up goes unnoticed here, and the test only
    %% fails later with a far less informative error.
    ?assertEqual(ok, rt:wait_until(IsReplicating)),
    %% Update ClusterA 100 times
    [write_object(ClientA) || _ <- lists:seq(1, 100)],
    %% Without DVV the object ends up with ~100 siblings instead of the
    %% couple it should have. Turn off `dvv_enabled' in `make_clusters'
    %% to watch the siblings explode.
    AObj = get_object(ClientA),
    Expected = lists:seq(1, 100),
    %% Having up to 3 siblings could happen in rare cases when the writes hit
    %% different nodes concurrently in the n_val=3 preflist.
    ?assertMatch(Count when Count =< 3, riakc_obj:value_count(AObj)),
    %% Poll the sink until it has the fully resolved value, then check its
    %% sibling count.
    WaitFun = fun() ->
        lager:info("Checking sink object"),
        BObj = get_object(ClientB),
        Resolved0 = resolve(riakc_obj:get_values(BObj)),
        Resolved = lists:sort(sets:to_list(Resolved0)),
        case Resolved of
            Expected ->
                %% B has caught up; ?assertMatch raises if the sibling
                %% count is too high.
                BCount = riakc_obj:value_count(BObj),
                ?assertMatch(C when C =< 6, BCount),
                true;
            _ ->
                %% Replication still in flight: report "not yet" so
                %% rt:wait_until/1 tries again.
                false
        end
    end,
    ?assertEqual(ok, rt:wait_until(WaitFun)),
    pass.
%% @doc Build a zero-arity predicate suitable for rt:wait_until/1.
%%
%% Each call (re)writes a probe object at ?BUCKET/?KEY2 through `From'. It
%% returns `true' once that key can be read through `To' and `false' while
%% `To' still reports it as not found. Any other reply from `To' crashes
%% with a case_clause.
make_replicate_test_fun(From, To) ->
    %% riakc_obj:new/3 is pure, so the probe can be built once and reused
    %% for every poll.
    Probe = riakc_obj:new(?BUCKET, ?KEY2, <<"am I replicated yet?">>),
    fun() ->
        ok = riakc_pb_socket:put(From, Probe),
        Reply = riakc_pb_socket:get(To, ?BUCKET, ?KEY2),
        case Reply of
            {error, notfound} -> false;
            {ok, _Replicated} -> true
        end
    end.
%% @doc Deploy six riak_kv/riak_repl nodes with DVV and allow_mult on by
%% default and with fullsync disabled. The nodes are split into two
%% three-node clusters, named "A" and "B".
%%
%% Returns `{{ClientA, NodesA}, {ClientB, NodesB}}', with one protocol
%% buffers client per cluster.
make_clusters() ->
    ReplEnv = [{fullsync_on_connect, false},
               {fullsync_interval, disabled}],
    BucketDefaults = [{dvv_enabled, true},
                      {allow_mult, true}],
    Conf = [{riak_repl, ReplEnv},
            {riak_core, [{default_bucket_props, BucketDefaults}]}],
    AllNodes = rt:deploy_nodes(6, Conf, [riak_kv, riak_repl]),
    {SourceNodes, SinkNodes} = lists:split(3, AllNodes),
    %% Built one after the other: "A" must be fully set up before "B".
    Source = make_cluster(SourceNodes, "A"),
    Sink = make_cluster(SinkNodes, "B"),
    {Source, Sink}.
%% @doc Join `Nodes' into one cluster and name it `Name'. Waits for leader
%% convergence, then opens a PB client on the first node that queues
%% requests while disconnected. Returns `{Client, Nodes}'.
make_cluster(Nodes, Name) ->
    repl_util:make_cluster(Nodes),
    FirstNode = hd(Nodes),
    repl_util:name_cluster(FirstNode, Name),
    repl_util:wait_until_leader_converge(Nodes),
    Client = rt:pbc(FirstNode),
    riakc_pb_socket:set_options(Client, [queue_if_disconnected]),
    {Client, Nodes}.
%% @doc Do one fetch/resolve/write cycle on the test key.
%%
%% Given a single client, the cycle runs once through it. Given a list of
%% clients, it runs once per client, in order; each cycle must return
%% `ok', and the whole call then returns `ok'.
write_object(Clients) when is_list(Clients) ->
    lists:foreach(fun(One) -> ok = write_object(One) end, Clients);
write_object(Client) ->
    fetch_resolve_write(Client).
%% @doc Fetch the test object (?BUCKET/?KEY) through `Client'.
%%
%% If the key does not exist yet, returns a fresh, value-less object for
%% it, so callers can treat "missing" and "present" the same way.
%%
%% Any other error is deliberately NOT mapped to an empty object. Doing so
%% would make fetch_resolve_write/1 blind-write a new value without the
%% existing vclock. That adds a spurious sibling and skews the test's
%% sibling-count assertions, so such errors crash with a case_clause
%% instead.
get_object(Client) ->
    case riakc_pb_socket:get(Client, ?BUCKET, ?KEY) of
        {ok, Obj} ->
            Obj;
        {error, notfound} ->
            riakc_obj:new(?BUCKET, ?KEY)
    end.
%% @doc Read the test object, merge all of its siblings, add the next
%% integer, and write the result back with empty metadata. The write keeps
%% the object's causal context from the fetch. Returns `ok' or crashes if
%% the put fails.
fetch_resolve_write(Client) ->
    Current = get_object(Client),
    Merged = resolve_update(riakc_obj:get_values(Current)),
    WithValue = riakc_obj:update_value(Current, Merged),
    Updated = riakc_obj:update_metadata(WithValue, dict:new()),
    ok = riakc_pb_socket:put(Client, Updated).
%% @doc Merge sibling values into one set.
%%
%% Each element of `Values' is a binary holding a term_to_binary-encoded
%% `sets' set. The result is the union of all of them; an empty list
%% yields an empty set.
resolve(Values) ->
    sets:union([binary_to_term(Encoded) || Encoded <- Values]).
%% @doc Produce the next value to write.
%%
%% With no siblings, the value is the set {1}. Otherwise the siblings are
%% merged via resolve/1 and one more than the current maximum is added.
resolve_update([]) ->
    sets:from_list([1]);
resolve_update(Siblings) ->
    Merged = resolve(Siblings),
    Next = 1 + lists:max(sets:to_list(Merged)),
    sets:add_element(Next, Merged).
%% @doc Set up one-way realtime replication from the cluster holding
%% `ClusterA' to the cluster holding `ClusterB'.
%%
%% Connects A's cluster-manager leader to B's cluster-manager port on
%% 127.0.0.1 and asserts that the connection to "B" comes up. It then
%% enables and starts realtime repl to "B"; the result of
%% repl_util:start_realtime/2 is returned.
connect_realtime(ClusterA, ClusterB) ->
    lager:info("repl power...ACTIVATE!"),
    SourceLeader = get_leader(hd(ClusterA)),
    SinkPort = get_mgr_port(hd(ClusterB)),
    SinkName = "B",
    repl_util:connect_cluster(SourceLeader, "127.0.0.1", SinkPort),
    ?assertEqual(ok, repl_util:wait_for_connection(SourceLeader, SinkName)),
    repl_util:enable_realtime(SourceLeader, SinkName),
    repl_util:start_realtime(SourceLeader, SinkName).
%% @doc Ask `Node' (over RPC) who the cluster-manager leader is, via
%% riak_core_cluster_mgr:get_leader/0. The raw rpc:call/4 result is
%% returned unchanged.
get_leader(Node) ->
    Mod = riak_core_cluster_mgr,
    rpc:call(Node, Mod, get_leader, []).
%% @doc Return the port from the `{IP, Port}' value of riak_core's
%% `cluster_mgr' setting on `Node'. Crashes with badmatch if the setting
%% is missing or has another shape.
get_mgr_port(Node) ->
    Setting = rpc:call(Node, application, get_env, [riak_core, cluster_mgr]),
    {ok, {_Addr, MgrPort}} = Setting,
    MgrPort.