Merge pull request #480 from basho/bug/rdb/sibling-explosion

Add tests for sibling explosion resolution with DVV
This commit is contained in:
Russell Brown 2013-12-20 07:30:42 -08:00
commit 59fe416eab
2 changed files with 214 additions and 0 deletions

View File

@ -0,0 +1,89 @@
-module(sibling_explosion).
-include_lib("eunit/include/eunit.hrl").
-export([confirm/0]).
-compile(export_all).
-define(B, <<"b">>).
-define(K, <<"k">>).
%% This test provokes a sibling explosion. It does so with a single
%% node and a single PB connection. All it does to achieve the explosion
%% is interleave fetch / resolve / write cycles. It works like this:
%% - The aim of the two logical clients is to build one set of
%%   consecutive integers (0..N-2, see assert_sibling_values/2)
%% - one client is seeded with 0 and adds the even values, the other is
%%   seeded with 1 and adds the odd values
%% - The test interleaves writes from the two clients
%% - Each client, in turn:
%% -- starts from the object returned by its own previous put
%% -- resolves the sibling values (set union over the values in all siblings)
%% -- adds a new value to the set
%% -- puts the new value back to Riak
%% Without DVV every write survives as a sibling, each a subset of the
%% following one: [0] | [0, 1] | [0, 1, 2] | [0, 1, 2, 3] etc.
%% With DVV exactly two siblings are expected at the end, and their
%% union must contain every value written.
confirm() ->
    Bucket = <<"b">>,
    Key = <<"k">>,
    Conf = [{riak_core, [{default_bucket_props, [{allow_mult, true}]}]}],
    [Node] = rt:deploy_nodes(1, Conf),
    N = 100,

    lager:info("Put new object in ~p via PBC.", [Node]),
    PB = rt:pbc(Node),

    %% Two fresh, unrelated objects for the same bucket/key: one per
    %% logical client.
    EvenObj = riakc_obj:new(Bucket, Key, sets:from_list([0])),
    OddObj = riakc_obj:new(Bucket, Key, sets:from_list([1])),
    _ = explode(PB, {EvenObj, OddObj}, N),

    {ok, SibCheck1} = riakc_pb_socket:get(PB, Bucket, Key),
    %% there should now be only two siblings
    ?assertEqual(2, riakc_obj:value_count(SibCheck1)),
    %% siblings should merge to include all writes
    assert_sibling_values(riakc_obj:get_values(SibCheck1), N),
    pass.
%% Check that every write was stored: the union of all sibling values
%% must be exactly the integers 0..N-2.
%% Values - encoded sibling values as returned by riakc_obj:get_values/1
%% N      - the count passed to explode/3
assert_sibling_values(Values, N) ->
    Merged = resolve(Values, sets:new()),
    Elements = sets:to_list(Merged),
    L = lists:sort(Elements),
    Expected = lists:seq(0, N - 2),
    ?assertEqual(Expected, L).
%% Pick one of the two objects, and perform a fetch, resolve, update
%% cycle with it. The point is that the writes of the two objects are
%% interleaved: A is updated when the count is even, B when it is odd.
%% Each object already carries the "latest" vclock returned from its
%% last put. This simulates the case where a client fetches a vclock
%% before a PUT, but another write lands at the vnode after the vclock
%% is returned and before the next PUT. Each PUT sees all but one of
%% the writes that Riak has seen, meaning there is a perpetual race /
%% sibling. Without DVV that means ALL writes are added to the sibling
%% set. With DVV, we correctly capture the resolution of seen writes.
%%
%% Counts down from Cnt and returns the pair {A, B} once Cnt reaches 1.
explode(_PB, {_, _} = Pair, 1) ->
    Pair;
explode(PB, {A, B}, Cnt) when is_integer(Cnt) ->
    Next =
        case Cnt rem 2 of
            0 -> {resolve_mutate_store(PB, Cnt, A), B};
            _ -> {A, resolve_mutate_store(PB, Cnt, B)}
        end,
    explode(PB, Next, Cnt - 1).
