Merge pull request #1068 from basho/bch-merge-develop-2.2-to-ts

Merge develop onto riak_ts-develop
This commit is contained in:
Brett Hazen 2016-05-03 16:26:07 -06:00
commit 47a9236f1c
18 changed files with 1565 additions and 1133 deletions


@ -110,8 +110,9 @@ main(Args) ->
notice
end,
Formatter = {lager_default_formatter, [time," [",severity,"] ", pid, " ", message, "\n"]},
application:set_env(lager, error_logger_hwm, 250), %% helpful for debugging
application:set_env(lager, handlers, [{lager_console_backend, ConsoleLagerLevel},
application:set_env(lager, handlers, [{lager_console_backend, [ConsoleLagerLevel, Formatter]},
{lager_file_backend, [{file, "log/test.log"},
{level, ConsoleLagerLevel}]}]),
lager:start(),
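
For reference, a sys.config-style sketch of the same lager setup these set_env calls produce, assuming the notice level shown in the clause above; this is illustrative only and not part of the diff:

%% Sketch: equivalent static configuration for the console and file handlers
%% configured at runtime above (level chosen here as notice for illustration).
[{lager, [
    {error_logger_hwm, 250},
    {handlers, [
        {lager_console_backend, [notice, {lager_default_formatter,
            [time, " [", severity, "] ", pid, " ", message, "\n"]}]},
        {lager_file_backend, [{file, "log/test.log"}, {level, notice}]}
    ]}
]}].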


@ -146,6 +146,7 @@
upgrade/2,
upgrade/3,
versions/0,
wait_for_any_webmachine_route/2,
wait_for_cluster_service/2,
wait_for_cmd/1,
wait_for_service/2,
@ -672,7 +673,7 @@ wait_until(Fun) when is_function(Fun) ->
%% @doc Convenience wrapper for wait_until for the myriad functions that
%% take a node as a single argument.
-spec wait_until([node()], fun((node()) -> boolean())) -> ok.
-spec wait_until(node(), fun((node()) -> boolean())) -> ok | {fail, Result :: term()}.
wait_until(Node, Fun) when is_atom(Node), is_function(Fun) ->
wait_until(fun() -> Fun(Node) end);
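
A minimal usage sketch of this node-plus-fun form of wait_until/2; the node name and the ping condition are illustrative assumptions, not part of the change:

%% Sketch: wait until the (hypothetical) node answers net_adm:ping/1. The fun
%% receives the node as its single argument, per the clause above.
wait_until_pingable_example() ->
    Node = 'dev1@127.0.0.1',
    ok = rt:wait_until(Node, fun(N) -> net_adm:ping(N) =:= pong end).
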
@ -1954,19 +1955,18 @@ wait_for_control(_Vsn, Node) when is_atom(Node) ->
end
end),
lager:info("Waiting for routes to be added to supervisor..."),
%% Wait for routes to be added by supervisor.
wait_for_any_webmachine_route(Node, [admin_gui, riak_control_wm_gui]).
wait_for_any_webmachine_route(Node, Routes) ->
lager:info("Waiting for routes ~p to be added to webmachine.", [Routes]),
rt:wait_until(Node, fun(N) ->
case rpc:call(N,
webmachine_router,
get_routes,
[]) of
case rpc:call(N, webmachine_router, get_routes, []) of
{badrpc, Error} ->
lager:info("Error was ~p.", [Error]),
false;
Routes ->
case is_control_gui_route_loaded(Routes) of
RegisteredRoutes ->
case is_any_route_loaded(Routes, RegisteredRoutes) of
false ->
false;
_ ->
@ -1975,9 +1975,11 @@ wait_for_control(_Vsn, Node) when is_atom(Node) ->
end
end).
%% @doc Is the riak_control GUI route loaded?
is_control_gui_route_loaded(Routes) ->
lists:keymember(admin_gui, 2, Routes) orelse lists:keymember(riak_control_wm_gui, 2, Routes).
is_any_route_loaded(SearchRoutes, RegisteredRoutes) ->
lists:any(fun(Route) -> is_route_loaded(Route, RegisteredRoutes) end, SearchRoutes).
is_route_loaded(Route, Routes) ->
lists:keymember(Route, 2, Routes).
%% @doc Wait for Riak Control to start on a series of nodes.
wait_for_control(VersionedNodes) when is_list(VersionedNodes) ->


@ -24,9 +24,7 @@
-compile(export_all).
-export([confirm/0]).
% node_package 3.x changes this - new first, old second
-define(PING_FAILURE_OUTPUT,
["Node did not respond to ping!", "Node is not running!"]).
-define(PING_FAILURE_OUTPUT, "Node did not respond to ping!").
confirm() ->
@ -122,7 +120,7 @@ ping_down_test(Node) ->
attach_down_test(Node) ->
lager:info("Testing riak attach while down"),
{ok, AttachOut} = rt:riak(Node, ["attach"]),
?assert(rt:str_mult(AttachOut, ?PING_FAILURE_OUTPUT)),
?assert(rt:str(AttachOut, ?PING_FAILURE_OUTPUT)),
ok.
attach_direct_up_test(Node) ->
@ -137,7 +135,7 @@ attach_direct_up_test(Node) ->
attach_direct_down_test(Node) ->
lager:info("Testing riak attach-direct while down"),
{ok, AttachOut} = rt:riak(Node, ["attach-direct"]),
?assert(rt:str_mult(AttachOut, ?PING_FAILURE_OUTPUT)),
?assert(rt:str(AttachOut, ?PING_FAILURE_OUTPUT)),
ok.
status_up_test(Node) ->
@ -155,7 +153,7 @@ status_down_test(Node) ->
lager:info("Test riak-admin status while down"),
{ok, {ExitCode, StatusOut}} = rt:admin(Node, ["status"], [return_exit_code]),
?assertEqual(1, ExitCode),
?assert(rt:str_mult(StatusOut, ?PING_FAILURE_OUTPUT)),
?assert(rt:str(StatusOut, ?PING_FAILURE_OUTPUT)),
ok.
getpid_up_test(Node) ->


@ -105,6 +105,7 @@ confirm() ->
BKV <- [?NORMAL_BKV,
?CONSISTENT_BKV,
?WRITE_ONCE_BKV]],
%% The cover-queries test doesn't depend on the bucket/key value, so just run it once
test_cover_queries_overload(Nodes),
pass.
@ -168,12 +169,15 @@ test_vnode_protection(Nodes, BKV) ->
Pid ! resume,
ok.
%% Don't check on fast path
test_fsm_protection(_, ?WRITE_ONCE_BKV) ->
ok;
%% Or consistent gets, as they don't use the FSM either
%% Don't check consistent gets, as they don't use the FSM
test_fsm_protection(_, ?CONSISTENT_BKV) ->
ok;
%% Don't check on fast path either.
test_fsm_protection(_, ?WRITE_ONCE_BKV) ->
ok;
test_fsm_protection(Nodes, BKV) ->
lager:info("Testing with coordinator protection enabled"),
lager:info("Setting FSM limit to ~b", [?THRESHOLD]),


@ -196,14 +196,17 @@ wait_until_fullsync_stopped(SourceLeader) ->
end).
wait_for_reads(Node, Start, End, Bucket, R) ->
rt:wait_until(Node,
ok = rt:wait_until(Node,
fun(_) ->
Reads = rt:systest_read(Node, Start, End, Bucket, R, <<>>, true),
Reads == []
end),
Reads = rt:systest_read(Node, Start, End, Bucket, R, <<>>, true),
lager:info("Reads: ~p", [Reads]),
length(Reads).
%% rt:systest_read/6 returns a list of errors encountered while performing
%% the requested reads. Since we are asserting this list is empty above,
%% we already know that if we reached here, that the list of reads has
%% no errors. Therefore, we simply return 0 and do not execute another
%% systest_read call.
0.
confirm_missing(Node, Start, End, Bucket, R) ->
Reads = rt:systest_read(Node, Start, End, Bucket, R, <<>>, true),
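
As a usage sketch of the error-list semantics described in the comment above (function and variable names here are illustrative): rt:systest_read returns only the failed reads, so an empty list, or the 0 returned by wait_for_reads, means every key in the range was readable.

%% Sketch: both matches express "no read errors for keys Start..End".
assert_range_readable(Node, Start, End, Bucket, R) ->
    [] = rt:systest_read(Node, Start, End, Bucket, R, <<>>, true),
    0 = wait_for_reads(Node, Start, End, Bucket, R),
    ok.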

File diff suppressed because it is too large


@ -0,0 +1,154 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2013-2016 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% Topology for this cascading replication test:
%% +---+
%% | 1 |
%% +---+
%% ^ ^
%% / \
%% V V
%% +---+ +---+
%% | 6 | | 2 |
%% +---+ +---+
%% ^ ^
%% | |
%% V V
%% +---+ +---+
%% | 5 | | 3 |
%% +---+ +---+
%% ^ ^
%% \ /
%% V
%% +---+
%% | 4 |
%% +---+
%% -------------------------------------------------------------------
-module(rt_cascading_big_circle).
-behavior(riak_test).
%% API
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
confirm() ->
%% test requires allow_mult=false b/c of rt:systest_read
rt:set_conf(all, [{"buckets.default.allow_mult", "false"}]),
State = big_circle_setup(),
_ = big_circle_tests(State),
pass.
big_circle_setup() ->
Conf = lists:map(fun(I) ->
{integer_to_list(I), 1}
end, lists:seq(1, 6)),
NamesAndNodes = rt_cascading:make_clusters(Conf),
Nodes = lists:flatten([ClusterNodes || {_Name, ClusterNodes} <- NamesAndNodes]),
Names = [ClusterName || {ClusterName, _} <- Conf],
[NameHd | NameTail] = Names,
ConnectTo = NameTail ++ [NameHd],
ClustersAndConnectTo = lists:zip(NamesAndNodes, ConnectTo),
ok = lists:foreach(fun({SourceCluster, SinkName}) ->
{_SourceName, [Node]} = SourceCluster,
[SinkNode] = proplists:get_value(SinkName, NamesAndNodes),
Port = rt_cascading:get_cluster_mgr_port(SinkNode),
rt_cascading:connect_rt(Node, Port, SinkName)
end, ClustersAndConnectTo),
Nodes.
big_circle_tests(Nodes) ->
% Initially just 1 -> 2 -> 3 -> 4 -> 5 -> 6 -> 1, but then 2-way is
% added later.
Tests = [
{"circle it", fun() ->
[One | _] = Nodes,
C = rt:pbc(One),
Bin = <<"goober">>,
Bucket = <<"objects">>,
Obj = riakc_obj:new(Bucket, Bin, Bin),
riakc_pb_socket:put(C, Obj, [{w,1}]),
riakc_pb_socket:stop(C),
[begin
?debugFmt("Checking ~p", [Node]),
?assertEqual(Bin, rt_cascading:maybe_eventually_exists(Node, Bucket, Bin))
end || Node <- Nodes]
end},
{"2 way repl, and circle it", fun() ->
ConnectTo = ["6", "1", "2", "3", "4", "5"],
Connect = fun({Node, ConnectToName}) ->
Nth = list_to_integer(ConnectToName),
ConnectNode = lists:nth(Nth, Nodes),
Port = rt_cascading:get_cluster_mgr_port(ConnectNode),
rt_cascading:connect_rt(Node, Port, ConnectToName)
end,
lists:map(Connect, lists:zip(Nodes, ConnectTo)),
C = rt:pbc(hd(Nodes)),
Bin = <<"2 way repl">>,
Bucket = <<"objects">>,
Obj = riakc_obj:new(Bucket, Bin, Bin),
riakc_pb_socket:put(C, Obj, [{w,1}]),
lists:map(fun(N) ->
?debugFmt("Testing ~p", [N]),
?assertEqual(Bin, rt_cascading:maybe_eventually_exists(N, Bucket, Bin))
end, Nodes)
% there will be duplicate writes, but due to the size of the circle,
% there aren't going to be many. Also, it's very difficult to
% determine when/where a duplicate may start/occur.
% a full breakdown:
% "1" forwards to "2" and "6", noting its local forwards.
% so we have two flows going. Assuming both sides flow at the same
% rate:
% 1
% / \
% 6 2: 6 has [1, 2, 6]; 2 has [1, 2, 6]
% 5 3: 5 has [1,2,5,6]; 3 has [1,2,3,6]
% 4 4: 4 has [1,2,4,5,6]; 4 has [1,2,3,4,6] ! double write
% 3 5: 3 has [1,2,3,4,5,6]; 5 has [1,2,3,4,5,6] ! double write
%
% let's explore the flow with 10 clusters:
% 1
% / \
% 10 2 10: [1,2,10]; 2: [1,2,10]
% 9 3 9: [1,2,9,10]; 3: [1,2,3,10]
% 8 4 8: [1,2,8,9,10]; 4: [1,2,3,4,10]
% 7 5 7: [1,2,7,8,9,10]; 5: [1,2,3,4,5,10]
% 6 6 6: [1,2,6,7,8,9,10]; 6: [1,2,3,4,5,6,10] !!
% 5 7 5: [1,2,5..10]; 7: [1..7,10] !!
% 4 8 4: [1,2,4..10]; 8: [1..8,10] !!
% 3 9 3: [1..10]; 9: [1..10] !!
% so, by adding 4 clusters, we've added 2 overlaps.
% best guess based on what's above is:
% NumDuplicateWrites = ceil(NumClusters/2 - 1.5)
end},
{"check pendings", fun() ->
rt_cascading:wait_until_pending_count_zero(Nodes)
end}
],
lists:foreach(fun({Name, Eval}) ->
lager:info("===== big circle: ~s =====", [Name]),
Eval()
end, Tests).
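
A small sketch (not part of the diff) that encodes the duplicate-write estimate from the comment above and checks it against the 6-cluster and 10-cluster breakdowns; ceil/1 is expanded by hand since it is not a BIF on the Erlang releases this targets.

%% NumDuplicateWrites = ceil(NumClusters/2 - 1.5), per the comment above.
estimated_duplicate_writes(NumClusters) ->
    X = NumClusters / 2 - 1.5,
    T = trunc(X),
    case X > T of
        true  -> T + 1;
        false -> T
    end.

%% 6 clusters -> 2 double writes, 10 clusters -> 4, matching the breakdowns.
estimated_duplicate_writes_test() ->
    ?assertEqual(2, estimated_duplicate_writes(6)),
    ?assertEqual(4, estimated_duplicate_writes(10)).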


@ -0,0 +1,107 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2013-2016 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% Topology for this cascading replication test:
%% +-----+
%% | one |
%% +-----+
%% ^ \
%% / V
%% +-------+ +-----+
%% | three | <- | two |
%% +-------+ +-----+
%% -------------------------------------------------------------------
-module(rt_cascading_circle).
-behavior(riak_test).
%% API
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
confirm() ->
%% test requires allow_mult=false b/c of rt:systest_read
rt:set_conf(all, [{"buckets.default.allow_mult", "false"}]),
State = circle_setup(),
_ = circle_tests(State),
pass.
circle_setup() ->
Conf = [{"one", 1}, {"two", 1}, {"three", 1}],
Clusters = rt_cascading:make_clusters(Conf),
[[One], [Two], [Three]] = Unflattened = [ClusterNodes || {_Name, ClusterNodes} <- Clusters],
Connections = [
{One, Two, "two"},
{Two, Three, "three"},
{Three, One, "one"}
],
ok = lists:foreach(fun({Node, ConnectNode, Name}) ->
Port = rt_cascading:get_cluster_mgr_port(ConnectNode),
rt_cascading:connect_rt(Node, Port, Name)
end, Connections),
lists:flatten(Unflattened).
circle_tests(Nodes) ->
% +-----+
% | one |
% +-----+
% ^ \
% / V
% +-------+ +-----+
% | three | <- | two |
% +-------+ +-----+
Tests = [
{"cascade all the way to the other end, but no further", fun() ->
Client = rt:pbc(hd(Nodes)),
Bin = <<"cascading">>,
Obj = riakc_obj:new(<<"objects">>, Bin, Bin),
riakc_pb_socket:put(Client, Obj, [{w,1}]),
?assertEqual(Bin, rt_cascading:maybe_eventually_exists(lists:last(Nodes), <<"objects">>, Bin)),
% we want to ensure there's not a cascade back to the beginning, so
% there's no event we can properly wait for. All we can do is wait
% and make sure we didn't update/write the object.
timer:sleep(1000),
Status = rpc:call(hd(Nodes), riak_repl2_rt, status, []),
[SinkData] = proplists:get_value(sinks, Status, [[]]),
?assertEqual(undefined, proplists:get_value(expect_seq, SinkData))
end},
{"cascade starting at a different point", fun() ->
[One, Two | _] = Nodes,
Client = rt:pbc(Two),
Bin = <<"start_at_two">>,
Obj = riakc_obj:new(<<"objects">>, Bin, Bin),
riakc_pb_socket:put(Client, Obj, [{w,1}]),
?assertEqual(Bin, rt_cascading:maybe_eventually_exists(One, <<"objects">>, Bin)),
timer:sleep(1000),
Status = rpc:call(Two, riak_repl2_rt, status, []),
[SinkData] = proplists:get_value(sinks, Status, [[]]),
?assertEqual(2, proplists:get_value(expect_seq, SinkData))
end},
{"check pendings", fun() ->
rt_cascading:wait_until_pending_count_zero(Nodes)
end}
],
lists:foreach(fun({Name, Eval}) ->
lager:info("===== circle: ~s =====", [Name]),
Eval()
end, Tests).
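
The expect_seq inspection used in both tests above recurs in the other cascading tests as well; a hypothetical helper (not in the diff) capturing it:

%% Return the expect_seq of Node's single realtime sink, or undefined when no
%% sink data is present (i.e. nothing has cascaded into this node yet).
sink_expect_seq(Node) ->
    Status = rpc:call(Node, riak_repl2_rt, status, []),
    [SinkData] = proplists:get_value(sinks, Status, [[]]),
    proplists:get_value(expect_seq, SinkData).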


@ -0,0 +1,113 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2013-2016 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% Topology for this cascading replication test:
%% +------------+
%% | north_spur |
%% +------------+
%% ^
%% |
%% +-------+
%% +---> | north | ---+
%% | +-------+ |
%% | V
%% +-----------+ +------+ +------+ +-----------+
%% | west_spur | <- | west | <-------- | east | -> | east_spur |
%% +-----------+ +------+ +------+ +-----------+
%% -------------------------------------------------------------------
-module(rt_cascading_circle_and_spurs).
-behavior(riak_test).
%% API
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
confirm() ->
%% test requires allow_mult=false b/c of rt:systest_read
rt:set_conf(all, [{"buckets.default.allow_mult", "false"}]),
State = circle_and_spurs_setup(),
_ = circle_and_spurs_tests(State),
pass.
circle_and_spurs_setup() ->
Config = [
{"north", 1, ["east", "north_spur"]},
{"east", 1, ["west", "east_spur"]},
{"west", 1, ["north", "west_spur"]},
{"north_spur", 1},
{"east_spur", 1},
{"west_spur", 1}
],
Clusters = rt_cascading:make_clusters(Config),
lists:flatten([Nodes || {_, Nodes} <- Clusters]).
circle_and_spurs_tests(Nodes) ->
Tests = [
{"start at north", fun() ->
[North | _Rest] = Nodes,
Client = rt:pbc(North),
Bin = <<"start at north">>,
Bucket = <<"objects">>,
Obj = riakc_obj:new(Bucket, Bin, Bin),
riakc_pb_socket:put(Client, Obj, [{w,1}]),
[begin
?debugFmt("Checking ~p", [N]),
?assertEqual(Bin, rt_cascading:maybe_eventually_exists(N, Bucket, Bin))
end || N <- Nodes, N =/= North]
end},
{"Start at west", fun() ->
[_North, _East, West | _Rest] = Nodes,
Client = rt:pbc(West),
Bin = <<"start at west">>,
Bucket = <<"objects">>,
Obj = riakc_obj:new(Bucket, Bin, Bin),
riakc_pb_socket:put(Client, Obj, [{w,1}]),
[begin
?debugFmt("Checking ~p", [N]),
?assertEqual(Bin, rt_cascading:maybe_eventually_exists(N, Bucket, Bin))
end || N <- Nodes, N =/= West]
end},
{"spurs don't replicate back", fun() ->
[_North, _East, _West, NorthSpur | _Rest] = Nodes,
Client = rt:pbc(NorthSpur),
Bin = <<"start at north_spur">>,
Bucket = <<"objects">>,
Obj = riakc_obj:new(Bucket, Bin, Bin),
riakc_pb_socket:put(Client, Obj, [{w,1}]),
[begin
?debugFmt("Checking ~p", [N]),
?assertEqual({error, notfound}, rt_cascading:maybe_eventually_exists(N, Bucket, Bin))
end || N <- Nodes, N =/= NorthSpur]
end},
{"check pendings", fun() ->
rt_cascading:wait_until_pending_count_zero(Nodes)
end}
],
lists:foreach(fun({Name, Eval}) ->
lager:info("===== circle_and_spurs: ~s =====", [Name]),
Eval()
end, Tests).


@ -0,0 +1,129 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2013-2016 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% Topology for this cascading replication test:
%% +-----+
%% +--------------->| top |
%% | loop added +-----+
%% | / \
%% | V V
%% | +---------+ +----------+
%% ^ | midleft | | midright |
%% | +---------+ +----------+
%% | \ /
%% | V V
%% | +--------+
%% +-------<-------| bottom |
%% +--------+
%% -------------------------------------------------------------------
-module(rt_cascading_diamond).
-behavior(riak_test).
%% API
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
confirm() ->
%% test requires allow_mult=false b/c of rt:systest_read
rt:set_conf(all, [{"buckets.default.allow_mult", "false"}]),
State = diamond_setup(),
_ = diamond_tests(State),
pass.
diamond_setup() ->
Clusters = rt_cascading:make_clusters([{"top", 1}, {"midleft", 1}, {"midright", 1}, {"bottom", 1}]),
GetNode = fun(Name) ->
[N] = proplists:get_value(Name, Clusters),
N
end,
GetPort = fun(Name) ->
rt_cascading:get_cluster_mgr_port(GetNode(Name))
end,
rt_cascading:connect_rt(GetNode("top"), GetPort("midleft"), "midleft"),
rt_cascading:connect_rt(GetNode("midleft"), GetPort("bottom"), "bottom"),
rt_cascading:connect_rt(GetNode("midright"), GetPort("bottom"), "bottom"),
rt_cascading:connect_rt(GetNode("top"), GetPort("midright"), "midright"),
lists:flatten([Nodes || {_, Nodes} <- Clusters]).
diamond_tests(Nodes) ->
Tests = [
{"unfortunate double write", fun() ->
[Top, MidLeft, MidRight, Bottom] = Nodes,
Client = rt:pbc(Top),
Bin = <<"start_at_top">>,
Obj = riakc_obj:new(<<"objects">>, Bin, Bin),
riakc_pb_socket:put(Client, Obj, [{w,1}]),
timer:sleep(100000),
?assertEqual(Bin, rt_cascading:maybe_eventually_exists(MidLeft, <<"objects">>, Bin)),
?assertEqual(Bin, rt_cascading:maybe_eventually_exists(MidRight, <<"objects">>, Bin)),
?assertEqual(Bin, rt_cascading:maybe_eventually_exists(Bottom, <<"objects">>, Bin)),
%timer:sleep(1000),
Status = rpc:call(Bottom, riak_repl2_rt, status, []),
[SinkOne, SinkTwo] = proplists:get_value(sinks, Status, [[], []]),
?assertEqual(proplists:get_value(expect_seq, SinkOne), proplists:get_value(expect_seq, SinkTwo))
end},
{"connect bottom to top", fun() ->
[Top, _MidLeft, _MidRight, Bottom] = Nodes,
Port = rt_cascading:get_cluster_mgr_port(Top),
rt_cascading:connect_rt(Bottom, Port, "top"),
WaitFun = fun(N) ->
Status = rpc:call(N, riak_repl2_rt, status, []),
Sinks = proplists:get_value(sinks, Status, []),
length(Sinks) == 1
end,
?assertEqual(ok, rt:wait_until(Top, WaitFun))
end},
{"start at midright", fun() ->
[Top, MidLeft, MidRight, Bottom] = Nodes,
% To ensure a write that originated on MidRight doesn't cascade back
% to MidRight, we're going to compare the expect_seq before and
% after.
Status = rpc:call(MidRight, riak_repl2_rt, status, []),
[Sink] = proplists:get_value(sinks, Status, [[]]),
ExpectSeq = proplists:get_value(expect_seq, Sink),
Client = rt:pbc(MidRight),
Bin = <<"start at midright">>,
Bucket = <<"objects">>,
Obj = riakc_obj:new(Bucket, Bin, Bin),
riakc_pb_socket:put(Client, Obj, [{w,1}]),
[begin
?debugFmt("Checking ~p", [N]),
?assertEqual(Bin, rt_cascading:maybe_eventually_exists(N, Bucket, Bin))
end || N <- [Bottom, Top, MidLeft]],
Status2 = rpc:call(MidRight, riak_repl2_rt, status, []),
[Sink2] = proplists:get_value(sinks, Status2, [[]]),
GotSeq = proplists:get_value(expect_seq, Sink2),
?assertEqual(ExpectSeq, GotSeq)
end},
{"check pendings", fun() ->
rt_cascading:wait_until_pending_count_zero(Nodes)
end}
],
lists:foreach(fun({Name, Eval}) ->
lager:info("===== diamond: ~s =====", [Name]),
Eval()
end, Tests).


@ -0,0 +1,66 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2013-2016 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%% -------------------------------------------------------------------
-module(rt_cascading_ensure_ack).
-behavior(riak_test).
%% API
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
confirm() ->
%% test requires allow_mult=false b/c of rt:systest_read
rt:set_conf(all, [{"buckets.default.allow_mult", "false"}]),
State = ensure_ack_setup(),
_ = ensure_ack_tests(State),
pass.
ensure_ack_setup() ->
Clusters = rt_cascading:make_clusters([{"A", 1, ["B"]}, {"B", 1}]),
lists:flatten([Nodes || {_, Nodes} <- Clusters]).
ensure_ack_tests(Nodes) ->
[LeaderA, LeaderB] = Nodes,
lager:info("Nodes:~p, ~p", [LeaderA, LeaderB]),
TestHash = list_to_binary([io_lib:format("~2.16.0b", [X]) ||
<<X>> <= erlang:md5(term_to_binary(os:timestamp()))]),
TestBucket = <<TestHash/binary, "-rt_test_a">>,
%% Write some objects to the source cluster (A),
lager:info("Writing 1 key to ~p, which should RT repl to ~p",
[LeaderA, LeaderB]),
?assertEqual([], repl_util:do_write(LeaderA, 1, 1, TestBucket, 2)),
%% verify data is replicated to B
lager:info("Reading 1 key written from ~p", [LeaderB]),
?assertEqual(0, repl_util:wait_for_reads(LeaderB, 1, 1, TestBucket, 2)),
RTQStatus = rpc:call(LeaderA, riak_repl2_rtq, status, []),
Consumers = proplists:get_value(consumers, RTQStatus),
case proplists:get_value("B", Consumers) of
undefined ->
[];
Consumer ->
Unacked = proplists:get_value(unacked, Consumer, 0),
lager:info("unacked: ~p", [Unacked]),
?assertEqual(0, Unacked)
end.
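
A hypothetical helper (not part of the diff) factoring out the consumer-unacked lookup the case expression above performs:

%% Return the unacked count SourceNode's realtime queue reports for the
%% consumer named SinkName, defaulting to 0 when that consumer is absent.
unacked_for_sink(SourceNode, SinkName) ->
    RTQStatus = rpc:call(SourceNode, riak_repl2_rtq, status, []),
    Consumers = proplists:get_value(consumers, RTQStatus, []),
    Consumer = proplists:get_value(SinkName, Consumers, []),
    proplists:get_value(unacked, Consumer, 0).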


@ -0,0 +1,228 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2013-2016 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% Topology for this cascading replication test:
%% +-----+
%% | n12 |
%% +-----+
%% ^ \
%% / V
%% +-----+ +-----+
%% | n56 | <- | n34 |
%% +-----+ +-----+
%%
%% This test is configurable for 1.3 versions of Riak, but is off by default.
%% Place the following config in ~/.riak_test_config to run it:
%%
%% {run_rt_cascading_1_3_tests, true}
%% -------------------------------------------------------------------
-module(rt_cascading_mixed_clusters).
-behavior(riak_test).
%% API
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
-define(bucket, <<"objects">>).
confirm() ->
%% test requires allow_mult=false b/c of rt:systest_read
rt:set_conf(all, [{"buckets.default.allow_mult", "false"}]),
case rt_config:config_or_os_env(run_rt_cascading_1_3_tests, false) of
false ->
lager:info("mixed_version_clusters_test_ not configured to run!");
_ ->
State = mixed_version_clusters_setup(),
_ = mixed_version_clusters_tests(State)
end,
pass.
mixed_version_clusters_setup() ->
Conf = rt_cascading:conf(),
DeployConfs = [{previous, Conf} || _ <- lists:seq(1,6)],
Nodes = rt:deploy_nodes(DeployConfs),
[N1, N2, N3, N4, N5, N6] = Nodes,
case rpc:call(N1, application, get_key, [riak_core, vsn]) of
% This is meant to test upgrading from early BNW (aka
% Brave New World, aka Advanced Repl, aka version 3 repl) to
% a cascading realtime repl. Other tests handle going from
% pre-repl-3 to repl 3.
{ok, Vsn} when Vsn < "1.3.0" ->
{too_old, Nodes};
_ ->
N12 = [N1, N2],
N34 = [N3, N4],
N56 = [N5, N6],
repl_util:make_cluster(N12),
repl_util:make_cluster(N34),
repl_util:make_cluster(N56),
repl_util:name_cluster(N1, "n12"),
repl_util:name_cluster(N3, "n34"),
repl_util:name_cluster(N5, "n56"),
[repl_util:wait_until_leader_converge(Cluster) || Cluster <- [N12, N34, N56]],
rt_cascading:connect_rt(N1, rt_cascading:get_cluster_mgr_port(N3), "n34"),
rt_cascading:connect_rt(N3, rt_cascading:get_cluster_mgr_port(N5), "n56"),
rt_cascading:connect_rt(N5, rt_cascading:get_cluster_mgr_port(N1), "n12"),
Nodes
end.
mixed_version_clusters_tests({too_old, _Nodes}) ->
ok;
mixed_version_clusters_tests(Nodes) ->
[N1, N2, N3, N4, N5, N6] = Nodes,
Tests = [
{"no cascading at first 1", fun() ->
Client = rt:pbc(N1),
Bin = <<"no cascade yet">>,
Obj = riakc_obj:new(?bucket, Bin, Bin),
riakc_pb_socket:put(Client, Obj, [{w, 2}]),
riakc_pb_socket:stop(Client),
?assertEqual({error, notfound}, rt_cascading:maybe_eventually_exists([N5, N6], ?bucket, Bin)),
?assertEqual(Bin, rt_cascading:maybe_eventually_exists([N3, N4], ?bucket, Bin))
end},
{"no cascading at first 2", fun() ->
Client = rt:pbc(N2),
Bin = <<"no cascade yet 2">>,
Obj = riakc_obj:new(?bucket, Bin, Bin),
riakc_pb_socket:put(Client, Obj, [{w, 2}]),
riakc_pb_socket:stop(Client),
?assertEqual({error, notfound}, rt_cascading:maybe_eventually_exists([N5, N6], ?bucket, Bin)),
?assertEqual(Bin, rt_cascading:maybe_eventually_exists([N3, N4], ?bucket, Bin))
end},
{"mixed source can send (setup)", fun() ->
rt:upgrade(N1, current),
repl_util:wait_until_leader_converge([N1, N2]),
Running = fun(Node) ->
RTStatus = rpc:call(Node, riak_repl2_rt, status, []),
if
is_list(RTStatus) ->
SourcesList = proplists:get_value(sources, RTStatus, []),
Sources = [S || S <- SourcesList,
is_list(S),
proplists:get_value(connected, S, false),
proplists:get_value(source, S) =:= "n34"
],
length(Sources) >= 1;
true ->
false
end
end,
?assertEqual(ok, rt:wait_until(N1, Running)),
% give the node further time to settle
StatsNotEmpty = fun(Node) ->
case rpc:call(Node, riak_repl_stats, get_stats, []) of
[] ->
false;
Stats ->
is_list(Stats)
end
end,
?assertEqual(ok, rt:wait_until(N1, StatsNotEmpty))
end},
{"node1 put", fun() ->
Client = rt:pbc(N1),
Bin = <<"rt after upgrade">>,
Obj = riakc_obj:new(?bucket, Bin, Bin),
riakc_pb_socket:put(Client, Obj, [{w, 2}]),
riakc_pb_socket:stop(Client),
?assertEqual(Bin, rt_cascading:maybe_eventually_exists(N3, ?bucket, Bin, rt_cascading:timeout(100))),
?assertEqual({error, notfound}, rt_cascading:maybe_eventually_exists(N5, ?bucket, Bin, 100000))
end},
{"node2 put", fun() ->
Client = rt:pbc(N2),
Bin = <<"rt after upgrade 2">>,
Obj = riakc_obj:new(?bucket, Bin, Bin),
riakc_pb_socket:put(Client, Obj, [{w, 2}]),
riakc_pb_socket:stop(Client),
?assertEqual({error, notfound}, rt_cascading:maybe_eventually_exists(N5, ?bucket, Bin)),
?assertEqual(Bin, rt_cascading:maybe_eventually_exists([N3,N4], ?bucket, Bin))
end},
{"upgrade the world, cascade starts working", fun() ->
[N1 | NotUpgraded] = Nodes,
[rt:upgrade(Node, current) || Node <- NotUpgraded],
repl_util:wait_until_leader_converge([N1, N2]),
repl_util:wait_until_leader_converge([N3, N4]),
repl_util:wait_until_leader_converge([N5, N6]),
ClusterMgrUp = fun(Node) ->
case rpc:call(Node, erlang, whereis, [riak_core_cluster_manager]) of
P when is_pid(P) ->
true;
_ ->
fail
end
end,
[rt:wait_until(N, ClusterMgrUp) || N <- Nodes],
rt_cascading:maybe_reconnect_rt(N1, rt_cascading:get_cluster_mgr_port(N3), "n34"),
rt_cascading:maybe_reconnect_rt(N3, rt_cascading:get_cluster_mgr_port(N5), "n56"),
rt_cascading:maybe_reconnect_rt(N5, rt_cascading:get_cluster_mgr_port(N1), "n12"),
ToB = fun
(Atom) when is_atom(Atom) ->
list_to_binary(atom_to_list(Atom));
(N) when is_integer(N) ->
list_to_binary(integer_to_list(N))
end,
ExistsEverywhere = fun(Key, LookupOrder) ->
Reses = [rt_cascading:maybe_eventually_exists(Node, ?bucket, Key) || Node <- LookupOrder],
?debugFmt("Node and it's res:~n~p", [lists:zip(LookupOrder,
Reses)]),
lists:all(fun(E) -> E =:= Key end, Reses)
end,
MakeTest = fun(Node, N) ->
Name = "writing " ++ atom_to_list(Node) ++ "-write-" ++ integer_to_list(N),
{NewTail, NewHead} = lists:splitwith(fun(E) ->
E =/= Node
end, Nodes),
ExistsLookup = NewHead ++ NewTail,
Test = fun() ->
?debugFmt("Running test ~p", [Name]),
Client = rt:pbc(Node),
Key = <<(ToB(Node))/binary, "-write-", (ToB(N))/binary>>,
Obj = riakc_obj:new(?bucket, Key, Key),
riakc_pb_socket:put(Client, Obj, [{w, 2}]),
riakc_pb_socket:stop(Client),
?assert(ExistsEverywhere(Key, ExistsLookup))
end,
{Name, Test}
end,
NodeTests = [MakeTest(Node, N) || Node <- Nodes, N <- lists:seq(1, 3)],
lists:foreach(fun({Name, Eval}) ->
lager:info("===== mixed version cluster: upgrade world: ~s =====", [Name]),
Eval()
end, NodeTests)
end},
{"check pendings", fun() ->
rt_cascading:wait_until_pending_count_zero(Nodes)
end}
],
lists:foreach(fun({Name, Eval}) ->
lager:info("===== mixed version cluster: ~p =====", [Name]),
Eval()
end, Tests).
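
A worked example (with illustrative atoms, not real node names) of the lists:splitwith/2 rotation used in MakeTest above: the lookup order starts at the writing node and wraps around the circle.

rotation_example_test() ->
    Nodes = [n1, n2, n3, n4, n5, n6],
    Node = n3,
    {NewTail, NewHead} = lists:splitwith(fun(E) -> E =/= Node end, Nodes),
    %% Lookups visit the writer first, then the rest of the nodes in order.
    ?assertEqual([n3, n4, n5, n6, n1, n2], NewHead ++ NewTail).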


@ -0,0 +1,150 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2013-2016 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% Topology for this cascading replication test:
%% +------+
%% | New1 |
%% +------+
%% ^ \
%% / V
%% +------+ +------+
%% | New3 | <- | Old2 |
%% +------+ +------+
%%
%% This test is configurable for 1.3 versions of Riak, but is off by default.
%% Place the following config in ~/.riak_test_config to run it:
%%
%% {run_rt_cascading_1_3_tests, true}
%% -------------------------------------------------------------------
-module(rt_cascading_new_to_old).
-behavior(riak_test).
%% API
-export([confirm/0]).
-define(bucket, <<"objects">>).
-include_lib("eunit/include/eunit.hrl").
confirm() ->
%% test requires allow_mult=false b/c of rt:systest_read
rt:set_conf(all, [{"buckets.default.allow_mult", "false"}]),
case rt_config:config_or_os_env(run_rt_cascading_1_3_tests, false) of
false ->
lager:info("new_to_old_test_ not configured to run.");
_ ->
lager:info("new_to_old_test_ configured to run for 1.3"),
State = new_to_old_setup(),
_ = new_to_old_tests(State)
end,
pass.
new_to_old_setup() ->
Conf = rt_cascading:conf(),
DeployConfs = [{current, Conf}, {previous, Conf}, {current, Conf}],
[New1, Old2, New3] = Nodes = rt:deploy_nodes(DeployConfs),
case rpc:call(Old2, application, get_key, [riak_core, vsn]) of
% This is meant to test upgrading from early BNW (aka
% Brave New World, aka Advanced Repl, aka version 3 repl) to
% a cascading realtime repl. Other tests handle going from
% pre-repl-3 to repl 3.
{ok, Vsn} when Vsn < "1.3.0" ->
{too_old, Nodes};
_ ->
[repl_util:make_cluster([N]) || N <- Nodes],
Names = ["new1", "old2", "new3"],
[repl_util:name_cluster(Node, Name) || {Node, Name} <- lists:zip(Nodes, Names)],
[repl_util:wait_until_is_leader(N) || N <- Nodes],
rt_cascading:connect_rt(New1, 10026, "old2"),
rt_cascading:connect_rt(Old2, 10036, "new3"),
rt_cascading:connect_rt(New3, 10016, "new1"),
Nodes
end.
new_to_old_tests(Nodes) ->
% +------+
% | New1 |
% +------+
% ^ \
% / V
% +------+ +------+
% | New3 | <- | Old2 |
% +------+ +------+
%
% This test is configurable for 1.3 versions of Riak, but is off by default.
% Place the following config in ~/.riak_test_config to run it:
%
% {run_rt_cascading_1_3_tests, true}
[New1, Old2, New3] = Nodes,
Tests = [
{"From new1 to old2", fun() ->
Client = rt:pbc(New1),
Bin = <<"new1 to old2">>,
Obj = riakc_obj:new(?bucket, Bin, Bin),
riakc_pb_socket:put(Client, Obj, [{w, 1}]),
riakc_pb_socket:stop(Client),
?assertEqual(Bin, rt_cascading:maybe_eventually_exists(Old2, ?bucket, Bin)),
?assertEqual({error, notfound}, rt_cascading:maybe_eventually_exists(New3, ?bucket, Bin))
end},
{"old2 does not cascade at all", fun() ->
Client = rt:pbc(New1),
Bin = <<"old2 no cascade">>,
Obj = riakc_obj:new(?bucket, Bin, Bin),
riakc_pb_socket:put(Client, Obj, [{w, 1}]),
riakc_pb_socket:stop(Client),
?assertEqual(Bin, rt_cascading:maybe_eventually_exists(Old2, ?bucket, Bin)),
?assertEqual({error, notfound}, rt_cascading:maybe_eventually_exists(New3, ?bucket, Bin))
end},
{"from new3 to old2", fun() ->
Client = rt:pbc(New3),
Bin = <<"new3 to old2">>,
Obj = riakc_obj:new(?bucket, Bin, Bin),
riakc_pb_socket:put(Client, Obj, [{w, 1}]),
riakc_pb_socket:stop(Client),
?assertEqual(Bin, rt_cascading:maybe_eventually_exists(New1, ?bucket, Bin)),
?assertEqual(Bin, rt_cascading:maybe_eventually_exists(Old2, ?bucket, Bin))
end},
{"from old2 to new3 no cascade", fun() ->
% In the future, cascading may be able to occur even if it starts
% from an older source cluster/node. It is prevented for now because
% there is no easy/good way to get the name of the source cluster,
% which leaves the routed-clusters information incomplete.
Client = rt:pbc(Old2),
Bin = <<"old2 to new3">>,
Obj = riakc_obj:new(?bucket, Bin, Bin),
riakc_pb_socket:put(Client, Obj, [{w,1}]),
riakc_pb_socket:stop(Client),
?assertEqual(Bin, rt_cascading:maybe_eventually_exists(New3, ?bucket, Bin)),
?assertEqual({error, notfound}, rt_cascading:maybe_eventually_exists(New1, ?bucket, Bin))
end},
{"check pendings", fun() ->
rt_cascading:wait_until_pending_count_zero(["new1", "old2", "new3"])
end}
],
lists:foreach(fun({Name, Eval}) ->
lager:info("===== new to old: ~s =====", [Name]),
Eval()
end, Tests).


@ -0,0 +1,89 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2013-2016 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% Topology for this cascading replication test:
%% +-----+
%% | top |
%% +-----+
%% / \
%% V V
%% +------+ +-------+
%% | left | | right |
%% +------+ +-------+
%% | |
%% V V
%% +-------+ +--------+
%% | left2 | | right2 |
%% +-------+ +--------+
%% -------------------------------------------------------------------
-module(rt_cascading_pyramid).
-behavior(riak_test).
%% API
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
confirm() ->
%% test requires allow_mult=false b/c of rt:systest_read
rt:set_conf(all, [{"buckets.default.allow_mult", "false"}]),
State = pyramid_setup(),
_ = pyramid_tests(State),
pass.
pyramid_setup() ->
Conf = [{"top", 1}, {"left", 1}, {"left2", 1}, {"right", 1}, {"right2", 1}],
Clusters = rt_cascading:make_clusters(Conf),
GetPort = fun(Name) ->
[Node] = proplists:get_value(Name, Clusters),
rt_cascading:get_cluster_mgr_port(Node)
end,
[Top] = proplists:get_value("top", Clusters),
[Left] = proplists:get_value("left", Clusters),
[Right] = proplists:get_value("right", Clusters),
rt_cascading:connect_rt(Top, GetPort("left"), "left"),
rt_cascading:connect_rt(Left, GetPort("left2"), "left2"),
rt_cascading:connect_rt(Top, GetPort("right"), "right"),
rt_cascading:connect_rt(Right, GetPort("right2"), "right2"),
lists:flatten([Nodes || {_, Nodes} <- Clusters]).
pyramid_tests(Nodes) ->
Tests = [
{"Cascade to both kids", fun() ->
[Top | _] = Nodes,
Client = rt:pbc(Top),
Bucket = <<"objects">>,
Bin = <<"pyramid_top">>,
Obj = riakc_obj:new(Bucket, Bin, Bin),
riakc_pb_socket:put(Client, Obj, [{w,1}]),
lists:map(fun(N) ->
?debugFmt("Checking ~p", [N]),
?assertEqual(Bin, rt_cascading:maybe_eventually_exists(N, Bucket, Bin))
end, Nodes)
end},
{"check pendings", fun() ->
rt_cascading:wait_until_pending_count_zero(Nodes)
end}
],
lists:foreach(fun({Name, Eval}) ->
lager:info("===== pyramid: ~s =====", [Name]),
Eval()
end, Tests).


@ -0,0 +1,118 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2013-2016 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% Topology for this cascading replication test:
%% +-----------+ +--------+ +-----+
%% | beginning | -> | middle | -> | end |
%% +-----------+ +--------+ +-----+
%% -------------------------------------------------------------------
-module(rt_cascading_simple).
-behavior(riak_test).
-include_lib("eunit/include/eunit.hrl").
-export([confirm/0]).
-record(simple_state, {
beginning :: node(),
middle :: node(),
ending :: node()
}).
-define(bucket, <<"objects">>).
confirm() ->
%% test requires allow_mult=false b/c of rt:systest_read
rt:set_conf(all, [{"buckets.default.allow_mult", "false"}]),
State = simple_setup(),
simple_tests(State),
pass.
simple_setup() ->
[BeginCluster, MidCluster, EndCluster] = rt_cascading:make_clusters([
{"beginning", 1},
{"middle", 1},
{"end", 1}
]),
{_, [BeginNode]} = BeginCluster,
{_, [MidNode]} = MidCluster,
{_, [EndNode]} = EndCluster,
#simple_state{beginning = BeginNode, middle = MidNode, ending = EndNode}.
simple_tests(State) ->
Tests = [
{"connecting Beginning to Middle", fun() ->
Port = rt_cascading:get_cluster_mgr_port(State#simple_state.middle),
repl_util:connect_cluster(State#simple_state.beginning, "127.0.0.1", Port),
repl_util:enable_realtime(State#simple_state.beginning, "middle"),
repl_util:start_realtime(State#simple_state.beginning, "middle")
end},
{"connection Middle to End", fun() ->
Port = rt_cascading:get_cluster_mgr_port(State#simple_state.ending),
repl_util:connect_cluster(State#simple_state.middle, "127.0.0.1", Port),
repl_util:enable_realtime(State#simple_state.middle, "end"),
repl_util:start_realtime(State#simple_state.middle, "end")
end},
{"cascade a put from beginning down to ending", fun() ->
BeginningClient = rt:pbc(State#simple_state.beginning),
Bin = <<"cascading realtime">>,
Obj = riakc_obj:new(<<"objects">>, Bin, Bin),
riakc_pb_socket:put(BeginningClient, Obj, [{w,1}]),
riakc_pb_socket:stop(BeginningClient),
?assertEqual(Bin, rt_cascading:maybe_eventually_exists(State#simple_state.middle, <<"objects">>, Bin)),
?assertEqual(Bin, rt_cascading:maybe_eventually_exists(State#simple_state.ending, <<"objects">>, Bin))
end},
{"disable cascading on middle", fun() ->
rpc:call(State#simple_state.middle, riak_repl_console, realtime_cascades, [["never"]]),
Bin = <<"disabled cascading">>,
Obj = riakc_obj:new(?bucket, Bin, Bin),
Client = rt:pbc(State#simple_state.beginning),
riakc_pb_socket:put(Client, Obj, [{w,1}]),
riakc_pb_socket:stop(Client),
?assertEqual(Bin, rt_cascading:maybe_eventually_exists(State#simple_state.middle, ?bucket, Bin)),
?assertEqual({error, notfound}, rt_cascading:maybe_eventually_exists(State#simple_state.ending, ?bucket, Bin))
end},
{"re-enable cascading", fun() ->
rpc:call(State#simple_state.middle, riak_repl_console, realtime_cascades, [["always"]]),
Bin = <<"cascading re-enabled">>,
Obj = riakc_obj:new(?bucket, Bin, Bin),
Client = rt:pbc(State#simple_state.beginning),
riakc_pb_socket:put(Client, Obj, [{w,1}]),
riakc_pb_socket:stop(Client),
?assertEqual(Bin, rt_cascading:maybe_eventually_exists(State#simple_state.middle, ?bucket, Bin)),
?assertEqual(Bin, rt_cascading:maybe_eventually_exists(State#simple_state.ending, ?bucket, Bin))
end},
{"check pendings", fun() ->
rt_cascading:wait_until_pending_count_zero([State#simple_state.middle,
State#simple_state.beginning,
State#simple_state.ending])
end}
],
lists:foreach(fun({Name, Eval}) ->
lager:info("===== simple: ~s =====", [Name]),
Eval()
end, Tests).
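
A minimal sketch (not part of the diff) of the cascade toggle used in the two tests above; riak_repl_console:realtime_cascades/1 is invoked over rpc with either ["always"] or ["never"]:

%% Same rpc call the tests make directly, with a guard restricting Mode.
set_realtime_cascades(Node, Mode) when Mode =:= "always"; Mode =:= "never" ->
    rpc:call(Node, riak_repl_console, realtime_cascades, [[Mode]]).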


@ -0,0 +1,173 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2013-2016 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%% -------------------------------------------------------------------
-module(rt_cascading_unacked_and_queue).
-behavior(riak_test).
%% API
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
-define(POSTCOMMIT_HOOKS, [{postcommit, [{struct, [{<<"mod">>, <<"riak_repl_leader">>},
{<<"fun">>, <<"postcommit">>}]},
{struct, [{<<"mod">>, <<"riak_repl2_rt">>},
{<<"fun">>, <<"postcommit">>}]}]}]).
confirm() ->
%% test requires allow_mult=false b/c of rt:systest_read
rt:set_conf(all, [{"buckets.default.allow_mult", "false"}]),
LeadersAndClusters = ensure_unacked_and_queue_setup(),
TestBucket = rt_cascading:generate_test_bucket(),
[rt:wait_until_bucket_props(Cluster, TestBucket, ?POSTCOMMIT_HOOKS) || {_, Cluster}
<- LeadersAndClusters],
_ = ensure_unacked_and_queue_tests(LeadersAndClusters, TestBucket),
pass.
ensure_unacked_and_queue_setup() ->
Clusters = rt_cascading:make_clusters([{"n123", 3, ["n456"]}, {"n456", 3, ["n123"]}]),
%% Determine the actual leader for each cluster
LeadersAndClusters = lists:map(fun({_Name, Cluster}) ->
{repl_util:get_leader(hd(Cluster)), Cluster}
end, Clusters),
LeadersAndClusters.
ensure_unacked_and_queue_tests([{N123Leader, N123}, {N456Leader, N456}], TestBucket) ->
Tests = [
{"unacked does not increase when there are skips", fun() ->
rt_cascading:write_n_keys(N123Leader, N456Leader, TestBucket, 1, 10000),
rt_cascading:write_n_keys(N456Leader, N123Leader, TestBucket, 10001, 20000),
Res = rt:wait_until(fun() ->
RTQStatus = rpc:call(N123Leader, riak_repl2_rtq, status, []),
Consumers = proplists:get_value(consumers, RTQStatus),
Data = proplists:get_value("n456", Consumers),
Unacked = proplists:get_value(unacked, Data),
?debugFmt("unacked: ~p", [Unacked]),
0 == Unacked
end),
?assertEqual(ok, Res)
end},
{"after acks, queues are empty", fun() ->
Nodes = N123 ++ N456,
Got = lists:map(fun(Node) ->
rpc:call(Node, riak_repl2_rtq, all_queues_empty, [])
end, Nodes),
Expected = [true || _ <- lists:seq(1, length(Nodes))],
?assertEqual(Expected, Got)
end},
{"after acks, queues truly are empty. Truly", fun() ->
Nodes = N123 ++ N456,
Gots = lists:map(fun(Node) ->
{Node, rpc:call(Node, riak_repl2_rtq, dumpq, [])}
end, Nodes),
lists:map(fun({Node, Got}) ->
?debugFmt("Checking data from ~p", [Node]),
?assertEqual([], Got)
end, Gots)
end},
{"dual loads keeps unacked satisfied", fun() ->
LoadN123Pid = spawn(fun() ->
{Time, Val} = timer:tc(fun rt_cascading:write_n_keys/5, [N123Leader, N456Leader, TestBucket, 20001, 30000]),
?debugFmt("loading 123 to 456 took ~p to get ~p", [Time, Val]),
Val
end),
LoadN456Pid = spawn(fun() ->
{Time, Val} = timer:tc(fun rt_cascading:write_n_keys/5, [N456Leader, N123Leader, TestBucket, 30001, 40000]),
?debugFmt("loading 456 to 123 took ~p to get ~p", [Time, Val]),
Val
end),
Exits = rt_cascading:wait_exit([LoadN123Pid, LoadN456Pid], infinity),
?assert(lists:all(fun(E) -> E == normal end, Exits)),
StatusDig = fun(SinkName, Node) ->
Status = rpc:call(Node, riak_repl2_rtq, status, []),
Consumers = proplists:get_value(consumers, Status, []),
ConsumerStats = proplists:get_value(SinkName, Consumers, []),
proplists:get_value(unacked, ConsumerStats)
end,
N123UnackedRes = rt:wait_until(fun() ->
Unacked = StatusDig("n456", N123Leader),
?debugFmt("Unacked: ~p", [Unacked]),
0 == Unacked
end),
?assertEqual(ok, N123UnackedRes),
N456Unacked = StatusDig("n123", N456Leader),
case N456Unacked of
0 ->
?assert(true);
_ ->
N456Unacked2 = StatusDig("n123", N456Leader),
?debugFmt("Not 0, are they at least decreasing?~n"
" ~p, ~p", [N456Unacked2, N456Unacked]),
?assert(N456Unacked2 < N456Unacked)
end
end},
{"after dual load acks, queues are empty", fun() ->
Nodes = N123 ++ N456,
Got = lists:map(fun(Node) ->
rpc:call(Node, riak_repl2_rtq, all_queues_empty, [])
end, Nodes),
Expected = [true || _ <- lists:seq(1, length(Nodes))],
?assertEqual(Expected, Got)
end},
{"after dual load acks, queues truly are empty. Truly", fun() ->
Nodes = N123 ++ N456,
Gots = lists:map(fun(Node) ->
{Node, rpc:call(Node, riak_repl2_rtq, dumpq, [])}
end, Nodes),
lists:map(fun({Node, Got}) ->
?debugFmt("Checking data from ~p", [Node]),
?assertEqual([], Got)
end, Gots)
end},
{"no negative pendings", fun() ->
Nodes = N123 ++ N456,
GetPending = fun({sink_stats, SinkStats}) ->
ConnTo = proplists:get_value(rt_sink_connected_to, SinkStats),
proplists:get_value(pending, ConnTo)
end,
lists:map(fun(Node) ->
?debugFmt("Checking node ~p", [Node]),
Status = rpc:call(Node, riak_repl_console, status, [quiet]),
Sinks = proplists:get_value(sinks, Status),
lists:map(fun(SStats) ->
Pending = GetPending(SStats),
?assertEqual(0, Pending)
end, Sinks)
end, Nodes)
end}
],
lists:foreach(fun({Name, Eval}) ->
lager:info("===== ensure_unacked_and_queue: ~s =====", [Name]),
Eval()
end, Tests).


@ -74,6 +74,7 @@
-export([confirm/0]).
-define(NEG_LIMIT, -1000000). %% Due to sext encoding bug, do not permit negative numbers less than this to be generated.
-define(MAX_CLUSTER_SIZE, 1).
-define(MAX_FIELDS, 1).
-define(FIELDS, ["i" ++ integer_to_list(N) || N <- lists:seq(1, ?MAX_FIELDS)]).


@ -0,0 +1,75 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2016 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% @doc
%% Testing a fix for an issue where, once vnode-level stats were created, they were not removed
%% even when the vnode was handed off. Not a big deal, unless you have 1024 vnodes, in which case
%% the first node in a rolling restart of the cluster will consume significantly more memory
%% than the others, as at some point it will have been a fallback for many of the vnodes.
%% Tests https://github.com/basho/riak_kv/pull/1282
%% TODO: Need to also test for vnode crashing, not just clean shutdown.
%%
-module(verify_stats_removal).
-behavior(riak_test).
-export([confirm/0, get_stats_remote/1]).
-include_lib("eunit/include/eunit.hrl").
confirm() ->
Nodes = rt:deploy_nodes(2),
[Node1, Node2] = Nodes,
%% Need to write some data to pump the stats system
C = rt:httpc(Node1),
rt:httpc_write(C, <<"systest">>, <<1>>, <<"12345">>),
rt:httpc_read(C, <<"systest">>, <<1>>),
%% Now, join the nodes into a cluster.
rt:join(Node2, Node1),
rt:wait_until_all_members(Nodes),
rt:wait_until_no_pending_changes(Nodes),
rt:wait_until_transfers_complete(Nodes),
rt:load_modules_on_nodes([?MODULE], Nodes),
NonRunningStatsCount1 = get_stats_count_for_non_running_vnodes(Node1),
?assertEqual(0, NonRunningStatsCount1),
NonRunningStatsCount2 = get_stats_count_for_non_running_vnodes(Node2),
?assertEqual(0, NonRunningStatsCount2),
pass.
get_stats_count_for_non_running_vnodes(Node1) ->
spawn_link(Node1, verify_stats_removal, get_stats_remote, [self()]),
receive
{stats, NumStats} ->
NumStats;
_Other ->
throw("Failed to retrieve count of stats from vnode.")
after 60000 ->
throw("Timed out waiting to retrieve count of stats from vnode.")
end.
get_stats_remote(Sender) ->
Running = [list_to_atom(integer_to_list(Idx))
           || {_, Idx, _} <- riak_core_vnode_manager:all_vnodes(riak_kv_vnode)],
Counters = [N || {[riak, riak_kv, vnode, Op, Idx] = N, _, _} <- exometer:find_entries(['_']),
                 not lists:member(Idx, Running),
                 Op == gets orelse Op == puts,
                 Idx /= time],
Spirals = [N || {[riak, riak_kv, vnode, Op, time, Idx] = N, _, _} <- exometer:find_entries(['_']),
                not lists:member(Idx, Running),
                Op == gets orelse Op == puts],
Stats = Counters ++ Spirals,
Sender ! {stats, length(Stats)}.