diff --git a/intercepts/riak_kv_bitcask_backend_intercepts.erl b/intercepts/riak_kv_bitcask_backend_intercepts.erl index dc42cbeb..a5cc31b9 100644 --- a/intercepts/riak_kv_bitcask_backend_intercepts.erl +++ b/intercepts/riak_kv_bitcask_backend_intercepts.erl @@ -48,3 +48,11 @@ corrupting_get(Bucket, Key, ModState) -> corrupt_binary(O) -> crypto:rand_bytes(byte_size(O)). + +always_corrupt_get(Bucket, Key, ModState) -> + case ?M:get_orig(Bucket, Key, ModState) of + {ok, BinVal0, UpdModState} -> + BinVal = corrupt_binary(BinVal0), + {ok, BinVal, UpdModState}; + Else -> Else + end. diff --git a/src/rt.erl b/src/rt.erl index 507fa50f..1cc104f2 100644 --- a/src/rt.erl +++ b/src/rt.erl @@ -561,6 +561,20 @@ heal({_NewCookie, OldCookie, P1, P2}) -> {_GN, []} = rpc:sbcast(Cluster, riak_core_node_watcher, broadcast), ok. +%% @doc heal the partition created by call to partition/2, but if some +%% node in P1 is down, just skip it, rather than failing. Returns {ok, +%% list(node())} where the list is those nodes down and therefore not +%% healed/reconnected. +heal_upnodes({_NewCookie, OldCookie, P1, P2}) -> + %% set OldCookie on UP P1 Nodes + Res = [{N, rpc:call(N, erlang, set_cookie, [N, OldCookie])} || N <- P1], + UpForReconnect = [N || {N, true} <- Res], + DownForReconnect = [N || {N, RPC} <- Res, RPC /= true], + Cluster = UpForReconnect ++ P2, + wait_until_connected(Cluster), + {_GN, []} = rpc:sbcast(Cluster, riak_core_node_watcher, broadcast), + {ok, DownForReconnect}. + %% @doc Spawn `Cmd' on the machine running the test harness spawn_cmd(Cmd) -> ?HARNESS:spawn_cmd(Cmd). @@ -778,6 +792,17 @@ wait_until_transfers_complete([Node0|_]) -> ?assertEqual(ok, wait_until(Node0, F)), ok. 
+%% @doc Waits until hinted handoffs from `Node0' are complete +wait_until_node_handoffs_complete(Node0) -> + lager:info("Wait until Node's transfers complete ~p", [Node0]), + F = fun(Node) -> + Handoffs = rpc:call(Node, riak_core_handoff_manager, status, [{direction, outbound}]), + lager:info("Handoffs: ~p", [Handoffs]), + Handoffs =:= [] + end, + ?assertEqual(ok, wait_until(Node0, F)), + ok. + wait_for_service(Node, Services) when is_list(Services) -> F = fun(N) -> case rpc:call(N, riak_core_node_watcher, services, [N]) of diff --git a/tests/kv679_dataloss_fb.erl b/tests/kv679_dataloss_fb.erl index 872f17ce..c709e85f 100644 --- a/tests/kv679_dataloss_fb.erl +++ b/tests/kv679_dataloss_fb.erl @@ -92,10 +92,7 @@ confirm() -> %% Write key twice at remaining, coordinating primary kv679_tombstone:write_key(CoordClient, [<<"bob">>, <<"jim">>]), - kv679_tombstone2:dump_clock(CoordClient), - - lager:info("Clock at 2 fallbacks"), %% Kill the fallbacks before they can handoff @@ -106,21 +103,16 @@ confirm() -> %% Bring back the primaries and do some more writes [rt:start_and_wait(P) || P <- OtherPrimaries], - lager:info("started primaries back up"), - rt:wait_until(fun() -> NewPL = kv679_tombstone:get_preflist(CoordNode), NewPL == PL end), - kv679_tombstone:write_key(CoordClient, [<<"jon">>, <<"joe">>]), - kv679_tombstone2:dump_clock(CoordClient), %% Kill those primaries with their frontier clocks [rt:brutal_kill(P) || P <- OtherPrimaries], - lager:info("killed primaries again"), %% delete the local data at the coordinator Key @@ -128,9 +120,7 @@ confirm() -> %% Start up those fallbacks [rt:start_and_wait(F) || F <- Fallbacks], - lager:info("restart fallbacks"), - %% Wait for the fallback prefist rt:wait_until(fun() -> NewPL = kv679_tombstone:get_preflist(CoordNode), @@ -139,38 +129,24 @@ confirm() -> %% Read the key, read repair will mean that the data deleted vnode %% will have an old clock (gone back in time!) 
- await_read_repair(CoordClient), - kv679_tombstone2:dump_clock(CoordClient), %% write a new value, this _should_ be a sibling of what is on %% crashed primaries - - lager:info("writing anne value", []), - - %% @TODO why is this write calling put new object?? - kv679_tombstone:write_key(CoordClient, <<"anne">>), - - lager:info("dumping new vclock"), - kv679_tombstone2:dump_clock(CoordClient), %% Time to start up those primaries, let handoff happen, and see %% what happened to that last write - [rt:start_and_wait(P) || P <- OtherPrimaries], - lager:info("restart primaries _again_"), - rt:wait_until(fun() -> NewPL = kv679_tombstone:get_preflist(CoordNode), NewPL == PL end), lager:info("wait for handoffs"), - [begin rpc:call(FB, riak_core_vnode_manager, force_handoffs, []), rt:wait_until_transfers_complete([FB]) @@ -179,16 +155,13 @@ confirm() -> lager:info("final get"), Res = kv679_tombstone:read_key(CoordClient), - ?assertMatch({ok, _}, Res), {ok, O} = Res, %% A nice riak would have somehow managed to make a sibling of the %% last write ?assertEqual([<<"anne">>, <<"joe">>], riakc_obj:get_values(O)), - lager:info("Final Object ~p~n", [O]), - pass. primary_and_fallback_counts(PL) -> diff --git a/tests/kv679_dataloss_fb2.erl b/tests/kv679_dataloss_fb2.erl new file mode 100644 index 00000000..245fdf7d --- /dev/null +++ b/tests/kv679_dataloss_fb2.erl @@ -0,0 +1,249 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2017 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. 
See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +%%% @copyright (C) 2017, Basho Technologies +%%% @doc +%%% riak_test for kv679 lost clock/fallback/handoff flavour. +%%% +%%% issue kv679 is a possible dataloss issue, it's basically caused by +%%% the fact that per key logical clocks can go backwards in time in +%%% certain situations. The situation under test here is as follows: +%%% +%% write at P1 replicates to F1 clock is at [{p1, 1}] at p1, f1 +%% write twice more at P1, replicates to F2, clock is at [{p1, 3}] at p1, f2 +%% write at p2 replicates to f3, clock is at [{p2, 1}] at p2, f3 +%% handoff from f3->p1, during which the local read at p1 fails (disk error, whatever) +%% clock at p1,p2 is [{p2, 1}] f3 deletes after handoff (no more f3) +%% handoff from f1->p2 clock at p2 [{p1, 1}, {p2, 1}] +%% read repair from p2 -> p1, clock at p1 [{p1, 1}, {p2, 1}] +%% write on p1 replicate to p2, clock at [{p1, 2}, {p2, 1}] at p1, p2 +%% handoff f2->p2, merge causes last write to be lost since entry {p1, 3} > {p1, 2} +%% read repair p2->p1 and the acked write is silently lost forever. +%%% +%%% +%% NOTE this failure occurs even in riak2.1 with actor-epochs, since +%% it is caused by a local-not-found on a non-coordinating write. EQC +%% found this issue, the counter example is below. 
+%%% +%%% +%% [{set,{var,1},{call,kv679_eqc,put,[p1,p2,<<"A">>,959494,undefined,false]}}, +%% {set,{var,3},{call,kv679_eqc,put,[p3,p2,<<"A">>,960608,{var,2},false]}}, +%% {set,{var,5},{call,kv679_eqc,put,[p1,f3,<<"A">>,960760,{var,4},false]}}, +%% {set,{var,6},{call,kv679_eqc,put,[p1,f1,<<"A">>,960851,{var,4},false]}}, +%% {set,{var,7},{call,kv679_eqc,forget,[p1,<<"A">>]}}, +%% {set,{var,8},{call,kv679_eqc,replicate,[<<"A">>,p3,p1]}}, +%% {set,{var,9},{call,kv679_eqc,replicate,[<<"A">>,f3,p1]}}, +%% {set,{var,11},{call,kv679_eqc,put,[p1,p2,<<"A">>,962942,{var,10},false]}}] +%% +%%% +%%% +%%% @end + +-module(kv679_dataloss_fb2). +-behavior(riak_test). +-compile([export_all]). +-export([confirm/0]). +-import(kv679_dataloss_fb, [primary_and_fallback_counts/1]). + +-include_lib("eunit/include/eunit.hrl"). + +-define(BUCKET, <<"kv679">>). +-define(KEY, <<"test">>). +-define(NVAL, 2). + +confirm() -> + Conf = [ + {riak_kv, [{anti_entropy, {off, []}}]}, + {riak_core, [{default_bucket_props, [ + %% note reduced n_val + %% is to reduce + %% number of nodes + %% required for test + %% case and to + %% simplify case. 
+ {n_val, ?NVAL}, + {allow_mult, true}, + {dvv_enabled, true}, + {ring_creation_size, 8}, + {vnode_management_timer, 1000}, + {handoff_concurrency, 100}, + {vnode_inactivity_timeout, 1000}]}]}, + {bitcask, [{sync_strategy, o_sync}, {io_mode, nif}]}], + + Nodes = rt:build_cluster(5, Conf), + Clients = kv679_tombstone:create_pb_clients(Nodes), + {_, TempPBC} = hd(Clients), + rt:pbc_set_bucket_prop(TempPBC, ?BUCKET, [{n_val, ?NVAL}]), + rt:wait_until_bucket_props(Nodes, ?BUCKET, [{n_val, ?NVAL}]), + + %% Get preflist for key + PL = kv679_tombstone:get_preflist(hd(Nodes), ?NVAL), + ?assert(kv679_tombstone2:perfect_preflist(PL, ?NVAL)), + + lager:info("Got preflist ~p", [PL]), + + {CoordNode, _}=CoordClient = kv679_tombstone:coordinating_client(Clients, PL), + OtherPrimaries = [Node || {{_Idx, Node}, Type} <- PL, + Type == primary, + Node /= CoordNode], + Fallbacks = [FB || FB <- Nodes, FB /= CoordNode andalso not lists:member(FB, OtherPrimaries)], + + ?assert(length(Fallbacks) == 3), + + lager:info("fallbacks ~p, primaries ~p~n", [Fallbacks, [CoordNode] ++ OtherPrimaries]), + + %% Partition the other primary and one non-preflist node + {_, _, _P1, P2} = PartInfo = rt:partition([CoordNode] ++ tl(Fallbacks), OtherPrimaries ++ [hd(Fallbacks)]), + lager:info("Partitioned ~p~n", [PartInfo]), + + rt:wait_until(fun() -> + NewPL = kv679_tombstone:get_preflist(CoordNode, ?NVAL), + lager:info("new PL ~p~n", [NewPL]), + primary_and_fallback_counts(NewPL) == {1, 1} + end), + + FBPL = kv679_tombstone:get_preflist(CoordNode, ?NVAL), + lager:info("Got a preflist with coord and 1 fb ~p~n", [FBPL]), + + %% Write key once at coordinating primary + kv679_tombstone:write_key(CoordClient, [<<"alice">>]), + kv679_tombstone2:dump_clock(CoordClient), + lager:info("Clock at fallback"), + + %% Kill the fallback before it can handoff + [FB1] = [Node || {{_Idx, Node}, Type} <- FBPL, + Type == fallback], + rt:brutal_kill(FB1), + + %% get a new preflist with a different fallback + 
rt:wait_until(fun() -> + NewPL = kv679_tombstone:get_preflist(CoordNode, ?NVAL), + primary_and_fallback_counts(NewPL) == {1, 1} andalso NewPL /= FBPL + end), + FBPL2 = kv679_tombstone:get_preflist(CoordNode, ?NVAL), + lager:info("Got a preflist with coord and 1 fb ~p~n", [FBPL2]), + ?assert(FBPL2 /= FBPL), + + %% do two more writes so that there exists out there a clock of + %% {Primary1, 1} on the first fallback node and {Primary1, 3} on + %% the second + kv679_tombstone:write_key(CoordClient, [<<"bob">>, <<"charlie">>]), + kv679_tombstone2:dump_clock(CoordClient), + lager:info("Clock at fallback2"), + + %% Kill the fallback before it can handoff + [FB2] = [Node || {{_Idx, Node}, Type} <- FBPL2, + Type == fallback], + rt:brutal_kill(FB2), + + %% meanwhile, in the other partition, let's write some data + P2PL = kv679_tombstone:get_preflist(hd(P2), ?NVAL), + lager:info("partition 2 PL ~p", [P2PL]), + [P2FB] = [Node || {{_Idx, Node}, Type} <- P2PL, + Type == fallback], + [P2P] = [Node || {{_Idx, Node}, Type} <- P2PL, + Type == primary], + P2Client = kv679_tombstone:coordinating_client(Clients, P2PL), + kv679_tombstone:write_key(P2Client, [<<"dave">>]), + kv679_tombstone2:dump_clock(P2Client), + lager:info("Clock in Partition 2"), + + %% set up a local read error on Primary 1 (this is any disk error, + %% delete the datadir, an intercept for a local crc failure or + %% deserialisation error) + rt_intercept:add(CoordNode, {riak_kv_bitcask_backend, [{{get, 3}, always_corrupt_get}]}), + + %% heal the partition, remember the Partition 1 fallbacks aren't + %% handing off yet (we killed them, but it could be any delay in + %% handoff, not a failure, killing them is just a way to control + %% timing (maybe should've used intercepts??)) + {ok, DownNodes} = rt:heal_upnodes(PartInfo), + lager:info("These nodes still need healing ~p", [DownNodes]), + + %% wait for the partition 2 fallback to hand off + rpc:call(P2FB, riak_core_vnode_manager, force_handoffs, []), + 
rt:wait_until_node_handoffs_complete(P2FB), + +    %% what's the clock on primary 1 now? NOTE: this extra read was +    %% the only way I could ensure that the handoff had occurred before +    %% cleaning out the intercept. A sleep also worked. +    kv679_tombstone2:dump_clock(CoordClient), +    rt_intercept:clean(CoordNode, riak_kv_bitcask_backend), +    kv679_tombstone2:dump_clock(CoordClient), + +    %% restart fallback one and wait for it to handoff +    rt:start_and_wait(FB1), +    lager:info("started fallback 1 back up"), +    rpc:call(FB1, riak_core_vnode_manager, force_handoffs, []), +    rt:wait_until_node_handoffs_complete(FB1), + +    %% kill dev5 again (there is something very weird here, unless I +    %% do this dev3 _never_ hands off (and since n=2 it is never used +    %% in a preflist for read repair)) So the only way to get its +    %% data onto CoordNode is to ensure a preflist of FB1 and +    %% CoordNode for a read! To be fair this rather ruins the test +    %% case. @TODO investigate this handoff problem +    rt:stop_and_wait(P2P), +    rt:stop_and_wait(P2FB), +    rt:wait_until(fun() -> +                          NewPL = kv679_tombstone:get_preflist(CoordNode, ?NVAL), +                          lager:info("new PL ~p~n", [NewPL]), +                          primary_and_fallback_counts(NewPL) == {1, 1} andalso +                              different_nodes(NewPL) +                  end), + +    %% %% what's the clock on primary 1 now? 
+ kv679_tombstone2:dump_clock(CoordClient), + + %% restart P2P and P2FB + rt:start_and_wait(P2P), + rt:start_and_wait(P2FB), + + %% get a primary PL + rt:wait_until(fun() -> + NewPL = kv679_tombstone:get_preflist(CoordNode, ?NVAL), + lager:info("new PL ~p~n", [NewPL]), + primary_and_fallback_counts(NewPL) == {2, 0} andalso + different_nodes(NewPL) + end), + + %% write a new value + kv679_tombstone:write_key(CoordClient, [<<"emma">>]), + kv679_tombstone2:dump_clock(CoordClient), + + %% finally start the last offline fallback, await all transfers, + %% and do a read, the last write, `emma' should be present + rt:start_and_wait(FB2), + rpc:call(FB2, riak_core_vnode_manager, force_handoffs, []), + rt:wait_until_transfers_complete(Nodes), + + lager:info("final get"), + Res = kv679_tombstone:read_key(CoordClient), + ?assertMatch({ok, _}, Res), + {ok, O} = Res, + + %% A nice riak would have somehow managed to make a sibling of the + %% last acked write, even with all the craziness + ?assertEqual([<<"charlie">>, <<"emma">>], lists:sort(riakc_obj:get_values(O))), + lager:info("Final Object ~p~n", [O]), + pass. + +different_nodes(PL) -> + Nodes = [Node || {{_, Node}, _} <- PL], + length(lists:usort(Nodes)) == length(PL). diff --git a/tests/kv679_tombstone.erl b/tests/kv679_tombstone.erl index 300a0eb4..e96956b5 100644 --- a/tests/kv679_tombstone.erl +++ b/tests/kv679_tombstone.erl @@ -137,7 +137,8 @@ write_key({_, Client}, Val, Opts) when is_binary(Val) -> Object = case riakc_pb_socket:get(Client, ?BUCKET, ?KEY, []) of {ok, O1} -> lager:info("writing existing!"), - riakc_obj:update_value(O1, Val); + O2 = riakc_obj:update_metadata(O1, dict:new()), + riakc_obj:update_value(O2, Val); _ -> lager:info("writing new!"), riakc_obj:new(?BUCKET, ?KEY, Val) @@ -182,9 +183,12 @@ start_node(Node, Preflist) -> wait_for_new_pl(Preflist, Node). get_preflist(Node) -> + get_preflist(Node, 3). 
+ +get_preflist(Node, NVal) -> Chash = rpc:call(Node, riak_core_util, chash_key, [{?BUCKET, ?KEY}]), UpNodes = rpc:call(Node, riak_core_node_watcher, nodes, [riak_kv]), - PL = rpc:call(Node, riak_core_apl, get_apl_ann, [Chash, 3, UpNodes]), + PL = rpc:call(Node, riak_core_apl, get_apl_ann, [Chash, NVal, UpNodes]), PL. kill_primary(Preflist) -> diff --git a/tests/kv679_tombstone2.erl b/tests/kv679_tombstone2.erl index 3135717e..22d43205 100644 --- a/tests/kv679_tombstone2.erl +++ b/tests/kv679_tombstone2.erl @@ -128,9 +128,12 @@ confirm() -> perfect_preflist(PL) -> - %% N=3 primaries, each on a unique node + perfect_preflist(PL, 3). + +perfect_preflist(PL, NVal) -> + %% N=NVal primaries, each on a unique node length(lists:usort([Node || {{_Idx, Node}, Type} <- PL, - Type == primary])) == 3. + Type == primary])) == NVal. get_coord_client_and_patsy(Clients, PL) -> {CoordNode, _}=CoordClient=kv679_tombstone:coordinating_client(Clients, PL),