%% Resolve the siblings held in Obj0, add N to the merged value, and put
%% the result. The put asks for return_body, and the object Riak sends
%% back is returned so the caller can use it for its next cycle.
resolve_mutate_store(PB, N, Obj0) ->
    Updated = resolve_update(Obj0, N),
    {ok, Stored} = riakc_pb_socket:put(PB, Updated, [return_body]),
    Stored.
%% Union all sibling values of Obj and add N as the new element for this
%% operation. The object's metadata is replaced with an empty dict.
%% An object with no values (riakc_obj:get_values/1 returns []) is
%% returned untouched, so in that case N is NOT added.
resolve_update(Obj, N) ->
    case riakc_obj:get_values(Obj) of
        [_ | _] = Siblings ->
            Merged = resolve(Siblings, sets:new()),
            WithN = sets:add_element(N, Merged),
            lager:info("Storing ~p", [N]),
            Updated = riakc_obj:update_value(Obj, WithN),
            riakc_obj:update_metadata(Updated, dict:new());
        [] ->
            Obj
    end.
%% Set union over every value: each element of the list is a binary
%% produced by term_to_binary/1 from a set; all of them are decoded and
%% folded into Acc, which is returned.
resolve(Values, Acc) ->
    lists:foldl(
        fun(Encoded, Merged) ->
            sets:union(binary_to_term(Encoded), Merged)
        end,
        Acc,
        Values
    ).

125
tests/verify_dvv_repl.erl Normal file
View File

@ -0,0 +1,125 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2012 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%%% @copyright (C) 2013, Basho Technologies
%%% @doc
%%% riak_test for a sibling explosion caused by realtime replication.
%%% Encodes the scenario as it was described to me in HipChat.
%%% @end
-module(verify_dvv_repl).
-behavior(riak_test).
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
-define(BUCKET, <<"dvv-repl-bucket">>).
-define(KEY, <<"dvv-repl-key">>).
%% Scenario: write the key on both clusters, connect realtime
%% replication from A to B, update the key on A 100 times, then check
%% the sibling counts on both sides.
confirm() ->
    inets:start(),
    {{ClientA, ClusterA}, {ClientB, ClusterB}} = make_clusters(),

    %% Write data to both clusters (before they are connected)
    write_object([ClientA, ClientB]),

    %% Connect for real time repl A->B
    connect_realtime(ClusterA, ClusterB),

    %% Update ClusterA 100 times
    lists:foreach(fun(_) -> write_object(ClientA) end, lists:seq(1, 100)),

    %% Fetch the object from both sides. The failure this test guards
    %% against is a sibling per update (about 100). What is asserted
    %% instead is a single value on A and exactly two siblings on B
    %% (presumably B's own initial write plus the value replicated from
    %% A - confirm against the repl behaviour).
    AObj = get_object(ClientA),
    BObj = get_object(ClientB),
    ?assertEqual(1, riakc_obj:value_count(AObj)),
    ?assertEqual(2, riakc_obj:value_count(BObj)),
    pass.
%% Deploy six nodes with fullsync disabled and split them into two
%% named clusters of three nodes each ("A" and "B").
%% Returns {{ClientA, NodesA}, {ClientB, NodesB}}.
make_clusters() ->
    ReplConf = [{fullsync_on_connect, false},
                {fullsync_interval, disabled}],
    AllNodes = rt:deploy_nodes(6, [{riak_repl, ReplConf}]),
    {NodesA, NodesB} = lists:split(3, AllNodes),
    %% Build A before B, each as a {Client, Nodes} pair
    SideA = make_cluster(NodesA, "A"),
    SideB = make_cluster(NodesB, "B"),
    {SideA, SideB}.
%% Join Nodes into one cluster, name it Name, wait for the leader to
%% converge, and open a PB client against the first node.
%% Returns {Client, Nodes}.
make_cluster(Nodes, Name) ->
    repl_util:make_cluster(Nodes),
    First = hd(Nodes),
    repl_util:name_cluster(First, Name),
    repl_util:wait_until_leader_converge(Nodes),
    Client = rt:pbc(First),
    riakc_pb_socket:set_options(Client, [queue_if_disconnected]),
    {Client, Nodes}.
