Add test for further dataloss edge case

This test shows a dataloss edge case in riak, even in 2.1 with
per-key-actor-epochs enabled. The test is a litte convoluted, and is
based on a quickcheck counter example, included in the riak_kv/test
directory. In short, both this test, and the other kv679_dataloss_fb
test, show that even with multiple replicas acking/storing, a single
disk error on a single replica is enough to cause acked writes to be
silently and permanently lost. For a replicated database, that is bad.
This commit is contained in:
Russell Brown 2017-03-20 18:03:01 +00:00
parent 3ae320c18e
commit 0176494723
6 changed files with 293 additions and 31 deletions

View File

@ -48,3 +48,11 @@ corrupting_get(Bucket, Key, ModState) ->
corrupt_binary(O) ->
crypto:rand_bytes(byte_size(O)).
always_corrupt_get(Bucket, Key, ModState) ->
case ?M:get_orig(Bucket, Key, ModState) of
{ok, BinVal0, UpdModState} ->
BinVal = corrupt_binary(BinVal0),
{ok, BinVal, UpdModState};
Else -> Else
end.

View File

@ -561,6 +561,20 @@ heal({_NewCookie, OldCookie, P1, P2}) ->
{_GN, []} = rpc:sbcast(Cluster, riak_core_node_watcher, broadcast),
ok.
%% @doc heal the partition created by call to partition/2, but if some
%% node in P1 is down, just skip it, rather than failing. Returns {ok,
%% list(node())} where the list is those nodes down and therefore not
%% healed/reconnected.
heal_upnodes({_NewCookie, OldCookie, P1, P2}) ->
%% set OldCookie on UP P1 Nodes
Res = [{N, rpc:call(N, erlang, set_cookie, [N, OldCookie])} || N <- P1],
UpForReconnect = [N || {N, true} <- Res],
DownForReconnect = [N || {N, RPC} <- Res, RPC /= true],
Cluster = UpForReconnect ++ P2,
wait_until_connected(Cluster),
{_GN, []} = rpc:sbcast(Cluster, riak_core_node_watcher, broadcast),
{ok, DownForReconnect}.
%% @doc Spawn `Cmd' on the machine running the test harness
spawn_cmd(Cmd) ->
?HARNESS:spawn_cmd(Cmd).
@ -778,6 +792,17 @@ wait_until_transfers_complete([Node0|_]) ->
?assertEqual(ok, wait_until(Node0, F)),
ok.
%% @doc Waits until hinted handoffs from `Node0' are complete
wait_until_node_handoffs_complete(Node0) ->
lager:info("Wait until Node's transfers complete ~p", [Node0]),
F = fun(Node) ->
Handoffs = rpc:call(Node, riak_core_handoff_manager, status, [{direction, outbound}]),
lager:info("Handoffs: ~p", [Handoffs]),
Handoffs =:= []
end,
?assertEqual(ok, wait_until(Node0, F)),
ok.
wait_for_service(Node, Services) when is_list(Services) ->
F = fun(N) ->
case rpc:call(N, riak_core_node_watcher, services, [N]) of

View File

