From 3ae320c18eed3057cf8b3f7c0456d8b021036a30 Mon Sep 17 00:00:00 2001 From: Russell Brown Date: Tue, 7 Mar 2017 15:04:30 +0000 Subject: [PATCH 1/3] Re-add old kv679 test This test was part of the original kv679 suite, but at the time of 2.1 it was put in it's own branch. Riak Test is made up of passing tests, and this test (kv679_dataloss_fb.erl) still fails. Adding this back to a branch off develop in preparation for a fix of this issue. --- tests/kv679_dataloss_fb.erl | 212 ++++++++++++++++++++++++++++++++++++ tests/kv679_tombstone.erl | 7 +- 2 files changed, 217 insertions(+), 2 deletions(-) create mode 100644 tests/kv679_dataloss_fb.erl diff --git a/tests/kv679_dataloss_fb.erl b/tests/kv679_dataloss_fb.erl new file mode 100644 index 00000000..872f17ce --- /dev/null +++ b/tests/kv679_dataloss_fb.erl @@ -0,0 +1,212 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2017 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +%%% @copyright (C) 2017, Basho Technologies +%%% @doc +%%% riak_test for kv679 lost clock/fallback/handoff flavour. +%%% +%%% issue kv679 is a possible dataloss issue, it's basically caused by +%%% the fact that per key logical clocks can go backwards in time in +%%% certain situations. The situation under test here is as follows: +%%% +%% A coords a write to K [{a, 1}] and replicates to fallbacks D, E +%% A coords a write to K [{a, 2}] and replicates to primaries B, C +%% A coords a write K [{a, 3}] and replicates to primaries B, C +%% A loses it's clock for K (so far this is like the lost clock case above) +%% Read of A, D, E read repairs A with K=[{a, 1}] +%% A coords a write, issues [{a, 2}] again +%% Acked write is lost +%%% +%%% +%%% @end + +-module(kv679_dataloss_fb). +-behavior(riak_test). +-compile([export_all]). +-export([confirm/0]). + +-include_lib("eunit/include/eunit.hrl"). + +-define(BUCKET, <<"kv679">>). +-define(KEY, <<"test">>). + +confirm() -> + Conf = [ + {riak_kv, [{anti_entropy, {off, []}}]}, + {riak_core, [{default_bucket_props, [{allow_mult, true}, + {dvv_enabled, true}, + {ring_creation_size, 8}, + {vnode_management_timer, 1000}, + {handoff_concurrency, 100}, + {vnode_inactivity_timeout, 1000}]}]}, + {bitcask, [{sync_strategy, o_sync}, {io_mode, nif}]}], + + %% Such nodes 'cos I want a perfect preflist when 2 primaries are + %% down. i.e. I want to kill the fallbacks before they can + %% handoff without effecting the primaries + Nodes = rt:build_cluster(6, Conf), + + Clients = kv679_tombstone:create_pb_clients(Nodes), + + %% Get preflist for key + PL = kv679_tombstone:get_preflist(hd(Nodes)), + + ?assert(kv679_tombstone2:perfect_preflist(PL)), + + lager:info("Got preflist"), + + {CoordNode, _}=CoordClient = kv679_tombstone:coordinating_client(Clients, PL), + + OtherPrimaries = [Node || {{_Idx, Node}, Type} <- PL, + Type == primary, + Node /= CoordNode], + + [rt:stop_and_wait(N) || N <- OtherPrimaries], + + lager:info("Killed 2 primaries"), + + rt:wait_until(fun() -> + NewPL = kv679_tombstone:get_preflist(CoordNode), + primary_and_fallback_counts(NewPL) == {1, 2} + end), + + FBPL = kv679_tombstone:get_preflist(CoordNode), + + lager:info("Got a preflist with coord and 2 fbs ~p~n", [FBPL]), + + %% Write key twice at remaining, coordinating primary + kv679_tombstone:write_key(CoordClient, [<<"bob">>, <<"jim">>]), + + kv679_tombstone2:dump_clock(CoordClient), + + + lager:info("Clock at 2 fallbacks"), + + %% Kill the fallbacks before they can handoff + Fallbacks = [Node || {{_Idx, Node}, Type} <- FBPL, + Type == fallback], + + [rt:brutal_kill(FB) || FB <- Fallbacks], + + %% Bring back the primaries and do some more writes + [rt:start_and_wait(P) || P <- OtherPrimaries], + + lager:info("started primaries back up"), + + rt:wait_until(fun() -> + NewPL = kv679_tombstone:get_preflist(CoordNode), + NewPL == PL + end), + + kv679_tombstone:write_key(CoordClient, [<<"jon">>, <<"joe">>]), + + kv679_tombstone2:dump_clock(CoordClient), + + %% Kill those primaries with their frontier clocks + [rt:brutal_kill(P) || P <- OtherPrimaries], + + lager:info("killed primaries again"), + + %% delete the local data at the coordinator Key + kv679_dataloss:delete_datadir(hd(PL)), + + %% Start up those fallbacks + [rt:start_and_wait(F) || F <- Fallbacks], + + lager:info("restart fallbacks"), + + %% Wait for the fallback prefist + rt:wait_until(fun() -> + NewPL = kv679_tombstone:get_preflist(CoordNode), + NewPL == FBPL + end), + + %% Read the key, read repair will mean that the data deleted vnode + %% will have an old clock (gone back in time!) + + await_read_repair(CoordClient), + + kv679_tombstone2:dump_clock(CoordClient), + + %% write a new value, this _should_ be a sibling of what is on + %% crashed primaries + + lager:info("writing anne value", []), + + %% @TODO why is this write calling put new object?? + + kv679_tombstone:write_key(CoordClient, <<"anne">>), + + lager:info("dumping new vclock"), + + kv679_tombstone2:dump_clock(CoordClient), + + %% Time to start up those primaries, let handoff happen, and see + %% what happened to that last write + + [rt:start_and_wait(P) || P <- OtherPrimaries], + + lager:info("restart primaries _again_"), + + rt:wait_until(fun() -> + NewPL = kv679_tombstone:get_preflist(CoordNode), + NewPL == PL + end), + + lager:info("wait for handoffs"), + + [begin + rpc:call(FB, riak_core_vnode_manager, force_handoffs, []), + rt:wait_until_transfers_complete([FB]) + end || FB <- Fallbacks], + + lager:info("final get"), + + Res = kv679_tombstone:read_key(CoordClient), + + ?assertMatch({ok, _}, Res), + {ok, O} = Res, + + %% A nice riak would have somehow managed to make a sibling of the + %% last write + ?assertEqual([<<"anne">>, <<"joe">>], riakc_obj:get_values(O)), + + lager:info("Final Object ~p~n", [O]), + + pass. + +primary_and_fallback_counts(PL) -> + lists:foldl(fun({{_, _}, primary}, {P, F}) -> + {P+1, F}; + ({{_, _}, fallback}, {P, F}) -> + {P, F+1} + end, + {0, 0}, + PL). + +%% Wait until a read repair has occured, or at least, wait until there +%% is a value on disk at the coordinating/primary vnode (and assume it +%% must have got there via read repair) +await_read_repair(Client) -> + rt:wait_until(fun() -> + {ok, _O} = kv679_tombstone:read_key(Client), + {T, V} = kv679_tombstone:read_key(Client, [{pr,1},{r,1}, {sloppy_quorum, false}]), + lager:info("pr=1 fetch res ~p ~p", [T, V]), + T /= error + end). diff --git a/tests/kv679_tombstone.erl b/tests/kv679_tombstone.erl index 690e4570..300a0eb4 100644 --- a/tests/kv679_tombstone.erl +++ b/tests/kv679_tombstone.erl @@ -144,8 +144,11 @@ write_key({_, Client}, Val, Opts) when is_binary(Val) -> end, riakc_pb_socket:put(Client, Object, Opts). -read_key({_, Client}) -> - riakc_pb_socket:get(Client, ?BUCKET, ?KEY, []). +read_key({_, Client}, Opts) -> + riakc_pb_socket:get(Client, ?BUCKET, ?KEY, Opts). + +read_key(C) -> + read_key(C, []). delete_key({_, Client}) -> riakc_pb_socket:delete(Client, ?BUCKET, ?KEY), From 01764947236e46363afb14bb461f0646b6275edf Mon Sep 17 00:00:00 2001 From: Russell Brown Date: Mon, 20 Mar 2017 18:03:01 +0000 Subject: [PATCH 2/3] Add test for further dataloss edge case This test shows a dataloss edge case in riak, even in 2.1 with per-key-actor-epochs enabled. The test is a litte convoluted, and is based on a quickcheck counter example, included in the riak_kv/test directory. In short, both this test, and the other kv679_dataloss_fb test, show that even with multiple replicas acking/storing, a single disk error on a single replica is enough to cause acked writes to be silently and permanently lost. For a replicated database, that is bad. --- .../riak_kv_bitcask_backend_intercepts.erl | 8 + src/rt.erl | 25 ++ tests/kv679_dataloss_fb.erl | 27 -- tests/kv679_dataloss_fb2.erl | 249 ++++++++++++++++++ tests/kv679_tombstone.erl | 8 +- tests/kv679_tombstone2.erl | 7 +- 6 files changed, 293 insertions(+), 31 deletions(-) create mode 100644 tests/kv679_dataloss_fb2.erl diff --git a/intercepts/riak_kv_bitcask_backend_intercepts.erl b/intercepts/riak_kv_bitcask_backend_intercepts.erl index dc42cbeb..a5cc31b9 100644 --- a/intercepts/riak_kv_bitcask_backend_intercepts.erl +++ b/intercepts/riak_kv_bitcask_backend_intercepts.erl @@ -48,3 +48,11 @@ corrupting_get(Bucket, Key, ModState) -> corrupt_binary(O) -> crypto:rand_bytes(byte_size(O)). + +always_corrupt_get(Bucket, Key, ModState) -> + case ?M:get_orig(Bucket, Key, ModState) of + {ok, BinVal0, UpdModState} -> + BinVal = corrupt_binary(BinVal0), + {ok, BinVal, UpdModState}; + Else -> Else + end. diff --git a/src/rt.erl b/src/rt.erl index 507fa50f..1cc104f2 100644 --- a/src/rt.erl +++ b/src/rt.erl @@ -561,6 +561,20 @@ heal({_NewCookie, OldCookie, P1, P2}) -> {_GN, []} = rpc:sbcast(Cluster, riak_core_node_watcher, broadcast), ok. +%% @doc heal the partition created by call to partition/2, but if some +%% node in P1 is down, just skip it, rather than failing. Returns {ok, +%% list(node())} where the list is those nodes down and therefore not +%% healed/reconnected. +heal_upnodes({_NewCookie, OldCookie, P1, P2}) -> + %% set OldCookie on UP P1 Nodes + Res = [{N, rpc:call(N, erlang, set_cookie, [N, OldCookie])} || N <- P1], + UpForReconnect = [N || {N, true} <- Res], + DownForReconnect = [N || {N, RPC} <- Res, RPC /= true], + Cluster = UpForReconnect ++ P2, + wait_until_connected(Cluster), + {_GN, []} = rpc:sbcast(Cluster, riak_core_node_watcher, broadcast), + {ok, DownForReconnect}. + %% @doc Spawn `Cmd' on the machine running the test harness spawn_cmd(Cmd) -> ?HARNESS:spawn_cmd(Cmd). @@ -778,6 +792,17 @@ wait_until_transfers_complete([Node0|_]) -> ?assertEqual(ok, wait_until(Node0, F)), ok. +%% @doc Waits until hinted handoffs from `Node0' are complete +wait_until_node_handoffs_complete(Node0) -> + lager:info("Wait until Node's transfers complete ~p", [Node0]), + F = fun(Node) -> + Handoffs = rpc:call(Node, riak_core_handoff_manager, status, [{direction, outbound}]), + lager:info("Handoffs: ~p", [Handoffs]), + Handoffs =:= [] + end, + ?assertEqual(ok, wait_until(Node0, F)), + ok. + wait_for_service(Node, Services) when is_list(Services) -> F = fun(N) -> case rpc:call(N, riak_core_node_watcher, services, [N]) of diff --git a/tests/kv679_dataloss_fb.erl b/tests/kv679_dataloss_fb.erl index 872f17ce..c709e85f 100644 --- a/tests/kv679_dataloss_fb.erl +++ b/tests/kv679_dataloss_fb.erl @@ -92,10 +92,7 @@ confirm() -> %% Write key twice at remaining, coordinating primary kv679_tombstone:write_key(CoordClient, [<<"bob">>, <<"jim">>]), - kv679_tombstone2:dump_clock(CoordClient), - - lager:info("Clock at 2 fallbacks"), %% Kill the fallbacks before they can handoff @@ -106,21 +103,16 @@ confirm() -> %% Bring back the primaries and do some more writes [rt:start_and_wait(P) || P <- OtherPrimaries], - lager:info("started primaries back up"), - rt:wait_until(fun() -> NewPL = kv679_tombstone:get_preflist(CoordNode), NewPL == PL end), - kv679_tombstone:write_key(CoordClient, [<<"jon">>, <<"joe">>]), - kv679_tombstone2:dump_clock(CoordClient), %% Kill those primaries with their frontier clocks [rt:brutal_kill(P) || P <- OtherPrimaries], - lager:info("killed primaries again"), %% delete the local data at the coordinator Key @@ -128,9 +120,7 @@ confirm() -> %% Start up those fallbacks [rt:start_and_wait(F) || F <- Fallbacks], - lager:info("restart fallbacks"), - %% Wait for the fallback prefist rt:wait_until(fun() -> NewPL = kv679_tombstone:get_preflist(CoordNode), @@ -139,38 +129,24 @@ confirm() -> %% Read the key, read repair will mean that the data deleted vnode %% will have an old clock (gone back in time!) - await_read_repair(CoordClient), - kv679_tombstone2:dump_clock(CoordClient), %% write a new value, this _should_ be a sibling of what is on %% crashed primaries - - lager:info("writing anne value", []), - - %% @TODO why is this write calling put new object?? - kv679_tombstone:write_key(CoordClient, <<"anne">>), - - lager:info("dumping new vclock"), - kv679_tombstone2:dump_clock(CoordClient), %% Time to start up those primaries, let handoff happen, and see %% what happened to that last write - [rt:start_and_wait(P) || P <- OtherPrimaries], - lager:info("restart primaries _again_"), - rt:wait_until(fun() -> NewPL = kv679_tombstone:get_preflist(CoordNode), NewPL == PL end), lager:info("wait for handoffs"), - [begin rpc:call(FB, riak_core_vnode_manager, force_handoffs, []), rt:wait_until_transfers_complete([FB]) @@ -179,16 +155,13 @@ confirm() -> lager:info("final get"), Res = kv679_tombstone:read_key(CoordClient), - ?assertMatch({ok, _}, Res), {ok, O} = Res, %% A nice riak would have somehow managed to make a sibling of the %% last write ?assertEqual([<<"anne">>, <<"joe">>], riakc_obj:get_values(O)), - lager:info("Final Object ~p~n", [O]), - pass. primary_and_fallback_counts(PL) -> diff --git a/tests/kv679_dataloss_fb2.erl b/tests/kv679_dataloss_fb2.erl new file mode 100644 index 00000000..245fdf7d --- /dev/null +++ b/tests/kv679_dataloss_fb2.erl @@ -0,0 +1,249 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2017 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +%%% @copyright (C) 2017, Basho Technologies +%%% @doc +%%% riak_test for kv679 lost clock/fallback/handoff flavour. +%%% +%%% issue kv679 is a possible dataloss issue, it's basically caused by +%%% the fact that per key logical clocks can go backwards in time in +%%% certain situations. The situation under test here is as follows: +%%% +%% write at P1 replicates to F1 clock is at [{p1, 1}] at p1, f1 +%% write twice more at P1, replciates to F2, clock is at [{p1, 3}] at p1, f2 +%% write at p2 replicates to f3, clock is at [{p2, 1}] at p2, f3 +%% handoff from f3->p1, during which the local read at p1 fails (disk error, whatever) +%% clock at p1,p2 is [{p2, 1}] f3 deletes after handoff (no more f3) +%% handoff from f1->p2 clock at p2 [{p1, 1}, {p2, 1}] +%% read repair from p2 -> p1, clock at p1 [{p1, 1}, {p2, 1}] +%% write on p1 replicate to p2, clock at [{p1, 2}, {p2, 1}] at p1, p2 +%% handoff f2->p2, merge causes last write to be lost since entry {p1, 3} > {p1, 2} +%% read repair p2->p1 and the acked write is silently lost forever. +%%% +%%% +%% NOTE this failure occurs even in riak2.1 with actor-epochs, since +%% it is caused by a local-not-found on a non-coordinating write. EQC +%% found this issue, the counter example is below. +%%% +%%% +%% [{set,{var,1},{call,kv679_eqc,put,[p1,p2,<<"A">>,959494,undefined,false]}}, +%% {set,{var,3},{call,kv679_eqc,put,[p3,p2,<<"A">>,960608,{var,2},false]}}, +%% {set,{var,5},{call,kv679_eqc,put,[p1,f3,<<"A">>,960760,{var,4},false]}}, +%% {set,{var,6},{call,kv679_eqc,put,[p1,f1,<<"A">>,960851,{var,4},false]}}, +%% {set,{var,7},{call,kv679_eqc,forget,[p1,<<"A">>]}}, +%% {set,{var,8},{call,kv679_eqc,replicate,[<<"A">>,p3,p1]}}, +%% {set,{var,9},{call,kv679_eqc,replicate,[<<"A">>,f3,p1]}}, +%% {set,{var,11},{call,kv679_eqc,put,[p1,p2,<<"A">>,962942,{var,10},false]}}] +%% +%%% +%%% +%%% @end + +-module(kv679_dataloss_fb2). +-behavior(riak_test). +-compile([export_all]). +-export([confirm/0]). +-import(kv679_dataloss_fb, [primary_and_fallback_counts/1]). + +-include_lib("eunit/include/eunit.hrl"). + +-define(BUCKET, <<"kv679">>). +-define(KEY, <<"test">>). +-define(NVAL, 2). + +confirm() -> + Conf = [ + {riak_kv, [{anti_entropy, {off, []}}]}, + {riak_core, [{default_bucket_props, [ + %% note reduced n_val + %% is to reduce + %% number of nodes + %% required for test + %% case and to + %% simplify case. + {n_val, ?NVAL}, + {allow_mult, true}, + {dvv_enabled, true}, + {ring_creation_size, 8}, + {vnode_management_timer, 1000}, + {handoff_concurrency, 100}, + {vnode_inactivity_timeout, 1000}]}]}, + {bitcask, [{sync_strategy, o_sync}, {io_mode, nif}]}], + + Nodes = rt:build_cluster(5, Conf), + Clients = kv679_tombstone:create_pb_clients(Nodes), + {_, TempPBC} = hd(Clients), + rt:pbc_set_bucket_prop(TempPBC, ?BUCKET, [{n_val, ?NVAL}]), + rt:wait_until_bucket_props(Nodes, ?BUCKET, [{n_val, ?NVAL}]), + + %% Get preflist for key + PL = kv679_tombstone:get_preflist(hd(Nodes), ?NVAL), + ?assert(kv679_tombstone2:perfect_preflist(PL, ?NVAL)), + + lager:info("Got preflist ~p", [PL]), + + {CoordNode, _}=CoordClient = kv679_tombstone:coordinating_client(Clients, PL), + OtherPrimaries = [Node || {{_Idx, Node}, Type} <- PL, + Type == primary, + Node /= CoordNode], + Fallbacks = [FB || FB <- Nodes, FB /= CoordNode andalso not lists:member(FB, OtherPrimaries)], + + ?assert(length(Fallbacks) == 3), + + lager:info("fallbacks ~p, primaries ~p~n", [Fallbacks, [CoordNode] ++ OtherPrimaries]), + + %% Partition the other primary and one non-preflist node + {_, _, _P1, P2} = PartInfo = rt:partition([CoordNode] ++ tl(Fallbacks), OtherPrimaries ++ [hd(Fallbacks)]), + lager:info("Partitioned ~p~n", [PartInfo]), + + rt:wait_until(fun() -> + NewPL = kv679_tombstone:get_preflist(CoordNode, ?NVAL), + lager:info("new PL ~p~n", [NewPL]), + primary_and_fallback_counts(NewPL) == {1, 1} + end), + + FBPL = kv679_tombstone:get_preflist(CoordNode, ?NVAL), + lager:info("Got a preflist with coord and 1 fb ~p~n", [FBPL]), + + %% Write key once at coordinating primary + kv679_tombstone:write_key(CoordClient, [<<"alice">>]), + kv679_tombstone2:dump_clock(CoordClient), + lager:info("Clock at fallback"), + + %% Kill the fallback before it can handoff + [FB1] = [Node || {{_Idx, Node}, Type} <- FBPL, + Type == fallback], + rt:brutal_kill(FB1), + + %% get a new preflist with a different fallback + rt:wait_until(fun() -> + NewPL = kv679_tombstone:get_preflist(CoordNode, ?NVAL), + primary_and_fallback_counts(NewPL) == {1, 1} andalso NewPL /= FBPL + end), + FBPL2 = kv679_tombstone:get_preflist(CoordNode, ?NVAL), + lager:info("Got a preflist with coord and 1 fb ~p~n", [FBPL2]), + ?assert(FBPL2 /= FBPL), + + %% do two more writes so that there exists out there a clock of + %% {Primary1, 1} on the first fallback node and {Primary1, 3} on + %% the second + kv679_tombstone:write_key(CoordClient, [<<"bob">>, <<"charlie">>]), + kv679_tombstone2:dump_clock(CoordClient), + lager:info("Clock at fallback2"), + + %% Kill the fallback before it can handoff + [FB2] = [Node || {{_Idx, Node}, Type} <- FBPL2, + Type == fallback], + rt:brutal_kill(FB2), + + %% meanwhile, in the other partition, let's write some data + P2PL = kv679_tombstone:get_preflist(hd(P2), ?NVAL), + lager:info("partition 2 PL ~p", [P2PL]), + [P2FB] = [Node || {{_Idx, Node}, Type} <- P2PL, + Type == fallback], + [P2P] = [Node || {{_Idx, Node}, Type} <- P2PL, + Type == primary], + P2Client = kv679_tombstone:coordinating_client(Clients, P2PL), + kv679_tombstone:write_key(P2Client, [<<"dave">>]), + kv679_tombstone2:dump_clock(P2Client), + lager:info("Clock in Partition 2"), + + %% set up a local read error on Primary 1 (this is any disk error, + %% delete the datadir, an intercept for a local crc failure or + %% deserialisation error) + rt_intercept:add(CoordNode, {riak_kv_bitcask_backend, [{{get, 3}, always_corrupt_get}]}), + + %% heal the partition, remember the Partition 1 fallbacks aren't + %% handing off yet (we killed them, but it could be any delay in + %% handoff, not a failure, killing them is just a way to control + %% timing (maybe should've used intercepts??)) + {ok, DownNodes} = rt:heal_upnodes(PartInfo), + lager:info("These nodes still need healing ~p", [DownNodes]), + + %% wait for the partition 2 fallback to hand off + rpc:call(P2FB, riak_core_vnode_manager, force_handoffs, []), + rt:wait_until_node_handoffs_complete(P2FB), + + %% what's the clock on primary 1 now? NOTE: this extra read was + %% the only way I could ensure that the handoff had occured before + %% cleaning out the intercept. A sleep also worked. + kv679_tombstone2:dump_clock(CoordClient), + rt_intercept:clean(CoordNode, riak_kv_bitcask_backend), + kv679_tombstone2:dump_clock(CoordClient), + + %% restart fallback one and wait for it to handoff + rt:start_and_wait(FB1), + lager:info("started fallback 1 back up"), + rpc:call(FB1, riak_core_vnode_manager, force_handoffs, []), + rt:wait_until_node_handoffs_complete(FB1), + + %% kill dev5 again (there is something very weird here, unless I + %% do this dev3 _never_ hands off (and since n=2 it is never used + %% in a preflist for read repair)) So the only way to get it's + %% data onto CoordNode is to ensure a preflist of FB1 and + %% CoordNode for a read! To be fair this rather ruins the test + %% case. @TODO investigate this handoff problem + rt:stop_and_wait(P2P), + rt:stop_and_wait(P2FB), + rt:wait_until(fun() -> + NewPL = kv679_tombstone:get_preflist(CoordNode, ?NVAL), + lager:info("new PL ~p~n", [NewPL]), + primary_and_fallback_counts(NewPL) == {1, 1} andalso + different_nodes(NewPL) + end), + + %% %% what's the clock on primary 1 now? + kv679_tombstone2:dump_clock(CoordClient), + + %% restart P2P and P2FB + rt:start_and_wait(P2P), + rt:start_and_wait(P2FB), + + %% get a primary PL + rt:wait_until(fun() -> + NewPL = kv679_tombstone:get_preflist(CoordNode, ?NVAL), + lager:info("new PL ~p~n", [NewPL]), + primary_and_fallback_counts(NewPL) == {2, 0} andalso + different_nodes(NewPL) + end), + + %% write a new value + kv679_tombstone:write_key(CoordClient, [<<"emma">>]), + kv679_tombstone2:dump_clock(CoordClient), + + %% finally start the last offline fallback, await all transfers, + %% and do a read, the last write, `emma' should be present + rt:start_and_wait(FB2), + rpc:call(FB2, riak_core_vnode_manager, force_handoffs, []), + rt:wait_until_transfers_complete(Nodes), + + lager:info("final get"), + Res = kv679_tombstone:read_key(CoordClient), + ?assertMatch({ok, _}, Res), + {ok, O} = Res, + + %% A nice riak would have somehow managed to make a sibling of the + %% last acked write, even with all the craziness + ?assertEqual([<<"charlie">>, <<"emma">>], lists:sort(riakc_obj:get_values(O))), + lager:info("Final Object ~p~n", [O]), + pass. + +different_nodes(PL) -> + Nodes = [Node || {{_, Node}, _} <- PL], + length(lists:usort(Nodes)) == length(PL). diff --git a/tests/kv679_tombstone.erl b/tests/kv679_tombstone.erl index 300a0eb4..e96956b5 100644 --- a/tests/kv679_tombstone.erl +++ b/tests/kv679_tombstone.erl @@ -137,7 +137,8 @@ write_key({_, Client}, Val, Opts) when is_binary(Val) -> Object = case riakc_pb_socket:get(Client, ?BUCKET, ?KEY, []) of {ok, O1} -> lager:info("writing existing!"), - riakc_obj:update_value(O1, Val); + O2 = riakc_obj:update_metadata(O1, dict:new()), + riakc_obj:update_value(O2, Val); _ -> lager:info("writing new!"), riakc_obj:new(?BUCKET, ?KEY, Val) @@ -182,9 +183,12 @@ start_node(Node, Preflist) -> wait_for_new_pl(Preflist, Node). get_preflist(Node) -> + get_preflist(Node, 3). + +get_preflist(Node, NVal) -> Chash = rpc:call(Node, riak_core_util, chash_key, [{?BUCKET, ?KEY}]), UpNodes = rpc:call(Node, riak_core_node_watcher, nodes, [riak_kv]), - PL = rpc:call(Node, riak_core_apl, get_apl_ann, [Chash, 3, UpNodes]), + PL = rpc:call(Node, riak_core_apl, get_apl_ann, [Chash, NVal, UpNodes]), PL. kill_primary(Preflist) -> diff --git a/tests/kv679_tombstone2.erl b/tests/kv679_tombstone2.erl index 3135717e..22d43205 100644 --- a/tests/kv679_tombstone2.erl +++ b/tests/kv679_tombstone2.erl @@ -128,9 +128,12 @@ confirm() -> perfect_preflist(PL) -> - %% N=3 primaries, each on a unique node + perfect_preflist(PL, 3). + +perfect_preflist(PL, NVal) -> + %% N=NVal primaries, each on a unique node length(lists:usort([Node || {{_Idx, Node}, Type} <- PL, - Type == primary])) == 3. + Type == primary])) == NVal. get_coord_client_and_patsy(Clients, PL) -> {CoordNode, _}=CoordClient=kv679_tombstone:coordinating_client(Clients, PL), From 1e7e94aacb4e8c748963f4970b3b3cf82c8a54e3 Mon Sep 17 00:00:00 2001 From: Russell Brown Date: Mon, 3 Apr 2017 16:48:46 +0100 Subject: [PATCH 3/3] Add a more sophisticated equality check to verify The partition repair test deletes all the data at a partition, and then repairs it from neighbouring partitions. The subset of repaired data that was originally coordinated by the deleted partition's vnode showed up as `notfound` since the latest kv679 changes here https://github.com/basho/riak_kv/pull/1643/. The reason is that the fix in the KV repo adds a new actor to the repaired key's vclock. Prior to this change `verify` in partition_repair.erl did a simple equality check on the binary encoded riak_object values. This change takes into account that a new actor may be in the vclock at the repaired vnode, and uses a semantic equality check based on riak_object merge and riak object equal. --- tests/partition_repair.erl | 34 +++++++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/tests/partition_repair.erl b/tests/partition_repair.erl index b5c54da6..4c984943 100644 --- a/tests/partition_repair.erl +++ b/tests/partition_repair.erl @@ -169,7 +169,7 @@ kill_repair_verify({Partition, Node}, DataSuffix, Service) -> lager:info("Verify ~p on ~p is fully repaired", [Partition, Node]), Data2 = get_data(Service, {Partition, Node}), - {Verified, NotFound} = dict:fold(verify(Service, Data2), {0, []}, Stash), + {Verified, NotFound} = dict:fold(verify(Node, Service, Data2), {0, []}, Stash), case NotFound of [] -> ok; _ -> @@ -188,7 +188,7 @@ kill_repair_verify({Partition, Node}, DataSuffix, Service) -> {ok, [StashB]} = file:consult(StashPathB), ExpectToVerifyB = dict:size(StashB), BeforeData = get_data(Service, B), - {VerifiedB, NotFoundB} = dict:fold(verify(Service, BeforeData), {0, []}, StashB), + {VerifiedB, NotFoundB} = dict:fold(verify(Node, Service, BeforeData), {0, []}, StashB), case NotFoundB of [] -> ok; _ -> @@ -203,7 +203,7 @@ kill_repair_verify({Partition, Node}, DataSuffix, Service) -> {ok, [StashA]} = file:consult(StashPathA), ExpectToVerifyA = dict:size(StashA), AfterData = get_data(Service, A), - {VerifiedA, NotFoundA} = dict:fold(verify(Service, AfterData), {0, []}, StashA), + {VerifiedA, NotFoundA} = dict:fold(verify(Node, Service, AfterData), {0, []}, StashA), case NotFoundA of [] -> ok; _ -> @@ -214,19 +214,23 @@ kill_repair_verify({Partition, Node}, DataSuffix, Service) -> ?assertEqual(ExpectToVerifyA, VerifiedA). -verify(riak_kv, DataAfterRepair) -> +verify(Node, riak_kv, DataAfterRepair) -> fun(BKey, StashedValue, {Verified, NotFound}) -> StashedData={BKey, StashedValue}, case dict:find(BKey, DataAfterRepair) of error -> {Verified, [StashedData|NotFound]}; {ok, Value} -> - if Value == StashedValue -> {Verified+1, NotFound}; - true -> {Verified, [StashedData|NotFound]} + %% NOTE: since kv679 fixes, the binary values may + %% not be equal where a new epoch-actor-entry has + %% been added to the repaired value in the vnode + case gte(Node, Value, StashedValue, BKey) of + true -> {Verified+1, NotFound}; + false -> {Verified, [StashedData|NotFound]} end end end; -verify(riak_search, PostingsAfterRepair) -> +verify(_Node, riak_search, PostingsAfterRepair) -> fun(IFT, StashedPostings, {Verified, NotFound}) -> StashedPosting={IFT, StashedPostings}, case dict:find(IFT, PostingsAfterRepair) of @@ -241,6 +245,22 @@ verify(riak_search, PostingsAfterRepair) -> end end. +%% @private gte checks that `Value' is _at least_ `StashedValue'. With +%% the changes for kv679 when a vnode receives a write of a key that +%% contains the vnode's id as an entry in the version vector, it adds +%% a new actor-epoch-entry to the version vector to guard against data +%% loss from repeated events (remember a VV history is supposed to be +%% unique!) This function then must deserialise the stashed and vnode +%% data and check that they are equal, or if not, the only difference +%% is an extra epoch actor in the vnode value's vclock. +gte(Node, Value, StashedData, {B, K}) -> + VnodeObject = riak_object:from_binary(B, K, Value), + StashedObject = riak_object:from_binary(B, K, StashedData), + %% NOTE: we need a ring and all that jazz for bucket props, needed + %% by merge, so use an RPC to merge on a riak node. + Merged = rpc:call(Node, riak_object, syntactic_merge, [VnodeObject, StashedObject]), + riak_object:equal(VnodeObject, Merged). + is_true(X) -> X == true.