%% Run one fetch / resolve / write cycle against a single client, or
%% against every client in a list, in order. Each write in a list must
%% return ok. Returns ok.
write_object(Clients) when is_list(Clients) ->
    lists:foreach(fun(Client) -> ok = write_object(Client) end, Clients);
write_object(Client) ->
    fetch_resolve_write(Client).
%% Fetch the test object through Client.
%% Returns the stored object, or a fresh, value-less object for
%% ?BUCKET / ?KEY when the key does not exist yet.
%%
%% Only {error, notfound} is treated as "does not exist". Any other
%% reply (timeout, disconnect, server error) crashes with case_clause.
%% Silently substituting a fresh object - which carries no vclock -
%% would make the following put create a spurious sibling and distort
%% the sibling counts this test asserts on.
get_object(Client) ->
    case riakc_pb_socket:get(Client, ?BUCKET, ?KEY) of
        {ok, Obj} ->
            Obj;
        {error, notfound} ->
            riakc_obj:new(?BUCKET, ?KEY)
    end.
%% One client cycle: fetch the object, merge its sibling values into a
%% new value, reset the metadata to an empty dict, and put it back.
%% Returns ok; crashes if the put returns anything else.
fetch_resolve_write(Client) ->
    Fetched = get_object(Client),
    Siblings = riakc_obj:get_values(Fetched),
    Merged = resolve_update(Siblings),
    WithValue = riakc_obj:update_value(Fetched, Merged),
    ToStore = riakc_obj:update_metadata(WithValue, dict:new()),
    ok = riakc_pb_socket:put(Client, ToStore).
%% Build the next value to store from the sibling values of an object.
%% Values - list of binaries, each term_to_binary/1 of a set of integers
%% Returns a set:
%% - for no siblings at all, the set containing just 1
%% - otherwise the union of all siblings plus one new element,
%%   max(union) + 1
%% The new element must be strictly greater than the current maximum.
%% Adding the maximum itself is a no-op on a set, which would make every
%% update store an unchanged value.
resolve_update([]) ->
    sets:add_element(1, sets:new());
resolve_update(Values) ->
    Resolved = lists:foldl(fun(V0, Acc) ->
                                   V = binary_to_term(V0),
                                   sets:union(V, Acc)
                           end,
                           sets:new(),
                           Values),
    NewValue = lists:max(sets:to_list(Resolved)) + 1,
    sets:add_element(NewValue, Resolved).
%% Set up one-way realtime replication from cluster A to cluster B:
%% connect A's leader to B's cluster manager port on 127.0.0.1, wait
%% for the connection to "B", then enable and start realtime on A.
connect_realtime(ClusterA, ClusterB) ->
    lager:info("repl power...ACTIVATE!"),
    FirstA = hd(ClusterA),
    LeaderA = get_leader(FirstA),
    FirstB = hd(ClusterB),
    MgrPortB = get_mgr_port(FirstB),
    repl_util:connect_cluster(LeaderA, "127.0.0.1", MgrPortB),
    ?assertEqual(ok, repl_util:wait_for_connection(LeaderA, "B")),
    repl_util:enable_realtime(LeaderA, "B"),
    repl_util:start_realtime(LeaderA, "B").
%% Call riak_core_cluster_mgr:get_leader() on Node via rpc and return
%% its result as-is (presumably the cluster manager's leader node).
get_leader(Node) ->
    Leader = rpc:call(Node, riak_core_cluster_mgr, get_leader, []),
    Leader.
%% Read the riak_core `cluster_mgr' application env on Node via rpc and
%% return the port part of its {IP, Port} value. Crashes with badmatch
%% if the setting is missing or the rpc fails.
get_mgr_port(Node) ->
    EnvArgs = [riak_core, cluster_mgr],
    Reply = rpc:call(Node, application, get_env, EnvArgs),
    {ok, {_IP, Port}} = Reply,
    Port.