@ -92,10 +92,7 @@ confirm() ->
%% Write key twice at remaining, coordinating primary
kv679_tombstone:write_key(CoordClient, [<<"bob">>, <<"jim">>]),
kv679_tombstone2:dump_clock(CoordClient),
lager:info("Clock at 2 fallbacks"),
%% Kill the fallbacks before they can handoff
@ -106,21 +103,16 @@ confirm() ->
%% Bring back the primaries and do some more writes
[rt:start_and_wait(P) || P <- OtherPrimaries],
lager:info("started primaries back up"),
rt:wait_until(fun() ->
NewPL = kv679_tombstone:get_preflist(CoordNode),
NewPL == PL
end),
kv679_tombstone:write_key(CoordClient, [<<"jon">>, <<"joe">>]),
kv679_tombstone2:dump_clock(CoordClient),
%% Kill those primaries with their frontier clocks
[rt:brutal_kill(P) || P <- OtherPrimaries],
lager:info("killed primaries again"),
%% delete the local data at the coordinator Key
@ -128,9 +120,7 @@ confirm() ->
%% Start up those fallbacks
[rt:start_and_wait(F) || F <- Fallbacks],
lager:info("restart fallbacks"),
%% Wait for the fallback prefist
rt:wait_until(fun() ->
NewPL = kv679_tombstone:get_preflist(CoordNode),
@ -139,38 +129,24 @@ confirm() ->
%% Read the key, read repair will mean that the data deleted vnode
%% will have an old clock (gone back in time!)
await_read_repair(CoordClient),
kv679_tombstone2:dump_clock(CoordClient),
%% write a new value, this _should_ be a sibling of what is on
%% crashed primaries
lager:info("writing anne value", []),
%% @TODO why is this write calling put new object??
kv679_tombstone:write_key(CoordClient, <<"anne">>),
lager:info("dumping new vclock"),
kv679_tombstone2:dump_clock(CoordClient),
%% Time to start up those primaries, let handoff happen, and see
%% what happened to that last write
[rt:start_and_wait(P) || P <- OtherPrimaries],
lager:info("restart primaries _again_"),
rt:wait_until(fun() ->
NewPL = kv679_tombstone:get_preflist(CoordNode),
NewPL == PL
end),
lager:info("wait for handoffs"),
[begin
rpc:call(FB, riak_core_vnode_manager, force_handoffs, []),
rt:wait_until_transfers_complete([FB])
@ -179,16 +155,13 @@ confirm() ->
lager:info("final get"),
Res = kv679_tombstone:read_key(CoordClient),
?assertMatch({ok, _}, Res),
{ok, O} = Res,
%% A nice riak would have somehow managed to make a sibling of the
%% last write
?assertEqual([<<"anne">>, <<"joe">>], riakc_obj:get_values(O)),
lager:info("Final Object ~p~n", [O]),
pass.
primary_and_fallback_counts(PL) ->

View File

@ -0,0 +1,249 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2017 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%%% @copyright (C) 2017, Basho Technologies
%%% @doc
%%% riak_test for kv679 lost clock/fallback/handoff flavour.
%%%
%%% issue kv679 is a possible dataloss issue, it's basically caused by
%%% the fact that per key logical clocks can go backwards in time in
%%% certain situations. The situation under test here is as follows:
%%%
%% write at P1 replicates to F1 clock is at [{p1, 1}] at p1, f1
%% write twice more at P1, replciates to F2, clock is at [{p1, 3}] at p1, f2
%% write at p2 replicates to f3, clock is at [{p2, 1}] at p2, f3
%% handoff from f3->p1, during which the local read at p1 fails (disk error, whatever)
%% clock at p1,p2 is [{p2, 1}] f3 deletes after handoff (no more f3)
%% handoff from f1->p2 clock at p2 [{p1, 1}, {p2, 1}]
%% read repair from p2 -> p1, clock at p1 [{p1, 1}, {p2, 1}]
%% write on p1 replicate to p2, clock at [{p1, 2}, {p2, 1}] at p1, p2
%% handoff f2->p2, merge causes last write to be lost since entry {p1, 3} > {p1, 2}
%% read repair p2->p1 and the acked write is silently lost forever.
%%%
%%%
%% NOTE this failure occurs even in riak2.1 with actor-epochs, since
%% it is caused by a local-not-found on a non-coordinating write. EQC
%% found this issue, the counter example is below.
%%%
%%%
%% [{set,{var,1},{call,kv679_eqc,put,[p1,p2,<<"A">>,959494,undefined,false]}},
%% {set,{var,3},{call,kv679_eqc,put,[p3,p2,<<"A">>,960608,{var,2},false]}},
%% {set,{var,5},{call,kv679_eqc,put,[p1,f3,<<"A">>,960760,{var,4},false]}},
%% {set,{var,6},{call,kv679_eqc,put,[p1,f1,<<"A">>,960851,{var,4},false]}},
%% {set,{var,7},{call,kv679_eqc,forget,[p1,<<"A">>]}},
%% {set,{var,8},{call,kv679_eqc,replicate,[<<"A">>,p3,p1]}},
%% {set,{var,9},{call,kv679_eqc,replicate,[<<"A">>,f3,p1]}},
%% {set,{var,11},{call,kv679_eqc,put,[p1,p2,<<"A">>,962942,{var,10},false]}}]
%%
%%%
%%%
%%% @end
-module(kv679_dataloss_fb2).
-behavior(riak_test).
-compile([export_all]).
-export([confirm/0]).
-import(kv679_dataloss_fb, [primary_and_fallback_counts/1]).
-include_lib("eunit/include/eunit.hrl").
-define(BUCKET, <<"kv679">>).
-define(KEY, <<"test">>).
-define(NVAL, 2).
confirm() ->
Conf = [
{riak_kv, [{anti_entropy, {off, []}}]},
{riak_core, [{default_bucket_props, [
%% note reduced n_val
%% is to reduce
%% number of nodes
%% required for test
%% case and to
%% simplify case.
{n_val, ?NVAL},
{allow_mult, true},
{dvv_enabled, true},
{ring_creation_size, 8},
{vnode_management_timer, 1000},
{handoff_concurrency, 100},
{vnode_inactivity_timeout, 1000}]}]},
{bitcask, [{sync_strategy, o_sync}, {io_mode, nif}]}],
Nodes = rt:build_cluster(5, Conf),
Clients = kv679_tombstone:create_pb_clients(Nodes),
{_, TempPBC} = hd(Clients),
rt:pbc_set_bucket_prop(TempPBC, ?BUCKET, [{n_val, ?NVAL}]),
rt:wait_until_bucket_props(Nodes, ?BUCKET, [{n_val, ?NVAL}]),
%% Get preflist for key
PL = kv679_tombstone:get_preflist(hd(Nodes), ?NVAL),
?assert(kv679_tombstone2:perfect_preflist(PL, ?NVAL)),
lager:info("Got preflist ~p", [PL]),
{CoordNode, _}=CoordClient = kv679_tombstone:coordinating_client(Clients, PL),
OtherPrimaries = [Node || {{_Idx, Node}, Type} <- PL,
Type == primary,
Node /= CoordNode],
Fallbacks = [FB || FB <- Nodes, FB /= CoordNode andalso not lists:member(FB, OtherPrimaries)],
?assert(length(Fallbacks) == 3),
lager:info("fallbacks ~p, primaries ~p~n", [Fallbacks, [CoordNode] ++ OtherPrimaries]),
%% Partition the other primary and one non-preflist node
{_, _, _P1, P2} = PartInfo = rt:partition([CoordNode] ++ tl(Fallbacks), OtherPrimaries ++ [hd(Fallbacks)]),
lager:info("Partitioned ~p~n", [PartInfo]),
rt:wait_until(fun() ->
NewPL = kv679_tombstone:get_preflist(CoordNode, ?NVAL),
lager:info("new PL ~p~n", [NewPL]),
primary_and_fallback_counts(NewPL) == {1, 1}
end),
FBPL = kv679_tombstone:get_preflist(CoordNode, ?NVAL),
lager:info("Got a preflist with coord and 1 fb ~p~n", [FBPL]),
%% Write key once at coordinating primary
kv679_tombstone:write_key(CoordClient, [<<"alice">>]),
kv679_tombstone2:dump_clock(CoordClient),
lager:info("Clock at fallback"),
%% Kill the fallback before it can handoff
[FB1] = [Node || {{_Idx, Node}, Type} <- FBPL,
Type == fallback],
rt:brutal_kill(FB1),
%% get a new preflist with a different fallback
rt:wait_until(fun() ->
NewPL = kv679_tombstone:get_preflist(CoordNode, ?NVAL),
primary_and_fallback_counts(NewPL) == {1, 1} andalso NewPL /= FBPL
end),
FBPL2 = kv679_tombstone:get_preflist(CoordNode, ?NVAL),
lager:info("Got a preflist with coord and 1 fb ~p~n", [FBPL2]),
?assert(FBPL2 /= FBPL),
%% do two more writes so that there exists out there a clock of
%% {Primary1, 1} on the first fallback node and {Primary1, 3} on
%% the second
kv679_tombstone:write_key(CoordClient, [<<"bob">>, <<"charlie">>]),
kv679_tombstone2:dump_clock(CoordClient),
lager:info("Clock at fallback2"),
%% Kill the fallback before it can handoff
[FB2] = [Node || {{_Idx, Node}, Type} <- FBPL2,
Type == fallback],
rt:brutal_kill(FB2),
%% meanwhile, in the other partition, let's write some data
P2PL = kv679_tombstone:get_preflist(hd(P2), ?NVAL),
lager:info("partition 2 PL ~p", [P2PL]),
[P2FB] = [Node || {{_Idx, Node}, Type} <- P2PL,
Type == fallback],
[P2P] = [Node || {{_Idx, Node}, Type} <- P2PL,
Type == primary],
P2Client = kv679_tombstone:coordinating_client(Clients, P2PL),
kv679_tombstone:write_key(P2Client, [<<"dave">>]),
kv679_tombstone2:dump_clock(P2Client),
lager:info("Clock in Partition 2"),
%% set up a local read error on Primary 1 (this is any disk error,
%% delete the datadir, an intercept for a local crc failure or
%% deserialisation error)
rt_intercept:add(CoordNode, {riak_kv_bitcask_backend, [{{get, 3}, always_corrupt_get}]}),
%% heal the partition, remember the Partition 1 fallbacks aren't
%% handing off yet (we killed them, but it could be any delay in
%% handoff, not a failure, killing them is just a way to control
%% timing (maybe should've used intercepts??))
{ok, DownNodes} = rt:heal_upnodes(PartInfo),
lager:info("These nodes still need healing ~p", [DownNodes]),
%% wait for the partition 2 fallback to hand off
rpc:call(P2FB, riak_core_vnode_manager, force_handoffs, []),
rt:wait_until_node_handoffs_complete(P2FB),
%% what's the clock on primary 1 now? NOTE: this extra read was
%% the only way I could ensure that the handoff had occured before
%% cleaning out the intercept. A sleep also worked.
kv679_tombstone2:dump_clock(CoordClient),
rt_intercept:clean(CoordNode, riak_kv_bitcask_backend),
kv679_tombstone2:dump_clock(CoordClient),
%% restart fallback one and wait for it to handoff
rt:start_and_wait(FB1),
lager:info("started fallback 1 back up"),
rpc:call(FB1, riak_core_vnode_manager, force_handoffs, []),
rt:wait_until_node_handoffs_complete(FB1),
%% kill dev5 again (there is something very weird here, unless I
%% do this dev3 _never_ hands off (and since n=2 it is never used
%% in a preflist for read repair)) So the only way to get it's
%% data onto CoordNode is to ensure a preflist of FB1 and
%% CoordNode for a read! To be fair this rather ruins the test
%% case. @TODO investigate this handoff problem
rt:stop_and_wait(P2P),
rt:stop_and_wait(P2FB),
rt:wait_until(fun() ->
NewPL = kv679_tombstone:get_preflist(CoordNode, ?NVAL),
lager:info("new PL ~p~n", [NewPL]),
primary_and_fallback_counts(NewPL) == {1, 1} andalso
different_nodes(NewPL)
end),
%% %% what's the clock on primary 1 now?
kv679_tombstone2:dump_clock(CoordClient),
%% restart P2P and P2FB
rt:start_and_wait(P2P),
rt:start_and_wait(P2FB),
%% get a primary PL
rt:wait_until(fun() ->
NewPL = kv679_tombstone:get_preflist(CoordNode, ?NVAL),
lager:info("new PL ~p~n", [NewPL]),
primary_and_fallback_counts(NewPL) == {2, 0} andalso
different_nodes(NewPL)
end),
%% write a new value
kv679_tombstone:write_key(CoordClient, [<<"emma">>]),
kv679_tombstone2:dump_clock(CoordClient),
%% finally start the last offline fallback, await all transfers,
%% and do a read, the last write, `emma' should be present
rt:start_and_wait(FB2),
rpc:call(FB2, riak_core_vnode_manager, force_handoffs, []),
rt:wait_until_transfers_complete(Nodes),
lager:info("final get"),
Res = kv679_tombstone:read_key(CoordClient),
?assertMatch({ok, _}, Res),
{ok, O} = Res,
%% A nice riak would have somehow managed to make a sibling of the
%% last acked write, even with all the craziness
?assertEqual([<<"charlie">>, <<"emma">>], lists:sort(riakc_obj:get_values(O))),
lager:info("Final Object ~p~n", [O]),
pass.
different_nodes(PL) ->
Nodes = [Node || {{_, Node}, _} <- PL],
length(lists:usort(Nodes)) == length(PL).

View File

@ -137,7 +137,8 @@ write_key({_, Client}, Val, Opts) when is_binary(Val) ->
Object = case riakc_pb_socket:get(Client, ?BUCKET, ?KEY, []) of
{ok, O1} ->
lager:info("writing existing!"),
riakc_obj:update_value(O1, Val);
O2 = riakc_obj:update_metadata(O1, dict:new()),
riakc_obj:update_value(O2, Val);
_ ->
lager:info("writing new!"),
riakc_obj:new(?BUCKET, ?KEY, Val)
@ -182,9 +183,12 @@ start_node(Node, Preflist) ->
wait_for_new_pl(Preflist, Node).
get_preflist(Node) ->
get_preflist(Node, 3).
get_preflist(Node, NVal) ->
Chash = rpc:call(Node, riak_core_util, chash_key, [{?BUCKET, ?KEY}]),
UpNodes = rpc:call(Node, riak_core_node_watcher, nodes, [riak_kv]),
PL = rpc:call(Node, riak_core_apl, get_apl_ann, [Chash, 3, UpNodes]),
PL = rpc:call(Node, riak_core_apl, get_apl_ann, [Chash, NVal, UpNodes]),
PL.
kill_primary(Preflist) ->

View File

@ -128,9 +128,12 @@ confirm() ->
perfect_preflist(PL) ->
%% N=3 primaries, each on a unique node
perfect_preflist(PL, 3).
perfect_preflist(PL, NVal) ->
%% N=NVal primaries, each on a unique node
length(lists:usort([Node || {{_Idx, Node}, Type} <- PL,
Type == primary])) == 3.
Type == primary])) == NVal.
get_coord_client_and_patsy(Clients, PL) ->
{CoordNode, _}=CoordClient=kv679_tombstone:coordinating_client(Clients, PL),