mirror of
https://github.com/valitydev/riak_test.git
synced 2024-11-06 00:25:22 +00:00
Merge branch 'master' into perf-harness-merge
one more time
This commit is contained in:
commit
183fa49954
4
Makefile
4
Makefile
@ -27,6 +27,10 @@ clean:
|
||||
distclean: clean
|
||||
@rm -rf riak_test deps
|
||||
|
||||
quickbuild:
|
||||
./rebar skip_deps=true compile
|
||||
./rebar escriptize
|
||||
|
||||
##################
|
||||
# Dialyzer targets
|
||||
##################
|
||||
|
@ -111,7 +111,7 @@ build()
|
||||
echo " - $SRCDIR built."
|
||||
}
|
||||
|
||||
build "riak-1.4.2" $R15B01 http://s3.amazonaws.com/downloads.basho.com/riak/1.4/1.4.2/riak-1.4.2.tar.gz
|
||||
build "riak-1.4.8" $R15B01 http://s3.amazonaws.com/downloads.basho.com/riak/1.4/1.4.8/riak-1.4.8.tar.gz
|
||||
echo
|
||||
build "riak-1.3.2" $R15B01 http://s3.amazonaws.com/downloads.basho.com/riak/1.3/1.3.2/riak-1.3.2.tar.gz
|
||||
echo
|
||||
|
@ -80,7 +80,6 @@ main(Args) ->
|
||||
application:start(ibrowse),
|
||||
%% Start Lager
|
||||
application:load(lager),
|
||||
lager:start(),
|
||||
|
||||
Config = proplists:get_value(config, ParsedArgs),
|
||||
ConfigFile = proplists:get_value(file, ParsedArgs),
|
||||
@ -111,7 +110,10 @@ main(Args) ->
|
||||
notice
|
||||
end,
|
||||
|
||||
application:set_env(lager, handlers, [{lager_console_backend, ConsoleLagerLevel}]),
|
||||
application:set_env(lager, handlers, [{lager_console_backend, ConsoleLagerLevel},
|
||||
{lager_file_backend, [{file, "log/test.log"},
|
||||
{level, ConsoleLagerLevel}]}]),
|
||||
lager:start(),
|
||||
|
||||
%% Report
|
||||
Report = case proplists:get_value(report, ParsedArgs, undefined) of
|
||||
|
84
src/rt.erl
84
src/rt.erl
@ -1,6 +1,6 @@
|
||||
%% -------------------------------------------------------------------
|
||||
%%
|
||||
%% Copyright (c) 2013 Basho Technologies, Inc.
|
||||
%% Copyright (c) 2013-2014 Basho Technologies, Inc.
|
||||
%%
|
||||
%% This file is provided to you under the Apache License,
|
||||
%% Version 2.0 (the "License"); you may not use this file
|
||||
@ -27,6 +27,7 @@
|
||||
-include("rt.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-compile(export_all).
|
||||
-export([
|
||||
admin/2,
|
||||
assert_nodes_agree_about_ownership/1,
|
||||
@ -38,6 +39,7 @@
|
||||
build_cluster/2,
|
||||
build_cluster/3,
|
||||
build_clusters/1,
|
||||
join_cluster/1,
|
||||
capability/2,
|
||||
capability/3,
|
||||
check_singleton_node/1,
|
||||
@ -81,6 +83,9 @@
|
||||
partition/2,
|
||||
pbc/1,
|
||||
pbc_read/3,
|
||||
pbc_read/4,
|
||||
pbc_read_check/4,
|
||||
pbc_read_check/5,
|
||||
pbc_set_bucket_prop/3,
|
||||
pbc_write/4,
|
||||
pbc_put_dir/3,
|
||||
@ -809,7 +814,11 @@ brutal_kill(Node) ->
|
||||
rt_cover:maybe_stop_on_node(Node),
|
||||
lager:info("Killing node ~p", [Node]),
|
||||
OSPidToKill = rpc:call(Node, os, getpid, []),
|
||||
rpc:cast(Node, os, cmd, [io_lib:format("kill -9 ~s", [OSPidToKill])]),
|
||||
%% try a normal kill first, but set a timer to
|
||||
%% kill -9 after 5 seconds just in case
|
||||
rpc:cast(Node, timer, apply_after,
|
||||
[5000, os, cmd, [io_lib:format("kill -9 ~s", [OSPidToKill])]]),
|
||||
rpc:cast(Node, os, cmd, [io_lib:format("kill -15 ~s", [OSPidToKill])]),
|
||||
ok.
|
||||
|
||||
capability(Node, all) ->
|
||||
@ -1150,6 +1159,56 @@ get_replica(Node, Bucket, Key, I, N) ->
|
||||
?assert(false)
|
||||
end.
|
||||
|
||||
%%%===================================================================
|
||||
|
||||
%% @doc PBC-based version of {@link systest_write/1}
|
||||
pbc_systest_write(Node, Size) ->
|
||||
pbc_systest_write(Node, Size, 2).
|
||||
|
||||
pbc_systest_write(Node, Size, W) ->
|
||||
pbc_systest_write(Node, 1, Size, <<"systest">>, W).
|
||||
|
||||
pbc_systest_write(Node, Start, End, Bucket, W) ->
|
||||
rt:wait_for_service(Node, riak_kv),
|
||||
Pid = pbc(Node),
|
||||
F = fun(N, Acc) ->
|
||||
Obj = riakc_obj:new(Bucket, <<N:32/integer>>, <<N:32/integer>>),
|
||||
try riakc_pb_socket:put(Pid, Obj, W) of
|
||||
ok ->
|
||||
Acc;
|
||||
Other ->
|
||||
[{N, Other} | Acc]
|
||||
catch
|
||||
What:Why ->
|
||||
[{N, {What, Why}} | Acc]
|
||||
end
|
||||
end,
|
||||
lists:foldl(F, [], lists:seq(Start, End)).
|
||||
|
||||
pbc_systest_read(Node, Size) ->
|
||||
pbc_systest_read(Node, Size, 2).
|
||||
|
||||
pbc_systest_read(Node, Size, R) ->
|
||||
pbc_systest_read(Node, 1, Size, <<"systest">>, R).
|
||||
|
||||
pbc_systest_read(Node, Start, End, Bucket, R) ->
|
||||
rt:wait_for_service(Node, riak_kv),
|
||||
Pid = pbc(Node),
|
||||
F = fun(N, Acc) ->
|
||||
case riakc_pb_socket:get(Pid, Bucket, <<N:32/integer>>, R) of
|
||||
{ok, Obj} ->
|
||||
case riakc_obj:get_value(Obj) of
|
||||
<<N:32/integer>> ->
|
||||
Acc;
|
||||
WrongVal ->
|
||||
[{N, {wrong_val, WrongVal}} | Acc]
|
||||
end;
|
||||
Other ->
|
||||
[{N, Other} | Acc]
|
||||
end
|
||||
end,
|
||||
lists:foldl(F, [], lists:seq(Start, End)).
|
||||
|
||||
%%%===================================================================
|
||||
%%% PBC & HTTPC Functions
|
||||
%%%===================================================================
|
||||
@ -1166,9 +1225,26 @@ pbc(Node) ->
|
||||
%% @doc does a read via the erlang protobuf client
|
||||
-spec pbc_read(pid(), binary(), binary()) -> binary().
|
||||
pbc_read(Pid, Bucket, Key) ->
|
||||
{ok, Value} = riakc_pb_socket:get(Pid, Bucket, Key),
|
||||
pbc_read(Pid, Bucket, Key, []).
|
||||
|
||||
-spec pbc_read(pid(), binary(), binary(), [any()]) -> binary().
|
||||
pbc_read(Pid, Bucket, Key, Options) ->
|
||||
{ok, Value} = riakc_pb_socket:get(Pid, Bucket, Key, Options),
|
||||
Value.
|
||||
|
||||
-spec pbc_read_check(pid(), binary(), binary(), [any()]) -> boolean().
|
||||
pbc_read_check(Pid, Bucket, Key, Allowed) ->
|
||||
pbc_read_check(Pid, Bucket, Key, Allowed, []).
|
||||
|
||||
-spec pbc_read_check(pid(), binary(), binary(), [any()], [any()]) -> boolean().
|
||||
pbc_read_check(Pid, Bucket, Key, Allowed, Options) ->
|
||||
case riakc_pb_socket:get(Pid, Bucket, Key, Options) of
|
||||
{ok, _} ->
|
||||
true = lists:member(ok, Allowed);
|
||||
Other ->
|
||||
lists:member(Other, Allowed) orelse throw({failed, Other, Allowed})
|
||||
end.
|
||||
|
||||
%% @doc does a write via the erlang protobuf client
|
||||
-spec pbc_write(pid(), binary(), binary(), binary()) -> atom().
|
||||
pbc_write(Pid, Bucket, Key, Value) ->
|
||||
@ -1385,7 +1461,7 @@ log_to_nodes(Nodes, LFmt, LArgs) ->
|
||||
[] -> [info, Meta, "---riak_test--- " ++ LFmt];
|
||||
_ -> [info, Meta, "---riak_test--- " ++ LFmt, LArgs]
|
||||
end,
|
||||
[rpc:call(Node, Module, Function, Args) || Node <- Nodes].
|
||||
[rpc:call(Node, Module, Function, Args) || Node <- lists:flatten(Nodes)].
|
||||
|
||||
%% @private utility function
|
||||
pmap(F, L) ->
|
||||
|
113
src/rtdev.erl
113
src/rtdev.erl
@ -1,6 +1,6 @@
|
||||
%% -------------------------------------------------------------------
|
||||
%%
|
||||
%% Copyright (c) 2013 Basho Technologies, Inc.
|
||||
%% Copyright (c) 2013-2014 Basho Technologies, Inc.
|
||||
%%
|
||||
%% This file is provided to you under the Apache License,
|
||||
%% Version 2.0 (the "License"); you may not use this file
|
||||
@ -84,6 +84,10 @@ run_riak_repl(N, Path, Cmd) ->
|
||||
%% they should already be setup at this point
|
||||
|
||||
setup_harness(_Test, _Args) ->
|
||||
%% make sure we stop any cover processes on any nodes
|
||||
%% otherwise, if the next test boots a legacy node we'll end up with cover
|
||||
%% incompatabilities and crash the cover server
|
||||
rt_cover:maybe_stop_on_nodes(),
|
||||
Path = relpath(root),
|
||||
%% Stop all discoverable nodes, not just nodes we'll be using for this test.
|
||||
rt:pmap(fun(X) -> stop_all(X ++ "/dev") end, devpaths()),
|
||||
@ -411,33 +415,96 @@ deploy_nodes(NodeConfig) ->
|
||||
lager:info("Deployed nodes: ~p", [Nodes]),
|
||||
Nodes.
|
||||
|
||||
gen_stop_fun(Timeout) ->
|
||||
fun({C,Node}) ->
|
||||
net_kernel:hidden_connect_node(Node),
|
||||
case rpc:call(Node, os, getpid, []) of
|
||||
PidStr when is_list(PidStr) ->
|
||||
lager:info("Preparing to stop node ~p (process ID ~s) with init:stop/0...",
|
||||
[Node, PidStr]),
|
||||
rpc:call(Node, init, stop, []),
|
||||
%% If init:stop/0 fails here, the wait_for_pid/2 call
|
||||
%% below will timeout and the process will get cleaned
|
||||
%% up by the kill_stragglers/2 function
|
||||
wait_for_pid(PidStr, Timeout);
|
||||
BadRpc ->
|
||||
Cmd = C ++ "/bin/riak stop",
|
||||
lager:info("RPC to node ~p returned ~p, will try stop anyway... ~s",
|
||||
[Node, BadRpc, Cmd]),
|
||||
Output = os:cmd(Cmd),
|
||||
Status = case Output of
|
||||
"ok\n" ->
|
||||
%% Telling the node to stop worked,
|
||||
%% but now we must wait the full node
|
||||
%% shutdown_time to allow it to
|
||||
%% properly shut down, otherwise it
|
||||
%% will get prematurely killed by
|
||||
%% kill_stragglers/2 below.
|
||||
timer:sleep(Timeout),
|
||||
"ok";
|
||||
_ ->
|
||||
"wasn't running"
|
||||
end,
|
||||
lager:info("Stopped node ~p, stop status: ~s.", [Node, Status])
|
||||
end
|
||||
end.
|
||||
|
||||
kill_stragglers(DevPath, Timeout) ->
|
||||
{ok, Re} = re:compile("^\\s*\\S+\\s+(\\d+).+\\d+\\s+"++DevPath++"\\S+/beam"),
|
||||
ReOpts = [{capture,all_but_first,list}],
|
||||
Pids = tl(string:tokens(os:cmd("ps -ef"), "\n")),
|
||||
Fold = fun(Proc, Acc) ->
|
||||
case re:run(Proc, Re, ReOpts) of
|
||||
nomatch ->
|
||||
Acc;
|
||||
{match,[Pid]} ->
|
||||
lager:info("Process ~s still running, killing...",
|
||||
[Pid]),
|
||||
os:cmd("kill -15 "++Pid),
|
||||
case wait_for_pid(Pid, Timeout) of
|
||||
ok -> ok;
|
||||
fail ->
|
||||
lager:info("Process ~s still hasn't stopped, "
|
||||
"resorting to kill -9...", [Pid]),
|
||||
os:cmd("kill -9 "++Pid)
|
||||
end,
|
||||
[Pid|Acc]
|
||||
end
|
||||
end,
|
||||
lists:foldl(Fold, [], Pids).
|
||||
|
||||
wait_for_pid(PidStr, Timeout) ->
|
||||
F = fun() ->
|
||||
os:cmd("kill -0 "++PidStr) =/= []
|
||||
end,
|
||||
Retries = Timeout div 1000,
|
||||
case rt:wait_until(F, Retries, 1000) of
|
||||
{fail, _} -> fail;
|
||||
_ -> ok
|
||||
end.
|
||||
|
||||
stop_all(DevPath) ->
|
||||
case filelib:is_dir(DevPath) of
|
||||
true ->
|
||||
Devs = filelib:wildcard(DevPath ++ "/dev*"),
|
||||
%% Works, but I'd like it to brag a little more about it.
|
||||
Stop = fun(C) ->
|
||||
%% If getpid available, kill -9 with it, we're serious here
|
||||
[MaybePid | _] = string:tokens(os:cmd(C ++ "/bin/riak getpid"),
|
||||
"\n"),
|
||||
try
|
||||
_ = list_to_integer(MaybePid),
|
||||
{0, Out} = cmd("kill -9 "++MaybePid),
|
||||
Out
|
||||
catch
|
||||
_:_ -> ok
|
||||
end,
|
||||
Cmd = C ++ "/bin/riak stop",
|
||||
{_, StopOut} = cmd(Cmd),
|
||||
[Output | _Tail] = string:tokens(StopOut, "\n"),
|
||||
Status = case Output of
|
||||
"ok" -> "ok";
|
||||
_ -> "wasn't running"
|
||||
end,
|
||||
lager:info("Stopped Node... ~s ~~ ~s.", [Cmd, Status])
|
||||
Nodes = [?DEV(N) || N <- lists:seq(1, length(Devs))],
|
||||
MyNode = 'riak_test@127.0.0.1',
|
||||
case net_kernel:start([MyNode, longnames]) of
|
||||
{ok, _} ->
|
||||
true = erlang:set_cookie(MyNode, riak);
|
||||
{error,{already_started,_}} ->
|
||||
ok
|
||||
end,
|
||||
rt:pmap(Stop, Devs);
|
||||
_ -> lager:info("~s is not a directory.", [DevPath])
|
||||
lager:info("Trying to obtain node shutdown_time via RPC..."),
|
||||
Tmout = case rpc:call(hd(Nodes), init, get_argument, [shutdown_time]) of
|
||||
{ok,[[Tm]]} -> list_to_integer(Tm)+10000;
|
||||
_ -> 20000
|
||||
end,
|
||||
lager:info("Using node shutdown_time of ~w", [Tmout]),
|
||||
rt:pmap(gen_stop_fun(Tmout), lists:zip(Devs, Nodes)),
|
||||
kill_stragglers(DevPath, Tmout);
|
||||
_ ->
|
||||
lager:info("~s is not a directory.", [DevPath])
|
||||
end,
|
||||
ok.
|
||||
|
||||
|
@ -45,7 +45,7 @@ confirm() ->
|
||||
|
||||
%% write implicitly to the default bucket
|
||||
riakc_pb_socket:put(PB, riakc_obj:update_value(O1, <<"newvalue">>)),
|
||||
|
||||
|
||||
%% read from the default bucket explicitly
|
||||
{ok, O3} = riakc_pb_socket:get(PB, {<<"default">>, <<"bucket">>}, <<"key">>),
|
||||
|
||||
@ -124,6 +124,35 @@ confirm() ->
|
||||
?assertEqual({ok, []}, riakc_pb_socket:list_buckets(PB)),
|
||||
?assertEqual({ok, [<<"bucket">>]}, riakc_pb_socket:list_buckets(PB, Type)),
|
||||
|
||||
%%% Beginning of UTF-8 test
|
||||
|
||||
lager:info("UTF-8 type get/put test"),
|
||||
%% こんにちは - konnichiwa (Japanese)
|
||||
UnicodeType = unicode:characters_to_binary([12371,12435,12395,12385,12399], utf8),
|
||||
%% سلام - Salam (Arabic)
|
||||
UnicodeBucket = unicode:characters_to_binary([1587,1604,1575,1605], utf8),
|
||||
rt:create_and_activate_bucket_type(Node, UnicodeType, [{n_val, 3}]),
|
||||
rt:wait_until_bucket_type_status(UnicodeType, active, Nodes),
|
||||
|
||||
lager:info("doing put"),
|
||||
riakc_pb_socket:put(PB, riakc_obj:new({UnicodeType, UnicodeBucket},
|
||||
<<"key">>, <<"yetanothervalue">>)),
|
||||
|
||||
lager:info("doing get"),
|
||||
{ok, O6} = riakc_pb_socket:get(PB, {UnicodeType, UnicodeBucket}, <<"key">>),
|
||||
|
||||
?assertEqual(<<"yetanothervalue">>, riakc_obj:get_value(O6)),
|
||||
|
||||
lager:info("custom type list_keys test"),
|
||||
?assertEqual({ok, [<<"key">>]}, riakc_pb_socket:list_keys(PB,
|
||||
{UnicodeType,
|
||||
UnicodeBucket})),
|
||||
lager:info("custom type list_buckets test"),
|
||||
%% list buckets
|
||||
?assertEqual({ok, [UnicodeBucket]}, riakc_pb_socket:list_buckets(PB, UnicodeType)),
|
||||
|
||||
%%% End of UTF-8 test
|
||||
|
||||
lager:info("bucket properties tests"),
|
||||
riakc_pb_socket:set_bucket(PB, {<<"default">>, <<"mybucket">>},
|
||||
[{n_val, 5}]),
|
||||
@ -145,12 +174,26 @@ confirm() ->
|
||||
<<"mybucket">>}),
|
||||
?assertEqual(5, proplists:get_value(n_val, BProps3)),
|
||||
|
||||
%% Check our unicode brethren
|
||||
riakc_pb_socket:set_bucket(PB, {UnicodeType, UnicodeBucket},
|
||||
[{n_val, 4}]),
|
||||
{ok, UBProps1} = riakc_pb_socket:get_bucket(PB, {UnicodeType,
|
||||
UnicodeBucket}),
|
||||
?assertEqual(4, proplists:get_value(n_val, UBProps1)),
|
||||
|
||||
riakc_pb_socket:reset_bucket(PB, {Type, <<"mybucket">>}),
|
||||
|
||||
{ok, BProps4} = riakc_pb_socket:get_bucket(PB, {Type,
|
||||
<<"mybucket">>}),
|
||||
?assertEqual(3, proplists:get_value(n_val, BProps4)),
|
||||
|
||||
riakc_pb_socket:reset_bucket(PB, {UnicodeType, UnicodeBucket}),
|
||||
|
||||
{ok, UBProps2} = riakc_pb_socket:get_bucket(PB, {UnicodeType,
|
||||
UnicodeBucket}),
|
||||
|
||||
?assertEqual(3, proplists:get_value(n_val, UBProps2)),
|
||||
|
||||
lager:info("bucket type properties test"),
|
||||
|
||||
riakc_pb_socket:set_bucket_type(PB, Type,
|
||||
@ -171,6 +214,27 @@ confirm() ->
|
||||
|
||||
?assertEqual(3, proplists:get_value(n_val, BProps7)),
|
||||
|
||||
%% Repeat type checks for unicode type/bucket
|
||||
|
||||
riakc_pb_socket:set_bucket_type(PB, UnicodeType,
|
||||
[{n_val, 5}]),
|
||||
|
||||
{ok, UBProps3} = riakc_pb_socket:get_bucket_type(PB, UnicodeType),
|
||||
|
||||
?assertEqual(5, proplists:get_value(n_val, UBProps3)),
|
||||
|
||||
%% check that the bucket inherits from its type
|
||||
{ok, UBProps4} = riakc_pb_socket:get_bucket(PB, {UnicodeType,
|
||||
UnicodeBucket}),
|
||||
|
||||
?assertEqual(5, proplists:get_value(n_val, UBProps4)),
|
||||
|
||||
riakc_pb_socket:set_bucket_type(PB, UnicodeType, [{n_val, 3}]),
|
||||
|
||||
{ok, UBProps5} = riakc_pb_socket:get_bucket_type(PB, UnicodeType),
|
||||
|
||||
?assertEqual(3, proplists:get_value(n_val, UBProps5)),
|
||||
|
||||
%% make sure a regular bucket under the default type reflects app.config
|
||||
{ok, BProps8} = riakc_pb_socket:get_bucket(PB, {<<"default">>,
|
||||
<<"mybucket">>}),
|
||||
|
@ -24,8 +24,8 @@
|
||||
-compile(export_all).
|
||||
|
||||
spam_nodes(TargetNodes) ->
|
||||
[[spawn(?MODULE, spam, [N]) || _ <- lists:seq(1,10*1000)] || N <- TargetNodes].
|
||||
[[spawn(?MODULE, spam, [N]) || _ <- lists:seq(1,1000*1000)] || N <- TargetNodes].
|
||||
|
||||
spam(Node) ->
|
||||
timer:sleep(random:uniform(1500)),
|
||||
timer:sleep(random:uniform(100)),
|
||||
catch rpc:call(Node, erlang, whereis, [rex]).
|
||||
|
31
tests/ensemble_basic.erl
Normal file
31
tests/ensemble_basic.erl
Normal file
@ -0,0 +1,31 @@
|
||||
%% -------------------------------------------------------------------
|
||||
%%
|
||||
%% Copyright (c) 2013-2014 Basho Technologies, Inc.
|
||||
%%
|
||||
%% This file is provided to you under the Apache License,
|
||||
%% Version 2.0 (the "License"); you may not use this file
|
||||
%% except in compliance with the License. You may obtain
|
||||
%% a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing,
|
||||
%% software distributed under the License is distributed on an
|
||||
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
%% KIND, either express or implied. See the License for the
|
||||
%% specific language governing permissions and limitations
|
||||
%% under the License.
|
||||
%%
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
-module(ensemble_basic).
|
||||
-export([confirm/0]).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
confirm() ->
|
||||
NumNodes = 5,
|
||||
NVal = 5,
|
||||
Config = ensemble_util:fast_config(NVal),
|
||||
lager:info("Building cluster and waiting for ensemble to stablize"),
|
||||
ensemble_util:build_cluster(NumNodes, Config, NVal),
|
||||
pass.
|
36
tests/ensemble_basic2.erl
Normal file
36
tests/ensemble_basic2.erl
Normal file
@ -0,0 +1,36 @@
|
||||
%% -------------------------------------------------------------------
|
||||
%%
|
||||
%% Copyright (c) 2013-2014 Basho Technologies, Inc.
|
||||
%%
|
||||
%% This file is provided to you under the Apache License,
|
||||
%% Version 2.0 (the "License"); you may not use this file
|
||||
%% except in compliance with the License. You may obtain
|
||||
%% a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing,
|
||||
%% software distributed under the License is distributed on an
|
||||
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
%% KIND, either express or implied. See the License for the
|
||||
%% specific language governing permissions and limitations
|
||||
%% under the License.
|
||||
%%
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
-module(ensemble_basic2).
|
||||
-export([confirm/0]).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
confirm() ->
|
||||
NumNodes = 5,
|
||||
NVal = 5,
|
||||
Config = ensemble_util:fast_config(NVal),
|
||||
lager:info("Building cluster and waiting for ensemble to stablize"),
|
||||
Nodes = ensemble_util:build_cluster(NumNodes, Config, NVal),
|
||||
Node = hd(Nodes),
|
||||
Ensembles = ensemble_util:ensembles(Node),
|
||||
lager:info("Killing all ensemble leaders"),
|
||||
ok = ensemble_util:kill_leaders(Node, Ensembles),
|
||||
ensemble_util:wait_until_stable(Node, NVal),
|
||||
pass.
|
100
tests/ensemble_basic3.erl
Normal file
100
tests/ensemble_basic3.erl
Normal file
@ -0,0 +1,100 @@
|
||||
%% -------------------------------------------------------------------
|
||||
%%
|
||||
%% Copyright (c) 2013-2014 Basho Technologies, Inc.
|
||||
%%
|
||||
%% This file is provided to you under the Apache License,
|
||||
%% Version 2.0 (the "License"); you may not use this file
|
||||
%% except in compliance with the License. You may obtain
|
||||
%% a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing,
|
||||
%% software distributed under the License is distributed on an
|
||||
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
%% KIND, either express or implied. See the License for the
|
||||
%% specific language governing permissions and limitations
|
||||
%% under the License.
|
||||
%%
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
-module(ensemble_basic3).
|
||||
-export([confirm/0]).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
confirm() ->
|
||||
NumNodes = 5,
|
||||
NVal = 5,
|
||||
Quorum = NVal div 2 + 1,
|
||||
Config = ensemble_util:fast_config(NVal),
|
||||
lager:info("Building cluster and waiting for ensemble to stablize"),
|
||||
Nodes = ensemble_util:build_cluster(NumNodes, Config, NVal),
|
||||
vnode_util:load(Nodes),
|
||||
Node = hd(Nodes),
|
||||
Ensembles = ensemble_util:ensembles(Node),
|
||||
lager:info("Killing all ensemble leaders"),
|
||||
ok = ensemble_util:kill_leaders(Node, Ensembles),
|
||||
ensemble_util:wait_until_stable(Node, NVal),
|
||||
|
||||
lager:info("Creating/activating 'strong' bucket type"),
|
||||
rt:create_and_activate_bucket_type(Node, <<"strong">>,
|
||||
[{consistent, true}, {n_val, NVal}]),
|
||||
ensemble_util:wait_until_stable(Node, NVal),
|
||||
Bucket = {<<"strong">>, <<"test">>},
|
||||
Keys = [<<N:64/integer>> || N <- lists:seq(1,1000)],
|
||||
|
||||
Key1 = hd(Keys),
|
||||
DocIdx = rpc:call(Node, riak_core_util, chash_std_keyfun, [{Bucket, Key1}]),
|
||||
PL = rpc:call(Node, riak_core_apl, get_primary_apl, [DocIdx, NVal, riak_kv]),
|
||||
All = [VN || {VN, _} <- PL],
|
||||
Other = [VN || {VN={_, Owner}, _} <- PL,
|
||||
Owner =/= Node],
|
||||
|
||||
Minority = NVal - Quorum,
|
||||
PartitionedVN = lists:sublist(Other, Minority),
|
||||
Partitioned = [VNode || {_, VNode} <- PartitionedVN],
|
||||
MajorityVN = All -- PartitionedVN,
|
||||
|
||||
PBC = rt:pbc(Node),
|
||||
|
||||
lager:info("Partitioning quorum minority: ~p", [Partitioned]),
|
||||
Part = rt:partition(Nodes -- Partitioned, Partitioned),
|
||||
ensemble_util:wait_until_stable(Node, Quorum),
|
||||
|
||||
lager:info("Writing ~p consistent keys", [1000]),
|
||||
[ok = rt:pbc_write(PBC, Bucket, Key, Key) || Key <- Keys],
|
||||
|
||||
lager:info("Read keys to verify they exist"),
|
||||
[rt:pbc_read(PBC, Bucket, Key) || Key <- Keys],
|
||||
|
||||
lager:info("Healing partition"),
|
||||
rt:heal(Part),
|
||||
|
||||
lager:info("Suspending majority vnodes"),
|
||||
L = [begin
|
||||
lager:info("Suspending vnode: ~p", [VIdx]),
|
||||
Pid = vnode_util:suspend_vnode(VNode, VIdx),
|
||||
{VN, Pid}
|
||||
end || VN={VIdx, VNode} <- MajorityVN],
|
||||
L2 = orddict:from_list(L),
|
||||
|
||||
lager:info("Sleeping for 5s"),
|
||||
timer:sleep(5000),
|
||||
|
||||
lists:foldl(fun({VN={VIdx, VNode}, Pid}, Suspended) ->
|
||||
lager:info("Resuming vnode: ~p", [VIdx]),
|
||||
vnode_util:resume_vnode(Pid),
|
||||
ensemble_util:wait_until_stable(Node, Quorum),
|
||||
lager:info("Re-reading keys"),
|
||||
[rt:pbc_read(PBC, Bucket, Key) || Key <- Keys],
|
||||
lager:info("Suspending vnode: ~p", [VIdx]),
|
||||
Pid2 = vnode_util:suspend_vnode(VNode, VIdx),
|
||||
orddict:store(VN, Pid2, Suspended)
|
||||
end, L2, L2),
|
||||
|
||||
lager:info("Resuming all vnodes"),
|
||||
[vnode_util:resume_vnode(Pid) || {_, Pid} <- L2],
|
||||
ensemble_util:wait_until_stable(Node, NVal),
|
||||
lager:info("Re-reading keys"),
|
||||
[rt:pbc_read(PBC, Bucket, Key) || Key <- Keys],
|
||||
pass.
|
67
tests/ensemble_basic4.erl
Normal file
67
tests/ensemble_basic4.erl
Normal file
@ -0,0 +1,67 @@
|
||||
%% -------------------------------------------------------------------
|
||||
%%
|
||||
%% Copyright (c) 2013-2014 Basho Technologies, Inc.
|
||||
%%
|
||||
%% This file is provided to you under the Apache License,
|
||||
%% Version 2.0 (the "License"); you may not use this file
|
||||
%% except in compliance with the License. You may obtain
|
||||
%% a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing,
|
||||
%% software distributed under the License is distributed on an
|
||||
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
%% KIND, either express or implied. See the License for the
|
||||
%% specific language governing permissions and limitations
|
||||
%% under the License.
|
||||
%%
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
-module(ensemble_basic4).
|
||||
-export([confirm/0]).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
confirm() ->
|
||||
NumNodes = 5,
|
||||
NVal = 5,
|
||||
Quorum = NVal div 2 + 1,
|
||||
Config = ensemble_util:fast_config(NVal),
|
||||
lager:info("Building cluster and waiting for ensemble to stablize"),
|
||||
Nodes = ensemble_util:build_cluster(NumNodes, Config, NVal),
|
||||
Node = hd(Nodes),
|
||||
|
||||
lager:info("Creating/activating 'strong' bucket type"),
|
||||
rt:create_and_activate_bucket_type(Node, <<"strong">>,
|
||||
[{consistent, true}, {n_val, NVal}]),
|
||||
ensemble_util:wait_until_stable(Node, NVal),
|
||||
Bucket = {<<"strong">>, <<"test">>},
|
||||
Keys = [<<N:64/integer>> || N <- lists:seq(1,1000)],
|
||||
|
||||
Key1 = hd(Keys),
|
||||
DocIdx = rpc:call(Node, riak_core_util, chash_std_keyfun, [{Bucket, Key1}]),
|
||||
PL = rpc:call(Node, riak_core_apl, get_primary_apl, [DocIdx, NVal, riak_kv]),
|
||||
Other = [VN || {VN={_, Owner}, _} <- PL,
|
||||
Owner =/= Node],
|
||||
|
||||
Minority = NVal - Quorum,
|
||||
PartitionedVN = lists:sublist(Other, Minority),
|
||||
Partitioned = [VNode || {_, VNode} <- PartitionedVN],
|
||||
|
||||
PBC = rt:pbc(Node),
|
||||
|
||||
lager:info("Partitioning quorum minority: ~p", [Partitioned]),
|
||||
Part = rt:partition(Nodes -- Partitioned, Partitioned),
|
||||
rpc:multicall(Nodes, riak_kv_entropy_manager, set_mode, [manual]),
|
||||
ensemble_util:wait_until_stable(Node, Quorum),
|
||||
|
||||
lager:info("Writing ~p consistent keys", [1000]),
|
||||
[ok = rt:pbc_write(PBC, Bucket, Key, Key) || Key <- Keys],
|
||||
|
||||
lager:info("Read keys to verify they exist"),
|
||||
[rt:pbc_read(PBC, Bucket, Key) || Key <- Keys],
|
||||
|
||||
lager:info("Healing partition"),
|
||||
rt:heal(Part),
|
||||
|
||||
pass.
|
101
tests/ensemble_interleave.erl
Normal file
101
tests/ensemble_interleave.erl
Normal file
@ -0,0 +1,101 @@
|
||||
%% -------------------------------------------------------------------
|
||||
%%
|
||||
%% Copyright (c) 2013-2014 Basho Technologies, Inc.
|
||||
%%
|
||||
%% This file is provided to you under the Apache License,
|
||||
%% Version 2.0 (the "License"); you may not use this file
|
||||
%% except in compliance with the License. You may obtain
|
||||
%% a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing,
|
||||
%% software distributed under the License is distributed on an
|
||||
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
%% KIND, either express or implied. See the License for the
|
||||
%% specific language governing permissions and limitations
|
||||
%% under the License.
|
||||
%%
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
%% Tests the specific corner case where two ensemble peers become
|
||||
%% corrupted one after the other. The goal is to provoke the scenario
|
||||
%% where one of the peers initially trusts the other and syncs with it,
|
||||
%% but completes the sync after the peer becomes untrusted.
|
||||
%%
|
||||
%% Actually hitting this specific interleaving may require multiple runs,
|
||||
%% but it has been observed and lead to the addition of the `check_sync`
|
||||
%% logic to riak_ensemble/riak_ensemble_peer.erl that verifies a peer is
|
||||
%% still trustworthy after a peer syncs with it.
|
||||
%%
|
||||
%% Without the check_sync addition, this test could incorectly report
|
||||
%% {error, notfound} -- eg. data loss. With the addition, this test
|
||||
%% should now always pass.
|
||||
|
||||
-module(ensemble_interleave).
|
||||
-export([confirm/0]).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
confirm() ->
|
||||
NVal = 5,
|
||||
Quorum = NVal div 2 + 1,
|
||||
Config = ensemble_util:fast_config(NVal),
|
||||
lager:info("Building cluster and waiting for ensemble to stablize"),
|
||||
Nodes = ensemble_util:build_cluster(8, Config, NVal),
|
||||
Node = hd(Nodes),
|
||||
vnode_util:load(Nodes),
|
||||
|
||||
lager:info("Creating/activating 'strong' bucket type"),
|
||||
rt:create_and_activate_bucket_type(Node, <<"strong">>,
|
||||
[{consistent, true}, {n_val, NVal}]),
|
||||
ensemble_util:wait_until_stable(Node, NVal),
|
||||
Bucket = {<<"strong">>, <<"test">>},
|
||||
Keys = [<<N:64/integer>> || N <- lists:seq(1,1000)],
|
||||
|
||||
Key1 = hd(Keys),
|
||||
DocIdx = rpc:call(Node, riak_core_util, chash_std_keyfun, [{Bucket, Key1}]),
|
||||
PL = rpc:call(Node, riak_core_apl, get_primary_apl, [DocIdx, NVal, riak_kv]),
|
||||
All = [VN || {VN, _} <- PL],
|
||||
Other = [VN || {VN={_, Owner}, _} <- PL,
|
||||
Owner =/= Node],
|
||||
|
||||
Minority = NVal - Quorum,
|
||||
PartitionedVN = lists:sublist(Other, Minority),
|
||||
Partitioned = [VNode || {_, VNode} <- PartitionedVN],
|
||||
[KillFirst,KillSecond|Suspend] = All -- PartitionedVN,
|
||||
|
||||
io:format("PL: ~p~n", [PL]),
|
||||
PBC = rt:pbc(Node),
|
||||
Options = [{timeout, 500}],
|
||||
|
||||
rpc:multicall(Nodes, riak_kv_entropy_manager, set_mode, [manual]),
|
||||
Part = rt:partition(Nodes -- Partitioned, Partitioned),
|
||||
ensemble_util:wait_until_stable(Node, Quorum),
|
||||
|
||||
lager:info("Writing ~p consistent keys", [1000]),
|
||||
[ok = rt:pbc_write(PBC, Bucket, Key, Key) || Key <- Keys],
|
||||
|
||||
lager:info("Read keys to verify they exist"),
|
||||
[rt:pbc_read(PBC, Bucket, Key, Options) || Key <- Keys],
|
||||
rt:heal(Part),
|
||||
|
||||
[begin
|
||||
lager:info("Suspending vnode: ~p", [VIdx]),
|
||||
vnode_util:suspend_vnode(VNode, VIdx)
|
||||
end || {VIdx, VNode} <- Suspend],
|
||||
|
||||
vnode_util:kill_vnode(KillFirst),
|
||||
timer:sleep(5000),
|
||||
vnode_util:kill_vnode(KillSecond),
|
||||
vnode_util:rebuild_vnode(KillFirst),
|
||||
rpc:multicall(Nodes, riak_kv_entropy_manager, set_mode, [automatic]),
|
||||
ensemble_util:wait_until_stable(Node, Quorum),
|
||||
|
||||
lager:info("Disabling AAE"),
|
||||
rpc:multicall(Nodes, riak_kv_entropy_manager, disable, []),
|
||||
ensemble_util:wait_until_stable(Node, Quorum),
|
||||
|
||||
lager:info("Re-reading keys to verify they exist"),
|
||||
Expect = [ok, {error, timeout}, {error, <<"timeout">>}],
|
||||
[rt:pbc_read_check(PBC, Bucket, Key, Expect, Options) || Key <- Keys],
|
||||
pass.
|
127
tests/ensemble_sync.erl
Normal file
127
tests/ensemble_sync.erl
Normal file
@ -0,0 +1,127 @@
|
||||
%% -------------------------------------------------------------------
|
||||
%%
|
||||
%% Copyright (c) 2013-2014 Basho Technologies, Inc.
|
||||
%%
|
||||
%% This file is provided to you under the Apache License,
|
||||
%% Version 2.0 (the "License"); you may not use this file
|
||||
%% except in compliance with the License. You may obtain
|
||||
%% a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing,
|
||||
%% software distributed under the License is distributed on an
|
||||
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
%% KIND, either express or implied. See the License for the
|
||||
%% specific language governing permissions and limitations
|
||||
%% under the License.
|
||||
%%
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
-module(ensemble_sync).
|
||||
-export([confirm/0]).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
confirm() ->
|
||||
NVal = 5,
|
||||
Config = ensemble_util:fast_config(NVal),
|
||||
Nodes = ensemble_util:build_cluster(8, Config, NVal),
|
||||
Node = hd(Nodes),
|
||||
vnode_util:load(Nodes),
|
||||
|
||||
lager:info("Creating/activating 'strong' bucket type"),
|
||||
rt:create_and_activate_bucket_type(Node, <<"strong">>,
|
||||
[{consistent, true}, {n_val, NVal}]),
|
||||
ensemble_util:wait_until_stable(Node, NVal),
|
||||
|
||||
ExpectOkay = [ok],
|
||||
ExpectTimeout = [{error, timeout}, {error, <<"timeout">>} | ExpectOkay],
|
||||
ExpectFail = [{error, notfound} | ExpectTimeout],
|
||||
|
||||
Scenarios = [%% corrupted, suspended, valid, empty, bucket, expect
|
||||
{1, 1, 1, 2, <<"test1">>, ExpectOkay},
|
||||
{1, 2, 0, 2, <<"test2">>, ExpectTimeout},
|
||||
{2, 1, 0, 2, <<"test3">>, ExpectTimeout},
|
||||
{3, 0, 0, 2, <<"test4">>, ExpectFail}
|
||||
],
|
||||
[ok = run_scenario(Nodes, NVal, Scenario) || Scenario <- Scenarios],
|
||||
pass.
|
||||
|
||||
run_scenario(Nodes, NVal, {NumKill, NumSuspend, NumValid, _, Name, Expect}) ->
|
||||
Node = hd(Nodes),
|
||||
Quorum = NVal div 2 + 1,
|
||||
Bucket = {<<"strong">>, Name},
|
||||
Keys = [<<N:64/integer>> || N <- lists:seq(1,1000)],
|
||||
|
||||
Key1 = hd(Keys),
|
||||
DocIdx = rpc:call(Node, riak_core_util, chash_std_keyfun, [{Bucket, Key1}]),
|
||||
PL = rpc:call(Node, riak_core_apl, get_primary_apl, [DocIdx, NVal, riak_kv]),
|
||||
All = [VN || {VN, _} <- PL],
|
||||
Other = [VN || {VN={_, Owner}, _} <- PL,
|
||||
Owner =/= Node],
|
||||
|
||||
Minority = NVal - Quorum,
|
||||
PartitionedVN = lists:sublist(Other, Minority),
|
||||
Partitioned = [VNode || {_, VNode} <- PartitionedVN],
|
||||
Valid = All -- PartitionedVN,
|
||||
|
||||
{KillVN, Valid2} = lists:split(NumKill, Valid),
|
||||
{SuspendVN, Valid3} = lists:split(NumSuspend, Valid2),
|
||||
{AfterVN, _} = lists:split(NumValid, Valid3),
|
||||
|
||||
io:format("PL: ~p~n", [PL]),
|
||||
PBC = rt:pbc(Node),
|
||||
Options = [{timeout, 500}],
|
||||
|
||||
rpc:multicall(Nodes, riak_kv_entropy_manager, set_mode, [manual]),
|
||||
Part = rt:partition(Nodes -- Partitioned, Partitioned),
|
||||
ensemble_util:wait_until_stable(Node, Quorum),
|
||||
|
||||
%% Write data while minority is partitioned
|
||||
lager:info("Writing ~p consistent keys", [1000]),
|
||||
[ok = rt:pbc_write(PBC, Bucket, Key, Key) || Key <- Keys],
|
||||
|
||||
lager:info("Read keys to verify they exist"),
|
||||
[rt:pbc_read(PBC, Bucket, Key, Options) || Key <- Keys],
|
||||
rt:heal(Part),
|
||||
|
||||
%% Suspend desired number of valid vnodes
|
||||
S1 = [vnode_util:suspend_vnode(VNode, VIdx) || {VIdx, VNode} <- SuspendVN],
|
||||
|
||||
%% Kill/corrupt desired number of valid vnodes
|
||||
[vnode_util:kill_vnode(VN) || VN <- KillVN],
|
||||
[vnode_util:rebuild_vnode(VN) || VN <- KillVN],
|
||||
rpc:multicall(Nodes, riak_kv_entropy_manager, set_mode, [automatic]),
|
||||
ensemble_util:wait_until_stable(Node, Quorum),
|
||||
|
||||
lager:info("Disabling AAE"),
|
||||
rpc:multicall(Nodes, riak_kv_entropy_manager, disable, []),
|
||||
ensemble_util:wait_until_stable(Node, Quorum),
|
||||
|
||||
%% Suspend remaining valid vnodes to ensure data comes from repaired vnodes
|
||||
S2 = [vnode_util:suspend_vnode(VNode, VIdx) || {VIdx, VNode} <- AfterVN],
|
||||
ensemble_util:wait_until_stable(Node, Quorum),
|
||||
|
||||
lager:info("Checking that key results match scenario"),
|
||||
[rt:pbc_read_check(PBC, Bucket, Key, Expect, Options) || Key <- Keys],
|
||||
|
||||
lager:info("Re-enabling AAE"),
|
||||
rpc:multicall(Nodes, riak_kv_entropy_manager, enable, []),
|
||||
|
||||
lager:info("Resuming all vnodes"),
|
||||
[vnode_util:resume_vnode(Pid) || Pid <- S1 ++ S2],
|
||||
ensemble_util:wait_until_stable(Node, NVal),
|
||||
|
||||
%% Check that for other than the "all bets are off" failure case,
|
||||
%% we can successfully read all keys after all vnodes are available.
|
||||
case lists:member({error, notfound}, Expect) of
|
||||
true ->
|
||||
ok;
|
||||
false ->
|
||||
lager:info("Re-reading keys to verify they exist"),
|
||||
[rt:pbc_read(PBC, Bucket, Key, Options) || Key <- Keys]
|
||||
end,
|
||||
|
||||
lager:info("Scenario passed"),
|
||||
lager:info("-----------------------------------------------------"),
|
||||
ok.
|
132
tests/ensemble_util.erl
Normal file
132
tests/ensemble_util.erl
Normal file
@ -0,0 +1,132 @@
|
||||
%% -------------------------------------------------------------------
|
||||
%%
|
||||
%% Copyright (c) 2013-2014 Basho Technologies, Inc.
|
||||
%%
|
||||
%% This file is provided to you under the Apache License,
|
||||
%% Version 2.0 (the "License"); you may not use this file
|
||||
%% except in compliance with the License. You may obtain
|
||||
%% a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing,
|
||||
%% software distributed under the License is distributed on an
|
||||
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
%% KIND, either express or implied. See the License for the
|
||||
%% specific language governing permissions and limitations
|
||||
%% under the License.
|
||||
%%
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
-module(ensemble_util).
|
||||
-compile(export_all).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
build_cluster(Num, Config, NVal) ->
|
||||
Nodes = rt:deploy_nodes(Num, Config),
|
||||
Node = hd(Nodes),
|
||||
ok = rpc:call(Node, riak_ensemble_manager, enable, []),
|
||||
_ = rpc:call(Node, riak_core_ring_manager, force_update, []),
|
||||
ensemble_util:wait_until_stable(Node, NVal),
|
||||
rt:join_cluster(Nodes),
|
||||
ensemble_util:wait_until_cluster(Nodes),
|
||||
ensemble_util:wait_for_membership(Node),
|
||||
ensemble_util:wait_until_stable(Node, NVal),
|
||||
Nodes.
|
||||
|
||||
fast_config(NVal) ->
|
||||
fast_config(NVal, 16).
|
||||
|
||||
fast_config(NVal, RingSize) ->
|
||||
[{riak_kv, [{anti_entropy_build_limit, {100, 1000}},
|
||||
{anti_entropy_concurrency, 100},
|
||||
{anti_entropy_tick, 100},
|
||||
{anti_entropy, {on, []}},
|
||||
{anti_entropy_timeout, 5000},
|
||||
{storage_backend, riak_kv_memory_backend}]},
|
||||
{riak_core, [{default_bucket_props, [{n_val, NVal}]},
|
||||
{vnode_management_timer, 1000},
|
||||
{ring_creation_size, RingSize},
|
||||
{enable_consensus, true}]}].
|
||||
|
||||
ensembles(Node) ->
|
||||
rpc:call(Node, riak_kv_ensembles, ensembles, []).
|
||||
|
||||
get_leader_pid(Node, Ensemble) ->
|
||||
rpc:call(Node, riak_ensemble_manager, get_leader_pid, [Ensemble]).
|
||||
|
||||
kill_leader(Node, Ensemble) ->
|
||||
case get_leader_pid(Node, Ensemble) of
|
||||
undefined ->
|
||||
ok;
|
||||
Pid ->
|
||||
exit(Pid, kill),
|
||||
ok
|
||||
end.
|
||||
|
||||
kill_leaders(Node, Ensembles) ->
|
||||
_ = [kill_leader(Node, Ensemble) || Ensemble <- Ensembles],
|
||||
ok.
|
||||
|
||||
wait_until_cluster(Nodes) ->
|
||||
lager:info("Waiting until riak_ensemble cluster includes all nodes"),
|
||||
Node = hd(Nodes),
|
||||
F = fun() ->
|
||||
case rpc:call(Node, riak_ensemble_manager, cluster, []) of
|
||||
Nodes ->
|
||||
true;
|
||||
_ ->
|
||||
false
|
||||
end
|
||||
end,
|
||||
?assertEqual(ok, rt:wait_until(F)),
|
||||
lager:info("....cluster ready"),
|
||||
ok.
|
||||
|
||||
wait_until_stable(Node, Count) ->
|
||||
lager:info("Waiting until all ensembles are stable"),
|
||||
Ensembles = rpc:call(Node, riak_kv_ensembles, ensembles, []),
|
||||
wait_until_quorum(Node, root),
|
||||
[wait_until_quorum(Node, Ensemble) || Ensemble <- Ensembles],
|
||||
[wait_until_quorum_count(Node, Ensemble, Count) || Ensemble <- Ensembles],
|
||||
lager:info("....all stable"),
|
||||
ok.
|
||||
|
||||
wait_until_quorum(Node, Ensemble) ->
|
||||
F = fun() ->
|
||||
case rpc:call(Node, riak_ensemble_manager, check_quorum, [Ensemble, 500]) of
|
||||
true ->
|
||||
true;
|
||||
false ->
|
||||
lager:info("Not ready: ~p", [Ensemble]),
|
||||
false
|
||||
end
|
||||
end,
|
||||
?assertEqual(ok, rt:wait_until(F)).
|
||||
|
||||
wait_until_quorum_count(Node, Ensemble, Want) ->
|
||||
F = fun() ->
|
||||
case rpc:call(Node, riak_ensemble_manager, count_quorum, [Ensemble, 1500]) of
|
||||
Count when Count >= Want ->
|
||||
true;
|
||||
Count ->
|
||||
lager:info("Count: ~p :: ~p < ~p", [Ensemble, Count, Want]),
|
||||
false
|
||||
end
|
||||
end,
|
||||
?assertEqual(ok, rt:wait_until(F)).
|
||||
|
||||
wait_for_membership(Node) ->
|
||||
lager:info("Waiting until ensemble membership matches ring ownership"),
|
||||
F = fun() ->
|
||||
case rpc:call(Node, riak_kv_ensembles, check_membership, []) of
|
||||
Results when is_list(Results) ->
|
||||
[] =:= [x || false <- Results];
|
||||
_ ->
|
||||
false
|
||||
end
|
||||
end,
|
||||
?assertEqual(ok, rt:wait_until(F)),
|
||||
lager:info("....ownership matches"),
|
||||
ok.
|
@ -127,6 +127,39 @@ confirm() ->
|
||||
?assertEqual({ok, []}, rhc:list_buckets(RHC)),
|
||||
?assertEqual({ok, [<<"bucket">>]}, rhc:list_buckets(RHC, <<"mytype">>)),
|
||||
|
||||
lager:info("UTF-8 type get/put test"),
|
||||
%% こんにちは - konnichiwa (Japanese)
|
||||
UnicodeTypeBin = unicode:characters_to_binary([12371,12435,12395,12385,12399], utf8),
|
||||
%% سلام - Salam (Arabic)
|
||||
UnicodeBucketBin = unicode:characters_to_binary([1587,1604,1575,1605], utf8),
|
||||
|
||||
UCBBin = {UnicodeTypeBin, UnicodeBucketBin},
|
||||
|
||||
ok = rt:create_and_activate_bucket_type(Node, UnicodeTypeBin, [{n_val,3}]),
|
||||
|
||||
lager:info("doing put"),
|
||||
ok = rhc:put(RHC, riakc_obj:new(UCBBin,
|
||||
<<"key">>, <<"unicode">>)),
|
||||
|
||||
lager:info("doing get"),
|
||||
{ok, O6} = rhc:get(RHC, UCBBin, <<"key">>),
|
||||
|
||||
?assertEqual(<<"unicode">>, riakc_obj:get_value(O6)),
|
||||
|
||||
lager:info("unicode type list_keys test"),
|
||||
?assertEqual({ok, [<<"key">>]}, rhc:list_keys(RHC, UCBBin)),
|
||||
|
||||
lager:info("unicode type list_buckets test"),
|
||||
%% list buckets
|
||||
|
||||
%% This is a rather awkward representation, but it's what rhc is
|
||||
%% currently giving us. Curl gives us
|
||||
%% {"buckets":["\u0633\u0644\u0627\u0645"]} to illustrate where
|
||||
%% the values are coming from, and those are indeed the correct
|
||||
%% hexadecimal values for the UTF-8 representation of the bucket
|
||||
%% name
|
||||
?assertEqual({ok, [<<"0633064406270645">>]}, rhc:list_buckets(RHC, UnicodeTypeBin)),
|
||||
|
||||
lager:info("bucket properties tests"),
|
||||
rhc:set_bucket(RHC, {<<"default">>, <<"mybucket">>},
|
||||
[{n_val, 5}]),
|
||||
|
@ -67,40 +67,44 @@ confirm() ->
|
||||
"pass"}]),
|
||||
?assertMatch({error, {ok, "401", _, _}}, rhc:ping(C2)),
|
||||
|
||||
%% Store this in a variable so once Riak supports utf-8 usernames
|
||||
%% via HTTP(s) we can test it with just one change
|
||||
Username = "user",
|
||||
|
||||
lager:info("Creating user"),
|
||||
%% grant the user credentials
|
||||
ok = rpc:call(Node, riak_core_console, add_user, [["user", "password=password"]]),
|
||||
ok = rpc:call(Node, riak_core_console, add_user, [[Username, "password=password"]]),
|
||||
|
||||
lager:info("Setting trust mode on user"),
|
||||
%% trust anyone on localhost
|
||||
ok = rpc:call(Node, riak_core_console, add_source, [["user",
|
||||
ok = rpc:call(Node, riak_core_console, add_source, [[Username,
|
||||
"127.0.0.1/32",
|
||||
"trust"]]),
|
||||
|
||||
lager:info("Checking that credentials are ignored in trust mode"),
|
||||
%% invalid credentials should be ignored in trust mode
|
||||
C3 = rhc:create("127.0.0.1", Port, "riak", [{is_ssl, true}, {credentials,
|
||||
"user",
|
||||
Username,
|
||||
"pass"}]),
|
||||
?assertEqual(ok, rhc:ping(C3)),
|
||||
|
||||
lager:info("Setting password mode on user"),
|
||||
%% require password on localhost
|
||||
ok = rpc:call(Node, riak_core_console, add_source, [["user",
|
||||
ok = rpc:call(Node, riak_core_console, add_source, [[Username,
|
||||
"127.0.0.1/32",
|
||||
"password"]]),
|
||||
|
||||
lager:info("Checking that incorrect password demands reauth"),
|
||||
%% invalid credentials should be rejected in password mode
|
||||
C4 = rhc:create("127.0.0.1", Port, "riak", [{is_ssl, true}, {credentials,
|
||||
"user",
|
||||
Username,
|
||||
"pass"}]),
|
||||
?assertMatch({error, {ok, "401", _, _}}, rhc:ping(C4)),
|
||||
|
||||
lager:info("Checking that correct password is successful"),
|
||||
%% valid credentials should be accepted in password mode
|
||||
C5 = rhc:create("127.0.0.1", Port, "riak", [{is_ssl, true}, {credentials,
|
||||
"user",
|
||||
Username,
|
||||
"password"}]),
|
||||
|
||||
?assertEqual(ok, rhc:ping(C5)),
|
||||
@ -108,7 +112,7 @@ confirm() ->
|
||||
lager:info("verifying the peer certificate rejects mismatch with server cert"),
|
||||
%% verifying the peer certificate reject mismatch with server cert
|
||||
C6 = rhc:create("127.0.0.1", Port, "riak", [{is_ssl, true},
|
||||
{credentials, "user", "password"},
|
||||
{credentials, Username, "password"},
|
||||
{ssl_options, [
|
||||
{cacertfile, filename:join([PrivDir,
|
||||
"certs/cacert.org/ca/root.crt"])},
|
||||
@ -122,7 +126,7 @@ confirm() ->
|
||||
lager:info("verifying the peer certificate should work if the cert is valid"),
|
||||
%% verifying the peer certificate should work if the cert is valid
|
||||
C7 = rhc:create("127.0.0.1", Port, "riak", [{is_ssl, true},
|
||||
{credentials, "user", "password"},
|
||||
{credentials, Username, "password"},
|
||||
{ssl_options, [
|
||||
{cacertfile, filename:join([PrivDir,
|
||||
"certs/selfsigned/ca/rootcert.pem"])},
|
||||
@ -143,8 +147,8 @@ confirm() ->
|
||||
?assertMatch({error, {ok, "403", _, _}}, rhc:put(C7, Object)),
|
||||
|
||||
lager:info("Granting riak_kv.get, checking get works but put doesn't"),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.get", "ON",
|
||||
"default", "hello", "TO", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.get", "on",
|
||||
"default", "hello", "to", Username]]),
|
||||
|
||||
%% key is not present
|
||||
?assertMatch({error, notfound}, rhc:get(C7, <<"hello">>,
|
||||
@ -153,8 +157,8 @@ confirm() ->
|
||||
?assertMatch({error, {ok, "403", _, _}}, rhc:put(C7, Object)),
|
||||
|
||||
lager:info("Granting riak_kv.put, checking put works and roundtrips with get"),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.put", "ON",
|
||||
"default", "hello", "TO", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.put", "on",
|
||||
"default", "hello", "to", Username]]),
|
||||
|
||||
%% NOW we can put
|
||||
?assertEqual(ok, rhc:put(C7, Object)),
|
||||
@ -170,8 +174,8 @@ confirm() ->
|
||||
<<"world">>)),
|
||||
|
||||
lager:info("Granting riak_kv.delete, checking that delete succeeds"),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.delete", "ON",
|
||||
"default", "hello", "TO", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.delete", "on",
|
||||
"default", "hello", "to", Username]]),
|
||||
?assertEqual(ok, rhc:delete(C7, <<"hello">>,
|
||||
<<"world">>)),
|
||||
|
||||
@ -185,8 +189,8 @@ confirm() ->
|
||||
%% slam the door in the user's face
|
||||
lager:info("Revoking get/put/delete, checking that get/put/delete are disallowed"),
|
||||
ok = rpc:call(Node, riak_core_console, revoke,
|
||||
[["riak_kv.put,riak_kv.get,riak_kv.delete", "ON",
|
||||
"default", "hello", "FROM", "user"]]),
|
||||
[["riak_kv.put,riak_kv.get,riak_kv.delete", "on",
|
||||
"default", "hello", "from", Username]]),
|
||||
|
||||
?assertMatch({error, {ok, "403", _, _}}, rhc:get(C7, <<"hello">>,
|
||||
<<"world">>)),
|
||||
@ -198,8 +202,8 @@ confirm() ->
|
||||
?assertMatch({error, {"403", _}}, rhc:list_buckets(C7)),
|
||||
|
||||
lager:info("Granting riak_kv.list_buckets, checking that list_buckets succeeds"),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.list_buckets", "ON",
|
||||
"default", "TO", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.list_buckets", "on",
|
||||
"default", "to", Username]]),
|
||||
?assertMatch({ok, [<<"hello">>]}, rhc:list_buckets(C7)),
|
||||
|
||||
%% list keys
|
||||
@ -207,21 +211,21 @@ confirm() ->
|
||||
?assertMatch({error, {"403", _}}, rhc:list_keys(C7, <<"hello">>)),
|
||||
|
||||
lager:info("Granting riak_kv.list_keys, checking that list_keys succeeds"),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.list_keys", "ON",
|
||||
"default", "TO", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.list_keys", "on",
|
||||
"default", "to", Username]]),
|
||||
|
||||
?assertMatch({ok, [<<"world">>]}, rhc:list_keys(C7, <<"hello">>)),
|
||||
|
||||
lager:info("Revoking list_keys"),
|
||||
ok = rpc:call(Node, riak_core_console, revoke, [["riak_kv.list_keys", "ON",
|
||||
"default", "FROM", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, revoke, [["riak_kv.list_keys", "on",
|
||||
"default", "from", Username]]),
|
||||
|
||||
lager:info("Checking that get_bucket is disallowed"),
|
||||
?assertMatch({error, {ok, "403", _, _}}, rhc:get_bucket(C7, <<"hello">>)),
|
||||
|
||||
lager:info("Granting riak_core.get_bucket, checking that get_bucket succeeds"),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_core.get_bucket", "ON",
|
||||
"default", "hello", "TO", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_core.get_bucket", "on",
|
||||
"default", "hello", "to", Username]]),
|
||||
|
||||
?assertEqual(3, proplists:get_value(n_val, element(2, rhc:get_bucket(C7,
|
||||
<<"hello">>)))),
|
||||
@ -231,8 +235,8 @@ confirm() ->
|
||||
[{n_val, 5}])),
|
||||
|
||||
lager:info("Granting set_bucket, checking that set_bucket succeeds"),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_core.set_bucket", "ON",
|
||||
"default", "hello", "TO", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_core.set_bucket", "on",
|
||||
"default", "hello", "to", Username]]),
|
||||
|
||||
?assertEqual(ok, rhc:set_bucket(C7, <<"hello">>,
|
||||
[{n_val, 5}])),
|
||||
@ -244,8 +248,8 @@ confirm() ->
|
||||
|
||||
%% grant get/put again
|
||||
lager:info("Granting get/put for counters, checking value and increment"),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.get,riak_kv.put", "ON",
|
||||
"default", "hello", "TO", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.get,riak_kv.put", "on",
|
||||
"default", "hello", "to", Username]]),
|
||||
|
||||
|
||||
?assertMatch({error, {ok, "404", _, _}}, rhc:counter_val(C7, <<"hello">>,
|
||||
@ -260,7 +264,7 @@ confirm() ->
|
||||
%% revoke get
|
||||
lager:info("Revoking get, checking that value fails but increment succeeds"),
|
||||
ok = rpc:call(Node, riak_core_console, revoke,
|
||||
[["riak_kv.get", "ON", "default", "hello", "FROM", "user"]]),
|
||||
[["riak_kv.get", "on", "default", "hello", "from", Username]]),
|
||||
|
||||
?assertMatch({error, {ok, "403", _, _}}, rhc:counter_val(C7, <<"hello">>,
|
||||
<<"numberofpies">>)),
|
||||
@ -270,15 +274,15 @@ confirm() ->
|
||||
%% revoke put
|
||||
lager:info("Revoking put, checking that increment fails"),
|
||||
ok = rpc:call(Node, riak_core_console, revoke,
|
||||
[["riak_kv.put", "ON", "default", "hello", "FROM", "user"]]),
|
||||
[["riak_kv.put", "on", "default", "hello", "from", Username]]),
|
||||
|
||||
?assertMatch({error, {ok, "403", _, _}}, rhc:counter_incr(C7, <<"hello">>,
|
||||
<<"numberofpies">>, 5)),
|
||||
|
||||
%% mapred tests
|
||||
lager:info("Checking that full-bucket mapred is disallowed"),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.put", "ON",
|
||||
"default", "MR", "TO", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.put", "on",
|
||||
"default", "MR", "to", Username]]),
|
||||
|
||||
|
||||
ok = rhc:put(C7, riakc_obj:new(<<"MR">>, <<"lobster_roll">>, <<"16">>,
|
||||
@ -298,8 +302,8 @@ confirm() ->
|
||||
true}])),
|
||||
|
||||
lager:info("Granting list-keys, asserting full-bucket mapred is still disallowed"),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.list_keys", "ON",
|
||||
"default", "MR", "TO", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.list_keys", "on",
|
||||
"default", "MR", "to", Username]]),
|
||||
|
||||
?assertMatch({error, {"403", _}},
|
||||
rhc:mapred_bucket(C7, <<"MR">>, [{map, {jsfun,
|
||||
@ -309,8 +313,8 @@ confirm() ->
|
||||
true}])),
|
||||
|
||||
lager:info("Granting mapreduce, checking that job succeeds"),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.mapreduce", "ON",
|
||||
"default", "MR", "TO", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.mapreduce", "on",
|
||||
"default", "MR", "to", Username]]),
|
||||
|
||||
?assertEqual({ok, [{1, [33]}]},
|
||||
rhc:mapred_bucket(C7, <<"MR">>, [{map, {jsfun,
|
||||
@ -369,8 +373,8 @@ confirm() ->
|
||||
reduce_set_union}, undefined,
|
||||
true}])),
|
||||
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.mapreduce", "ON",
|
||||
"ANY", "TO", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.mapreduce", "on",
|
||||
"any", "to", Username]]),
|
||||
|
||||
lager:info("checking that insecure input modfun works when whitelisted and"
|
||||
" has permissions"),
|
||||
@ -384,8 +388,8 @@ confirm() ->
|
||||
reduce_set_union}, undefined,
|
||||
true}])),
|
||||
|
||||
ok = rpc:call(Node, riak_core_console, revoke, [["riak_kv.mapreduce", "ON",
|
||||
"ANY", "FROM", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, revoke, [["riak_kv.mapreduce", "on",
|
||||
"any", "from", Username]]),
|
||||
|
||||
lager:info("checking that insecure query modfuns works when whitelisted"),
|
||||
?assertMatch({ok, _},
|
||||
@ -399,8 +403,8 @@ confirm() ->
|
||||
|
||||
|
||||
lager:info("Revoking list-keys, checking that full-bucket mapred fails"),
|
||||
ok = rpc:call(Node, riak_core_console, revoke, [["riak_kv.list_keys", "ON",
|
||||
"default", "MR", "FROM", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, revoke, [["riak_kv.list_keys", "on",
|
||||
"default", "MR", "from", Username]]),
|
||||
|
||||
?assertMatch({error, {"403", _}},
|
||||
rhc:mapred_bucket(C7, <<"MR">>, [{map, {jsfun,
|
||||
@ -412,10 +416,10 @@ confirm() ->
|
||||
crdt_tests(Nodes, C7),
|
||||
|
||||
URL = lists:flatten(io_lib:format("https://127.0.0.1:~b", [Port])),
|
||||
|
||||
|
||||
lager:info("checking link walking fails because it is deprecated"),
|
||||
|
||||
?assertMatch({ok, "403", _, <<"Link walking is deprecated", _/binary>>},
|
||||
?assertMatch({ok, "403", _, <<"Link walking is deprecated", _/binary>>},
|
||||
ibrowse:send_req(URL ++ "/riak/hb/first/_,_,_", [], get,
|
||||
[], [{response_format, binary}, {is_ssl, true},
|
||||
{ssl_options, [
|
||||
@ -455,6 +459,8 @@ mapred_modfun_input(Pipe, _Args, _Timeout) ->
|
||||
riak_pipe:eoi(Pipe).
|
||||
|
||||
crdt_tests([Node|_]=Nodes, RHC) ->
|
||||
Username = "user",
|
||||
|
||||
lager:info("Creating bucket types for CRDTs"),
|
||||
Types = [{<<"counters">>, counter, riakc_counter:to_op(riakc_counter:increment(5, riakc_counter:new()))},
|
||||
{<<"sets">>, set, riakc_set:to_op(riakc_set:add_element(<<"foo">>, riakc_set:new()))},
|
||||
@ -471,7 +477,7 @@ crdt_tests([Node|_]=Nodes, RHC) ->
|
||||
|
||||
lager:info("Granting CRDT riak_kv.get, checking that fetches succeed"),
|
||||
|
||||
[ grant(Node, ["riak_kv.get", "ON", binary_to_list(Type), "TO", "user"]) || {Type, _, _} <- Types ],
|
||||
[ grant(Node, ["riak_kv.get", "on", binary_to_list(Type), "to", Username]) || {Type, _, _} <- Types ],
|
||||
|
||||
[ ?assertEqual({error, {notfound, DType}},
|
||||
(rhc:fetch_type(RHC, {BType, <<"bucket">>}, <<"key">>))) ||
|
||||
@ -485,7 +491,7 @@ crdt_tests([Node|_]=Nodes, RHC) ->
|
||||
|
||||
lager:info("Granting CRDT riak_kv.put, checking that updates succeed"),
|
||||
|
||||
[ grant(Node, ["riak_kv.put", "ON", binary_to_list(Type), "TO", "user"]) || {Type, _, _} <- Types ],
|
||||
[ grant(Node, ["riak_kv.put", "on", binary_to_list(Type), "to", Username]) || {Type, _, _} <- Types ],
|
||||
|
||||
[?assertEqual(ok, (rhc:update_type(RHC, {BType, <<"bucket">>}, <<"key">>, Op)))
|
||||
|| {BType, _, Op} <- Types],
|
||||
|
@ -85,32 +85,36 @@ confirm() ->
|
||||
|
||||
riakc_pb_socket:stop(PB0),
|
||||
|
||||
%% Hindi in Devanagari : हिन्दी
|
||||
Username = [2361,2367,2344,2381,2342,2368],
|
||||
UsernameBin = unicode:characters_to_binary(Username, utf8, utf8),
|
||||
|
||||
lager:info("Checking SSL requires peer cert validation"),
|
||||
%% can't connect without specifying cacert to validate the server
|
||||
?assertMatch({error, _}, riakc_pb_socket:start("127.0.0.1", Port,
|
||||
[{credentials, "user",
|
||||
[{credentials, UsernameBin,
|
||||
"pass"}])),
|
||||
|
||||
lager:info("Checking that authentication is required"),
|
||||
%% invalid credentials should be invalid
|
||||
?assertEqual({error, {tcp, <<"Authentication failed">>}}, riakc_pb_socket:start("127.0.0.1", Port,
|
||||
[{credentials, "user",
|
||||
[{credentials, UsernameBin,
|
||||
"pass"}, {cacertfile,
|
||||
filename:join([CertDir, "rootCA/cert.pem"])}])),
|
||||
|
||||
lager:info("Creating user"),
|
||||
%% grant the user credentials
|
||||
ok = rpc:call(Node, riak_core_console, add_user, [["user", "password=password"]]),
|
||||
ok = rpc:call(Node, riak_core_console, add_user, [[Username, "password=password"]]),
|
||||
|
||||
lager:info("Setting trust mode on user"),
|
||||
%% trust 'user' on localhost
|
||||
ok = rpc:call(Node, riak_core_console, add_source, [["user", "127.0.0.1/32",
|
||||
ok = rpc:call(Node, riak_core_console, add_source, [[Username, "127.0.0.1/32",
|
||||
"trust"]]),
|
||||
|
||||
lager:info("Checking that credentials are ignored in trust mode"),
|
||||
%% invalid credentials should be ignored in trust mode
|
||||
{ok, PB1} = riakc_pb_socket:start("127.0.0.1", Port,
|
||||
[{credentials, "user",
|
||||
[{credentials, UsernameBin,
|
||||
"pass"}, {cacertfile,
|
||||
filename:join([CertDir, "rootCA/cert.pem"])}]),
|
||||
?assertEqual(pong, riakc_pb_socket:ping(PB1)),
|
||||
@ -118,20 +122,20 @@ confirm() ->
|
||||
|
||||
lager:info("Setting password mode on user"),
|
||||
%% require password on localhost
|
||||
ok = rpc:call(Node, riak_core_console, add_source, [["user", "127.0.0.1/32",
|
||||
ok = rpc:call(Node, riak_core_console, add_source, [[Username, "127.0.0.1/32",
|
||||
"password"]]),
|
||||
|
||||
lager:info("Checking that incorrect password fails auth"),
|
||||
%% invalid credentials should be invalid
|
||||
?assertEqual({error, {tcp, <<"Authentication failed">>}}, riakc_pb_socket:start("127.0.0.1", Port,
|
||||
[{credentials, "user",
|
||||
[{credentials, UsernameBin,
|
||||
"pass"}, {cacertfile,
|
||||
filename:join([CertDir, "rootCA/cert.pem"])}])),
|
||||
|
||||
lager:info("Checking that correct password is successful"),
|
||||
%% valid credentials should be valid
|
||||
{ok, PB2} = riakc_pb_socket:start("127.0.0.1", Port,
|
||||
[{credentials, "user",
|
||||
[{credentials, UsernameBin,
|
||||
"password"}, {cacertfile,
|
||||
filename:join([CertDir, "rootCA/cert.pem"])}]),
|
||||
?assertEqual(pong, riakc_pb_socket:ping(PB2)),
|
||||
@ -252,7 +256,7 @@ confirm() ->
|
||||
|
||||
%% time to actually do some stuff
|
||||
{ok, PB} = riakc_pb_socket:start("127.0.0.1", Port,
|
||||
[{credentials, "user", "password"},
|
||||
[{credentials, UsernameBin, "password"},
|
||||
{cacertfile,
|
||||
filename:join([CertDir, "rootCA/cert.pem"])}]),
|
||||
?assertEqual(pong, riakc_pb_socket:ping(PB)),
|
||||
@ -263,7 +267,7 @@ confirm() ->
|
||||
<<"world">>)),
|
||||
|
||||
lager:info("Granting riak_kv.get, checking get works but put doesn't"),
|
||||
grant(Node, ["riak_kv.get", "ON", "default", "hello", "TO", "user"]),
|
||||
grant(Node, ["riak_kv.get", "on", "default", "hello", "to", Username]),
|
||||
|
||||
?assertMatch({error, notfound}, riakc_pb_socket:get(PB, <<"hello">>,
|
||||
<<"world">>)),
|
||||
@ -274,7 +278,7 @@ confirm() ->
|
||||
<<"howareyou">>))),
|
||||
|
||||
lager:info("Granting riak_kv.put, checking put works and roundtrips with get"),
|
||||
grant(Node, ["riak_kv.put", "ON", "default", "hello", "TO", "user"]),
|
||||
grant(Node, ["riak_kv.put", "on", "default", "hello", "to", Username]),
|
||||
|
||||
?assertEqual(ok,
|
||||
riakc_pb_socket:put(PB,
|
||||
@ -286,9 +290,9 @@ confirm() ->
|
||||
|
||||
%% 1.4 counters
|
||||
%%
|
||||
grant(Node, ["riak_kv.put,riak_kv.get", "ON", "default", "counters", "TO", "user"]),
|
||||
%% ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.put,riak_kv.get", "ON",
|
||||
%% "default", "counters", "TO", "user"]]),
|
||||
grant(Node, ["riak_kv.put,riak_kv.get", "on", "default", "counters", "to", Username]),
|
||||
%% ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.put,riak_kv.get", "on",
|
||||
%% "default", "counters", "to", Username]]),
|
||||
|
||||
|
||||
lager:info("Checking that counters work on resources that have get/put permitted"),
|
||||
@ -303,7 +307,7 @@ confirm() ->
|
||||
lager:info("Revoking get, checking that counter_val fails"),
|
||||
%% revoke get
|
||||
ok = rpc:call(Node, riak_core_console, revoke,
|
||||
[["riak_kv.get", "ON", "default", "counters", "FROM", "user"]]),
|
||||
[["riak_kv.get", "on", "default", "counters", "from", Username]]),
|
||||
|
||||
?assertMatch({error, <<"Permission", _/binary>>},
|
||||
riakc_pb_socket:counter_val(PB, <<"counters">>,
|
||||
@ -314,7 +318,7 @@ confirm() ->
|
||||
lager:info("Revoking put, checking that counter_incr fails"),
|
||||
%% revoke put
|
||||
ok = rpc:call(Node, riak_core_console, revoke,
|
||||
[["riak_kv.put", "ON", "default", "counters", "FROM", "user"]]),
|
||||
[["riak_kv.put", "on", "default", "counters", "from", Username]]),
|
||||
|
||||
?assertMatch({error, <<"Permission", _/binary>>},
|
||||
riakc_pb_socket:counter_incr(PB, <<"counters">>,
|
||||
@ -322,8 +326,8 @@ confirm() ->
|
||||
|
||||
|
||||
lager:info("Revoking get/put, checking that get/put are disallowed"),
|
||||
ok = rpc:call(Node, riak_core_console, revoke, [["riak_kv.get,riak_kv.put", "ON",
|
||||
"default", "hello", "FROM", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, revoke, [["riak_kv.get,riak_kv.put", "on",
|
||||
"default", "hello", "from", Username]]),
|
||||
|
||||
?assertMatch({error, <<"Permission", _/binary>>}, riakc_pb_socket:get(PB,
|
||||
<<"hello">>,
|
||||
@ -336,14 +340,14 @@ confirm() ->
|
||||
|
||||
%% try the 'any' grant
|
||||
lager:info("Granting get on ANY, checking user can fetch any bucket/key"),
|
||||
grant(Node, ["riak_kv.get", "ON", "ANY", "TO", "user"]),
|
||||
grant(Node, ["riak_kv.get", "on", "any", "to", Username]),
|
||||
|
||||
?assertMatch({ok, _Obj}, riakc_pb_socket:get(PB, <<"hello">>,
|
||||
<<"world">>)),
|
||||
|
||||
lager:info("Revoking ANY permission, checking fetch fails"),
|
||||
ok = rpc:call(Node, riak_core_console, revoke, [["riak_kv.get", "ON",
|
||||
"ANY", "FROM", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, revoke, [["riak_kv.get", "on",
|
||||
"any", "from", Username]]),
|
||||
|
||||
?assertMatch({error, <<"Permission", _/binary>>}, riakc_pb_socket:get(PB,
|
||||
<<"hello">>,
|
||||
@ -354,8 +358,8 @@ confirm() ->
|
||||
?assertMatch({error, <<"Permission", _/binary>>}, riakc_pb_socket:list_keys(PB, <<"hello">>)),
|
||||
|
||||
lager:info("Granting riak_kv.list_keys, checking that list_keys succeeds"),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.list_keys", "ON",
|
||||
"default", "hello", "TO", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.list_keys", "on",
|
||||
"default", "hello", "to", Username]]),
|
||||
|
||||
?assertMatch({ok, [<<"world">>]}, riakc_pb_socket:list_keys(PB, <<"hello">>)),
|
||||
|
||||
@ -364,8 +368,8 @@ confirm() ->
|
||||
riakc_pb_socket:list_buckets(PB)),
|
||||
|
||||
lager:info("Granting riak_kv.list_buckets, checking that list_buckets succeeds"),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.list_buckets", "ON",
|
||||
"default", "TO", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.list_buckets", "on",
|
||||
"default", "to", Username]]),
|
||||
|
||||
{ok, BList} = riakc_pb_socket:list_buckets(PB),
|
||||
?assertEqual([<<"counters">>, <<"hello">>], lists:sort(BList)),
|
||||
@ -380,8 +384,8 @@ confirm() ->
|
||||
undefined, true}])),
|
||||
|
||||
lager:info("Granting mapreduce, checking that job succeeds"),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.mapreduce", "ON",
|
||||
"default", "TO", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.mapreduce", "on",
|
||||
"default", "to", Username]]),
|
||||
|
||||
?assertEqual({ok, [{1, [1]}]},
|
||||
riakc_pb_socket:mapred_bucket(PB, <<"hello">>,
|
||||
@ -440,8 +444,8 @@ confirm() ->
|
||||
reduce_set_union},
|
||||
undefined, true}])),
|
||||
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.mapreduce", "ON",
|
||||
"ANY", "TO", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.mapreduce", "on",
|
||||
"any", "to", Username]]),
|
||||
?assertEqual({ok, [{1, [<<"1">>]}]},
|
||||
riakc_pb_socket:mapred_bucket(PB, {modfun, ?MODULE,
|
||||
mapred_modfun_input, []},
|
||||
@ -452,8 +456,8 @@ confirm() ->
|
||||
reduce_set_union},
|
||||
undefined, true}])),
|
||||
|
||||
ok = rpc:call(Node, riak_core_console, revoke, [["riak_kv.mapreduce", "ON",
|
||||
"ANY", "FROM", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, revoke, [["riak_kv.mapreduce", "on",
|
||||
"any", "from", Username]]),
|
||||
|
||||
lager:info("checking mapreduce with a insecure modfun phase works when"
|
||||
" whitelisted"),
|
||||
@ -474,8 +478,8 @@ confirm() ->
|
||||
|
||||
%% revoke only the list_keys permission
|
||||
lager:info("Revoking list-keys, checking that full-bucket mapred fails"),
|
||||
ok = rpc:call(Node, riak_core_console, revoke, [["riak_kv.list_keys", "ON",
|
||||
"default", "hello", "FROM", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, revoke, [["riak_kv.list_keys", "on",
|
||||
"default", "hello", "from", Username]]),
|
||||
|
||||
?assertMatch({error, <<"Permission", _/binary>>},
|
||||
riakc_pb_socket:mapred_bucket(PB, <<"hello">>,
|
||||
@ -496,8 +500,8 @@ confirm() ->
|
||||
<<"John">>)),
|
||||
|
||||
lager:info("Granting 2i permissions, checking that results come back"),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.index", "ON",
|
||||
"default", "TO", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.index", "on",
|
||||
"default", "to", Username]]),
|
||||
|
||||
%% don't actually have any indexes
|
||||
?assertMatch({ok, ?INDEX_RESULTS{keys=[]}},
|
||||
@ -514,8 +518,8 @@ confirm() ->
|
||||
riakc_pb_socket:get_bucket(PB, <<"mybucket">>)),
|
||||
|
||||
lager:info("Granting riak_core.get_bucket, checking that get_bucket succeeds"),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_core.get_bucket", "ON",
|
||||
"default", "mybucket", "TO", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_core.get_bucket", "on",
|
||||
"default", "mybucket", "to", Username]]),
|
||||
|
||||
?assertEqual(3, proplists:get_value(n_val, element(2,
|
||||
riakc_pb_socket:get_bucket(PB,
|
||||
@ -526,8 +530,8 @@ confirm() ->
|
||||
riakc_pb_socket:set_bucket(PB, <<"mybucket">>, [{n_val, 5}])),
|
||||
|
||||
lager:info("Granting set_bucket, checking that set_bucket succeeds"),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_core.set_bucket", "ON",
|
||||
"default", "mybucket", "TO", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_core.set_bucket", "on",
|
||||
"default", "mybucket", "to", Username]]),
|
||||
?assertEqual(ok,
|
||||
riakc_pb_socket:set_bucket(PB, <<"mybucket">>, [{n_val, 5}])),
|
||||
|
||||
@ -550,8 +554,8 @@ confirm() ->
|
||||
<<"world">>)),
|
||||
|
||||
lager:info("Granting get on the new bucket type, checking that it succeeds"),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.get", "ON",
|
||||
"mytype", "hello", "TO", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.get", "on",
|
||||
"mytype", "hello", "to", Username]]),
|
||||
|
||||
?assertMatch({error, notfound}, riakc_pb_socket:get(PB, {<<"mytype">>,
|
||||
<<"hello">>},
|
||||
@ -570,8 +574,8 @@ confirm() ->
|
||||
<<"howareyou">>))),
|
||||
|
||||
lager:info("Granting put on a bucket in the new bucket type, checking that it succeeds"),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.put", "ON",
|
||||
"mytype", "hello", "TO", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.put", "on",
|
||||
"mytype", "hello", "to", Username]]),
|
||||
|
||||
?assertEqual(ok,
|
||||
riakc_pb_socket:put(PB,
|
||||
@ -589,8 +593,8 @@ confirm() ->
|
||||
<<"world">>)),
|
||||
|
||||
lager:info("Revoking get/put on the new bucket type, checking that they fail"),
|
||||
ok = rpc:call(Node, riak_core_console, revoke, [["riak_kv.get,riak_kv.put", "ON",
|
||||
"mytype", "hello", "FROM", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, revoke, [["riak_kv.get,riak_kv.put", "on",
|
||||
"mytype", "hello", "from", Username]]),
|
||||
|
||||
?assertMatch({error, <<"Permission", _/binary>>}, riakc_pb_socket:get(PB,
|
||||
{<<"mytype">>,
|
||||
@ -607,8 +611,8 @@ confirm() ->
|
||||
riakc_pb_socket:list_keys(PB, {<<"mytype">>, <<"hello">>})),
|
||||
|
||||
lager:info("Granting list keys on a bucket in the new type, checking that it works"),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.list_keys", "ON",
|
||||
"mytype", "hello", "TO", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.list_keys", "on",
|
||||
"mytype", "hello", "to", Username]]),
|
||||
|
||||
?assertEqual([<<"drnick">>, <<"world">>], lists:sort(element(2, riakc_pb_socket:list_keys(PB,
|
||||
{<<"mytype">>,
|
||||
@ -628,8 +632,8 @@ confirm() ->
|
||||
lager:info("Granting get/put on all buckets in the new type, checking that get/put works"),
|
||||
%% do a wildcard grant
|
||||
ok = rpc:call(Node, riak_core_console, grant,
|
||||
[["riak_kv.get,riak_kv.put", "ON",
|
||||
"mytype2", "TO", "user"]]),
|
||||
[["riak_kv.get,riak_kv.put", "on",
|
||||
"mytype2", "to", Username]]),
|
||||
|
||||
?assertMatch({error, notfound}, riakc_pb_socket:get(PB, {<<"mytype2">>,
|
||||
<<"hello">>},
|
||||
@ -654,8 +658,8 @@ confirm() ->
|
||||
riakc_pb_socket:list_buckets(PB, <<"mytype2">>)),
|
||||
|
||||
lager:info("Granting list buckets on the new type, checking that it succeeds"),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.list_buckets", "ON",
|
||||
"mytype2", "TO", "user"]]),
|
||||
ok = rpc:call(Node, riak_core_console, grant, [["riak_kv.list_buckets", "on",
|
||||
"mytype2", "to", Username]]),
|
||||
|
||||
?assertMatch([<<"cromulent">>, <<"embiggen">>], lists:sort(element(2,
|
||||
riakc_pb_socket:list_buckets(PB,
|
||||
@ -673,7 +677,7 @@ confirm() ->
|
||||
|
||||
lager:info("Granting get on bucket type props, checking it succeeds and put still fails"),
|
||||
ok = rpc:call(Node, riak_core_console, grant,
|
||||
[["riak_core.get_bucket_type", "ON", "mytype2", "TO", "user"]]),
|
||||
[["riak_core.get_bucket_type", "on", "mytype2", "to", Username]]),
|
||||
|
||||
?assertEqual(3, proplists:get_value(n_val,
|
||||
element(2, riakc_pb_socket:get_bucket_type(PB,
|
||||
@ -684,7 +688,7 @@ confirm() ->
|
||||
|
||||
lager:info("Granting set on bucket type props, checking it succeeds"),
|
||||
ok = rpc:call(Node, riak_core_console, grant,
|
||||
[["riak_core.set_bucket_type", "ON", "mytype2", "TO", "user"]]),
|
||||
[["riak_core.set_bucket_type", "on", "mytype2", "to", Username]]),
|
||||
|
||||
riakc_pb_socket:set_bucket_type(PB, <<"mytype2">>, [{n_val, 5}]),
|
||||
|
||||
@ -717,8 +721,8 @@ group_test(Node, Port, CertDir) ->
|
||||
lager:info("Granting get/put/delete on a bucket type to the group, checking those requests work"),
|
||||
|
||||
%% do a wildcard grant
|
||||
grant(Node,["riak_kv.get,riak_kv.put,riak_kv.delete", "ON", "mytype2",
|
||||
"TO", "group"]),
|
||||
grant(Node,["riak_kv.get,riak_kv.put,riak_kv.delete", "on", "mytype2",
|
||||
"to", "group"]),
|
||||
|
||||
%% trust 'myuser' on localhost
|
||||
ok = rpc:call(Node, riak_core_console, add_source, [["myuser", "127.0.0.1/32",
|
||||
@ -757,6 +761,8 @@ grant(Node, Args) ->
|
||||
ok = rpc:call(Node, riak_core_console, grant, [Args]).
|
||||
|
||||
crdt_tests([Node|_]=Nodes, PB) ->
|
||||
Username = [2361,2367,2344,2381,2342,2368],
|
||||
|
||||
%% rt:create_and_activate
|
||||
lager:info("Creating bucket types for CRDTs"),
|
||||
Types = [{<<"counters">>, counter, riakc_counter:to_op(riakc_counter:increment(5, riakc_counter:new()))},
|
||||
@ -774,9 +780,9 @@ crdt_tests([Node|_]=Nodes, PB) ->
|
||||
|
||||
lager:info("Granting CRDT riak_kv.get, checking that fetches succeed"),
|
||||
|
||||
[ grant(Node, ["riak_kv.get", "ON", binary_to_list(Type), "TO", "user"]) || {Type, _, _} <- Types ],
|
||||
[ grant(Node, ["riak_kv.get", "on", binary_to_list(Type), "to", Username]) || {Type, _, _} <- Types ],
|
||||
|
||||
[ ?assertEqual({error, {notfound, DType}},
|
||||
[ ?assertEqual({error, {notfound, DType}},
|
||||
riakc_pb_socket:fetch_type(PB, {BType, <<"bucket">>}, <<"key">>)) ||
|
||||
{BType, DType, _} <- Types ],
|
||||
|
||||
@ -784,15 +790,15 @@ crdt_tests([Node|_]=Nodes, PB) ->
|
||||
|
||||
[ ?assertDenied(riakc_pb_socket:update_type(PB, {BType, <<"bucket">>}, <<"key">>, Op))
|
||||
|| {BType, _, Op} <- Types ],
|
||||
|
||||
|
||||
|
||||
lager:info("Granting CRDT riak_kv.put, checking that updates succeed"),
|
||||
|
||||
[ grant(Node, ["riak_kv.put", "ON", binary_to_list(Type), "TO", "user"]) || {Type, _, _} <- Types ],
|
||||
[ grant(Node, ["riak_kv.put", "on", binary_to_list(Type), "to", Username]) || {Type, _, _} <- Types ],
|
||||
|
||||
[ ?assertEqual(ok, riakc_pb_socket:update_type(PB, {BType, <<"bucket">>}, <<"key">>, Op))
|
||||
|| {BType, _, Op} <- Types ],
|
||||
|
||||
|
||||
ok.
|
||||
|
||||
map_object_value(RiakObject, A, B) ->
|
||||
|
@ -18,29 +18,24 @@
|
||||
|
||||
%% @doc riak_test entry point
|
||||
confirm() ->
|
||||
rt:set_conf(all, [{"buckets.default.siblings", "off"}]),
|
||||
rt:set_conf(all, [{"strong_consistency", "on"}]),
|
||||
|
||||
%% Start up two >1.3.2 clusters and connect them,
|
||||
{LeaderA, LeaderB, ANodes, BNodes} = make_clusters(),
|
||||
|
||||
%% Temporary kludge to handle startup race condition between
|
||||
%% riak_kv and riak_ensemble
|
||||
%% @TODO Remove this once riak_ensemble helpers are in place that
|
||||
%% provide a way for this race to be resolved.
|
||||
timer:sleep(60000),
|
||||
|
||||
PBA = get_pb_pid(LeaderA),
|
||||
PBB = get_pb_pid(LeaderB),
|
||||
|
||||
BucketType = <<"consistent_type">>,
|
||||
|
||||
%% Create consistent bucket type on cluster A
|
||||
rt:create_and_activate_bucket_type(LeaderA, BucketType, [{consistent, true}]),
|
||||
rt:create_and_activate_bucket_type(LeaderA,
|
||||
BucketType,
|
||||
[{consistent, true}, {n_val, 5}]),
|
||||
rt:wait_until_bucket_type_status(BucketType, active, ANodes),
|
||||
|
||||
%% Create consistent bucket type on cluster B
|
||||
rt:create_and_activate_bucket_type(LeaderB, BucketType, [{consistent, true}]),
|
||||
rt:create_and_activate_bucket_type(LeaderB,
|
||||
BucketType,
|
||||
[{consistent, true}, {n_val, 5}]),
|
||||
rt:wait_until_bucket_type_status(BucketType, active, BNodes),
|
||||
|
||||
connect_clusters(LeaderA, LeaderB),
|
||||
@ -88,27 +83,42 @@ connect_clusters(LeaderA, LeaderB) ->
|
||||
%% @doc Create two clusters of 1 node each and connect them for replication:
|
||||
%% Cluster "A" -> cluster "B"
|
||||
make_clusters() ->
|
||||
NumNodes = rt_config:get(num_nodes, 2),
|
||||
ClusterASize = rt_config:get(cluster_a_size, 1),
|
||||
NumNodes = rt_config:get(num_nodes, 6),
|
||||
ClusterASize = rt_config:get(cluster_a_size, 3),
|
||||
NVal = 5,
|
||||
|
||||
lager:info("Deploy ~p nodes", [NumNodes]),
|
||||
Conf = [
|
||||
{riak_repl,
|
||||
[
|
||||
%% turn off fullsync
|
||||
{fullsync_on_connect, false},
|
||||
{max_fssource_node, 2},
|
||||
{max_fssink_node, 2},
|
||||
{max_fssource_cluster, 5},
|
||||
{max_fssource_retries, 5}
|
||||
]}
|
||||
],
|
||||
Conf = ensemble_util:fast_config(NVal) ++
|
||||
[
|
||||
{riak_repl,
|
||||
[
|
||||
%% turn off fullsync
|
||||
{fullsync_on_connect, false},
|
||||
{max_fssource_node, 2},
|
||||
{max_fssink_node, 2},
|
||||
{max_fssource_cluster, 5},
|
||||
{max_fssource_retries, 5}
|
||||
]}
|
||||
],
|
||||
|
||||
Nodes = rt:deploy_nodes(NumNodes, Conf),
|
||||
{ANodes, BNodes} = lists:split(ClusterASize, Nodes),
|
||||
lager:info("ANodes: ~p", [ANodes]),
|
||||
lager:info("BNodes: ~p", [BNodes]),
|
||||
|
||||
AFirst = hd(ANodes),
|
||||
BFirst = hd(BNodes),
|
||||
|
||||
ok = rpc:call(AFirst, riak_ensemble_manager, enable, []),
|
||||
rpc:call(AFirst, riak_core_ring_manager, force_update, []),
|
||||
?assertEqual(true, rpc:call(AFirst, riak_ensemble_manager, enabled, [])),
|
||||
ensemble_util:wait_until_stable(AFirst, NVal),
|
||||
|
||||
ok = rpc:call(BFirst, riak_ensemble_manager, enable, []),
|
||||
rpc:call(BFirst, riak_core_ring_manager, force_update, []),
|
||||
?assertEqual(true, rpc:call(BFirst, riak_ensemble_manager, enabled, [])),
|
||||
ensemble_util:wait_until_stable(BFirst, NVal),
|
||||
|
||||
lager:info("Build cluster A"),
|
||||
repl_util:make_cluster(ANodes),
|
||||
|
||||
@ -118,12 +128,10 @@ make_clusters() ->
|
||||
%% get the leader for the first cluster
|
||||
lager:info("waiting for leader to converge on cluster A"),
|
||||
?assertEqual(ok, repl_util:wait_until_leader_converge(ANodes)),
|
||||
AFirst = hd(ANodes),
|
||||
|
||||
%% get the leader for the second cluster
|
||||
lager:info("waiting for leader to converge on cluster B"),
|
||||
?assertEqual(ok, repl_util:wait_until_leader_converge(BNodes)),
|
||||
BFirst = hd(BNodes),
|
||||
|
||||
%% Name the clusters
|
||||
repl_util:name_cluster(AFirst, "A"),
|
||||
@ -132,10 +140,20 @@ make_clusters() ->
|
||||
repl_util:name_cluster(BFirst, "B"),
|
||||
rt:wait_until_ring_converged(BNodes),
|
||||
|
||||
repl_util:wait_until_leader_converge(ANodes),
|
||||
repl_util:wait_until_leader_converge(BNodes),
|
||||
?assertEqual(true, rpc:call(AFirst, riak_ensemble_manager, enabled, [])),
|
||||
ensemble_util:wait_until_cluster(ANodes),
|
||||
ensemble_util:wait_for_membership(AFirst),
|
||||
ensemble_util:wait_until_stable(AFirst, NVal),
|
||||
|
||||
{AFirst, BFirst, ANodes, BNodes}.
|
||||
?assertEqual(true, rpc:call(BFirst, riak_ensemble_manager, enabled, [])),
|
||||
ensemble_util:wait_until_cluster(BNodes),
|
||||
ensemble_util:wait_for_membership(BFirst),
|
||||
ensemble_util:wait_until_stable(BFirst, NVal),
|
||||
|
||||
LeaderA = repl_util:get_leader(AFirst),
|
||||
LeaderB = repl_util:get_leader(BFirst),
|
||||
|
||||
{LeaderA, LeaderB, ANodes, BNodes}.
|
||||
|
||||
get_pb_pid(Leader) ->
|
||||
{ok, [{IP, PortA}] } = rpc:call(Leader, application, get_env, [riak_api, pb]),
|
||||
|
@ -13,10 +13,6 @@
|
||||
-export([make_bucket/3]).
|
||||
|
||||
confirm() ->
|
||||
NumNodes = rt_config:get(num_nodes, 6),
|
||||
ClusterASize = rt_config:get(cluster_a_size, 3),
|
||||
|
||||
lager:info("Deploy ~p nodes", [NumNodes]),
|
||||
Conf = [
|
||||
{riak_repl,
|
||||
[
|
||||
@ -25,21 +21,8 @@ confirm() ->
|
||||
{diff_batch_size, 10}
|
||||
]}
|
||||
],
|
||||
|
||||
Nodes = deploy_nodes(NumNodes, Conf),
|
||||
|
||||
{ANodes, BNodes} = lists:split(ClusterASize, Nodes),
|
||||
lager:info("ANodes: ~p", [ANodes]),
|
||||
lager:info("BNodes: ~p", [BNodes]),
|
||||
|
||||
lager:info("Build cluster A"),
|
||||
rt:log_to_nodes(Nodes, "Build cluster A"),
|
||||
repl_util:make_cluster(ANodes),
|
||||
|
||||
lager:info("Build cluster B"),
|
||||
rt:log_to_nodes(Nodes, "Build cluster B"),
|
||||
repl_util:make_cluster(BNodes),
|
||||
|
||||
rt:set_advanced_conf(all, Conf),
|
||||
[ANodes, BNodes] = rt:build_clusters([3, 3]),
|
||||
replication(ANodes, BNodes, false),
|
||||
pass.
|
||||
|
||||
@ -221,6 +204,8 @@ replication([AFirst|_] = ANodes, [BFirst|_] = BNodes, Connected) ->
|
||||
lager:info("Restarting down node ~p", [LeaderA]),
|
||||
rt:start(LeaderA),
|
||||
rt:wait_until_pingable(LeaderA),
|
||||
wait_until_no_pending_changes(ANodes),
|
||||
wait_until_leader_converge(ANodes),
|
||||
start_and_wait_until_fullsync_complete(LeaderA2),
|
||||
|
||||
case nodes_all_have_version(ANodes, "1.2.2") of
|
||||
@ -522,6 +507,9 @@ make_bucket([Node|_]=Nodes, Name, Args) ->
|
||||
?assertEqual(ok, Res).
|
||||
|
||||
start_and_wait_until_fullsync_complete(Node) ->
|
||||
start_and_wait_until_fullsync_complete(Node, 20).
|
||||
|
||||
start_and_wait_until_fullsync_complete(Node, Retries) ->
|
||||
Status0 = rpc:call(Node, riak_repl_console, status, [quiet]),
|
||||
Count = proplists:get_value(server_fullsyncs, Status0) + 1,
|
||||
lager:info("waiting for fullsync count to be ~p", [Count]),
|
||||
@ -529,36 +517,38 @@ start_and_wait_until_fullsync_complete(Node) ->
|
||||
lager:info("Starting fullsync on ~p (~p)", [Node,
|
||||
rtdev:node_version(rtdev:node_id(Node))]),
|
||||
rpc:call(Node, riak_repl_console, start_fullsync, [[]]),
|
||||
|
||||
%% sleep because of the old bug where stats will crash if you call it too
|
||||
%% soon after starting a fullsync
|
||||
timer:sleep(500),
|
||||
|
||||
Res = rt:wait_until(Node,
|
||||
fun(_) ->
|
||||
Status = rpc:call(Node, riak_repl_console, status, [quiet]),
|
||||
case proplists:get_value(server_fullsyncs, Status) of
|
||||
C when C >= Count ->
|
||||
true;
|
||||
_ ->
|
||||
false
|
||||
end
|
||||
end),
|
||||
case node_has_version(Node, "1.2.0") of
|
||||
true ->
|
||||
?assertEqual(ok, Res);
|
||||
_ ->
|
||||
case Res of
|
||||
ok ->
|
||||
ok;
|
||||
_ ->
|
||||
?assertEqual(ok, wait_until_connection(Node)),
|
||||
lager:warning("Pre 1.2.0 node failed to fullsync, retrying"),
|
||||
start_and_wait_until_fullsync_complete(Node)
|
||||
end
|
||||
case rt:wait_until(make_fullsync_wait_fun(Node, Count), 100, 1000) of
|
||||
ok ->
|
||||
ok;
|
||||
_ when Retries > 0 ->
|
||||
?assertEqual(ok, wait_until_connection(Node)),
|
||||
lager:warning("Node failed to fullsync, retrying"),
|
||||
rpc:call(Node, riak_repl_console, cancel_fullsync, [[]]),
|
||||
start_and_wait_until_fullsync_complete(Node, Retries-1)
|
||||
end,
|
||||
|
||||
lager:info("Fullsync on ~p complete", [Node]).
|
||||
|
||||
make_fullsync_wait_fun(Node, Count) ->
|
||||
fun() ->
|
||||
Status = rpc:call(Node, riak_repl_console, status, [quiet]),
|
||||
case Status of
|
||||
{badrpc, _} ->
|
||||
false;
|
||||
_ ->
|
||||
case proplists:get_value(server_fullsyncs, Status) of
|
||||
C when C >= Count ->
|
||||
true;
|
||||
_ ->
|
||||
false
|
||||
end
|
||||
end
|
||||
end.
|
||||
|
||||
wait_until_is_leader(Node) ->
|
||||
lager:info("wait_until_is_leader(~p)", [Node]),
|
||||
rt:wait_until(Node, fun is_leader/1).
|
||||
@ -613,23 +603,39 @@ wait_until_new_leader(Node, OldLeader) ->
|
||||
wait_until_leader_converge([Node|_] = Nodes) ->
|
||||
rt:wait_until(Node,
|
||||
fun(_) ->
|
||||
length(lists:usort([begin
|
||||
case rpc:call(N, riak_repl_console, status, [quiet]) of
|
||||
{badrpc, _} ->
|
||||
false;
|
||||
Status ->
|
||||
case proplists:get_value(leader, Status) of
|
||||
undefined ->
|
||||
false;
|
||||
L ->
|
||||
%lager:info("Leader for ~p is ~p",
|
||||
%[N,L]),
|
||||
L
|
||||
end
|
||||
end
|
||||
end || N <- Nodes])) == 1
|
||||
LeaderResults =
|
||||
[get_leader(rpc:call(N, riak_repl_console, status, [quiet])) ||
|
||||
N <- Nodes],
|
||||
{Leaders, Errors} =
|
||||
lists:partition(leader_result_filter_fun(), LeaderResults),
|
||||
UniqueLeaders = lists:usort(Leaders),
|
||||
Errors == [] andalso length(UniqueLeaders) == 1
|
||||
end).
|
||||
|
||||
get_leader({badrpc, _}=Err) ->
|
||||
Err;
|
||||
get_leader(Status) ->
|
||||
case proplists:get_value(leader, Status) of
|
||||
undefined ->
|
||||
false;
|
||||
L ->
|
||||
%%lager:info("Leader for ~p is ~p",
|
||||
%%[N,L]),
|
||||
L
|
||||
end.
|
||||
|
||||
leader_result_filter_fun() ->
|
||||
fun(L) ->
|
||||
case L of
|
||||
undefined ->
|
||||
false;
|
||||
{badrpc, _} ->
|
||||
false;
|
||||
_ ->
|
||||
true
|
||||
end
|
||||
end.
|
||||
|
||||
wait_until_connection(Node) ->
|
||||
rt:wait_until(Node,
|
||||
fun(_) ->
|
||||
@ -695,4 +701,3 @@ do_write(Node, Start, End, Bucket, W) ->
|
||||
lists:flatten([rt:systest_write(Node, S, S, Bucket, W) ||
|
||||
{S, _Error} <- Errors])
|
||||
end.
|
||||
|
||||
|
@ -63,14 +63,13 @@ setup_repl_clusters(Conf, SSL) ->
|
||||
],
|
||||
|
||||
|
||||
rt:set_advanced_conf(all, Conf),
|
||||
Nodes = [ANodes, BNodes, CNodes] = rt:build_clusters([2, 2, 2]),
|
||||
|
||||
Nodes = deploy_nodes(NumNodes, Conf),
|
||||
AFirst = hd(ANodes),
|
||||
BFirst = hd(BNodes),
|
||||
CFirst = hd(CNodes),
|
||||
|
||||
lager:info("Nodes = ~p", [Nodes]),
|
||||
{[AFirst|_] = ANodes, Rest} = lists:split(2, Nodes),
|
||||
{[BFirst|_] = BNodes, [CFirst|_] = CNodes} = lists:split(2, Rest),
|
||||
|
||||
%%AllNodes = ANodes ++ BNodes ++ CNodes,
|
||||
rt:log_to_nodes(Nodes, "Starting replication2_pg test"),
|
||||
|
||||
lager:info("ANodes: ~p", [ANodes]),
|
||||
@ -92,15 +91,6 @@ setup_repl_clusters(Conf, SSL) ->
|
||||
|
||||
rt:log_to_nodes(Nodes, "Building and connecting clusters"),
|
||||
|
||||
lager:info("Build cluster A"),
|
||||
repl_util:make_cluster(ANodes),
|
||||
|
||||
lager:info("Build cluster B"),
|
||||
repl_util:make_cluster(BNodes),
|
||||
|
||||
lager:info("Build cluster C"),
|
||||
repl_util:make_cluster(CNodes),
|
||||
|
||||
repl_util:name_cluster(AFirst, "A"),
|
||||
repl_util:name_cluster(BFirst, "B"),
|
||||
repl_util:name_cluster(CFirst, "C"),
|
||||
@ -112,19 +102,20 @@ setup_repl_clusters(Conf, SSL) ->
|
||||
repl_util:wait_until_leader(AFirst),
|
||||
LeaderA = rpc:call(AFirst, riak_core_cluster_mgr, get_leader, []),
|
||||
|
||||
{ok, {_IP, BPort}} = rpc:call(BFirst, application, get_env,
|
||||
{ok, {BIP, BPort}} = rpc:call(BFirst, application, get_env,
|
||||
[riak_core, cluster_mgr]),
|
||||
repl_util:connect_cluster(LeaderA, "127.0.0.1", BPort),
|
||||
repl_util:connect_cluster(LeaderA, BIP, BPort),
|
||||
|
||||
{ok, {_IP, CPort}} = rpc:call(CFirst, application, get_env,
|
||||
{ok, {CIP, CPort}} = rpc:call(CFirst, application, get_env,
|
||||
[riak_core, cluster_mgr]),
|
||||
repl_util:connect_cluster(LeaderA, "127.0.0.1", CPort),
|
||||
repl_util:connect_cluster(LeaderA, CIP, CPort),
|
||||
|
||||
?assertEqual(ok, repl_util:wait_for_connection(LeaderA, "B")),
|
||||
rt:wait_until_ring_converged(ANodes),
|
||||
|
||||
?assertEqual(ok, repl_util:wait_for_connection(LeaderA, "C")),
|
||||
rt:wait_until_ring_converged(ANodes),
|
||||
|
||||
{LeaderA, ANodes, BNodes, CNodes, Nodes}.
|
||||
|
||||
|
||||
@ -149,7 +140,7 @@ test_basic_pg(Mode, SSL) ->
|
||||
{fullsync_on_connect, false}
|
||||
]}
|
||||
],
|
||||
{LeaderA, ANodes, BNodes, _CNodes, AllNodes} =
|
||||
{LeaderA, ANodes, BNodes, CNodes, AllNodes} =
|
||||
setup_repl_clusters(Conf, SSL),
|
||||
rt:log_to_nodes(AllNodes, "Testing basic pg"),
|
||||
|
||||
@ -182,7 +173,9 @@ test_basic_pg(Mode, SSL) ->
|
||||
rt:pbc_write(PidA, Bucket, KeyA, ValueA),
|
||||
rt:pbc_write(PidA, Bucket, KeyB, ValueB),
|
||||
|
||||
{_FirstA, FirstB, FirstC} = get_firsts(AllNodes),
|
||||
_FirstA = hd(ANodes),
|
||||
FirstB = hd(BNodes),
|
||||
FirstC = hd(CNodes),
|
||||
PidB = rt:pbc(FirstB),
|
||||
lager:info("Connected to cluster B"),
|
||||
{ok, PGResult} = riak_repl_pb_api:get(PidB,Bucket,KeyA,CidA),
|
||||
@ -284,7 +277,9 @@ test_12_pg(Mode, SSL) ->
|
||||
{Bucket, KeyB, ValueB} = make_test_object("b"),
|
||||
|
||||
rt:log_to_nodes(AllNodes, "Test 1.2 proxy_get"),
|
||||
{_FirstA, FirstB, _FirstC} = get_firsts(AllNodes),
|
||||
_FirstA = hd(ANodes),
|
||||
FirstB = hd(BNodes),
|
||||
_FirstC = hd(CNodes),
|
||||
case Mode of
|
||||
mode_repl12 ->
|
||||
ModeRes = rpc:call(FirstB, riak_repl_console, modes, [["mode_repl12"]]),
|
||||
@ -369,7 +364,7 @@ test_pg_proxy(SSL) ->
|
||||
{fullsync_on_connect, false}
|
||||
]}
|
||||
],
|
||||
{LeaderA, ANodes, BNodes, _CNodes, AllNodes} =
|
||||
{LeaderA, ANodes, BNodes, CNodes, AllNodes} =
|
||||
setup_repl_clusters(Conf, SSL),
|
||||
rt:log_to_nodes(AllNodes, "Testing pg proxy"),
|
||||
rt:wait_until_ring_converged(ANodes),
|
||||
@ -399,12 +394,17 @@ test_pg_proxy(SSL) ->
|
||||
rt:pbc_write(PidA, Bucket, KeyD, ValueD),
|
||||
%% sanity check. You know, like the 10000 tests that autoconf runs
|
||||
%% before it actually does any work.
|
||||
{FirstA, FirstB, _FirstC} = get_firsts(AllNodes),
|
||||
FirstA = hd(ANodes),
|
||||
FirstB = hd(BNodes),
|
||||
_FirstC = hd(CNodes),
|
||||
PidB = rt:pbc(FirstB),
|
||||
lager:info("Connected to cluster B"),
|
||||
{ok, PGResult} = riak_repl_pb_api:get(PidB,Bucket,KeyA,CidA),
|
||||
?assertEqual(ValueA, riakc_obj:get_value(PGResult)),
|
||||
|
||||
rt:wait_until_transfers_complete(ANodes),
|
||||
rt:wait_until_transfers_complete(BNodes),
|
||||
|
||||
lager:info("Stopping leader on requester cluster"),
|
||||
PGLeaderB = rpc:call(FirstB, riak_core_cluster_mgr, get_leader, []),
|
||||
rt:log_to_nodes(AllNodes, "Killing leader on requester cluster"),
|
||||
@ -439,10 +439,12 @@ test_cluster_mapping(SSL) ->
|
||||
{fullsync_on_connect, false}
|
||||
]}
|
||||
],
|
||||
{LeaderA, ANodes, BNodes, CNodes, AllNodes} =
|
||||
{LeaderA, ANodes, BNodes, CNodes, _AllNodes} =
|
||||
setup_repl_clusters(Conf, SSL),
|
||||
|
||||
{_FirstA, FirstB, FirstC} = get_firsts(AllNodes),
|
||||
_FirstA = hd(ANodes),
|
||||
FirstB = hd(BNodes),
|
||||
FirstC = hd(CNodes),
|
||||
LeaderB = rpc:call(FirstB, riak_core_cluster_mgr, get_leader, []),
|
||||
LeaderC = rpc:call(FirstC, riak_core_cluster_mgr, get_leader, []),
|
||||
|
||||
@ -554,14 +556,16 @@ test_bidirectional_pg(SSL) ->
|
||||
{fullsync_on_connect, false}
|
||||
]}
|
||||
],
|
||||
{LeaderA, ANodes, BNodes, _CNodes, AllNodes} =
|
||||
{LeaderA, ANodes, BNodes, CNodes, AllNodes} =
|
||||
setup_repl_clusters(Conf, SSL),
|
||||
rt:log_to_nodes(AllNodes, "Testing bidirectional proxy-get"),
|
||||
|
||||
rt:wait_until_ring_converged(ANodes),
|
||||
rt:wait_until_ring_converged(BNodes),
|
||||
|
||||
{FirstA, FirstB, _FirstC} = get_firsts(AllNodes),
|
||||
FirstA = hd(ANodes),
|
||||
FirstB = hd(BNodes),
|
||||
_FirstC = hd(CNodes),
|
||||
|
||||
LeaderB = rpc:call(FirstB, riak_repl2_leader, leader_node, []),
|
||||
|
||||
@ -662,7 +666,9 @@ test_multiple_sink_pg(SSL) ->
|
||||
rt:pbc_write(PidA, Bucket, KeyA, ValueA),
|
||||
rt:pbc_write(PidA, Bucket, KeyB, ValueB),
|
||||
|
||||
{_FirstA, FirstB, FirstC} = get_firsts(AllNodes),
|
||||
_FirstA = hd(ANodes),
|
||||
FirstB = hd(BNodes),
|
||||
FirstC = hd(CNodes),
|
||||
|
||||
PidB = rt:pbc(FirstB),
|
||||
PidC = rt:pbc(FirstC),
|
||||
@ -713,7 +719,9 @@ test_mixed_pg(SSL) ->
|
||||
rt:pbc_write(PidA, Bucket, KeyB, ValueB),
|
||||
rt:pbc_write(PidA, Bucket, KeyC, ValueC),
|
||||
|
||||
{_FirstA, FirstB, FirstC} = get_firsts(AllNodes),
|
||||
_FirstA = hd(ANodes),
|
||||
FirstB = hd(BNodes),
|
||||
FirstC = hd(CNodes),
|
||||
rt:wait_until_ring_converged(ANodes),
|
||||
rt:wait_until_ring_converged(BNodes),
|
||||
|
||||
@ -861,8 +869,7 @@ confirm() ->
|
||||
],
|
||||
lager:error("run riak_test with -t Mod:test1 -t Mod:test2"),
|
||||
lager:error("The runnable tests in this module are: ~p", [AllTests]),
|
||||
?assert(false).
|
||||
|
||||
[?assertEqual(pass, erlang:apply(?MODULE, Test, [])) || Test <- AllTests].
|
||||
|
||||
banner(T) ->
|
||||
banner(T, false).
|
||||
@ -874,13 +881,6 @@ banner(T, SSL) ->
|
||||
lager:info("----------------------------------------------"),
|
||||
lager:info("----------------------------------------------").
|
||||
|
||||
|
||||
get_firsts(Nodes) ->
|
||||
{[AFirst|_] = _ANodes, Rest} = lists:split(2, Nodes),
|
||||
{[BFirst|_] = _BNodes, [CFirst|_] = _CNodes} = lists:split(2, Rest),
|
||||
{AFirst, BFirst, CFirst}.
|
||||
|
||||
|
||||
wait_until_pg(Node, Pid, Bucket, Key, Cid) ->
|
||||
rt:wait_until(Node,
|
||||
fun(_) ->
|
||||
|
@ -19,7 +19,8 @@
|
||||
%% -- Puts the new value back to riak
|
||||
%% This results in 99 siblings, each a subset of the following sibling [0] | [0, 1] | [0, 1, 2], [0, 1, 2, 3] etc
|
||||
confirm() ->
|
||||
Conf = [{riak_core, [{default_bucket_props, [{allow_mult, true}]}]}],
|
||||
Conf = [{riak_core, [{default_bucket_props, [{allow_mult, true},
|
||||
{dvv_enabled, true}]}]}],
|
||||
[Node1] = rt:deploy_nodes(1, Conf),
|
||||
N = 100,
|
||||
|
||||
|
@ -57,7 +57,10 @@ confirm() ->
|
||||
|
||||
make_clusters() ->
|
||||
Conf = [{riak_repl, [{fullsync_on_connect, false},
|
||||
{fullsync_interval, disabled}]}],
|
||||
{fullsync_interval, disabled}]},
|
||||
{riak_core, [{default_bucket_props,
|
||||
[{dvv_enabled, true},
|
||||
{allow_mult, true}]}]}],
|
||||
Nodes = rt:deploy_nodes(6, Conf),
|
||||
{ClusterA, ClusterB} = lists:split(3, Nodes),
|
||||
A = make_cluster(ClusterA, "A"),
|
||||
|
@ -34,9 +34,11 @@ confirm() ->
|
||||
rt:set_conf(all, [{"buckets.default.allow_mult", "false"}]),
|
||||
rt:update_app_config(all, [{riak_core,
|
||||
[{ring_creation_size, ?START_SIZE}]}]),
|
||||
[ANode, AnotherNode, YetAnother, ReplacingNode] = AllNodes = rt:deploy_nodes(4),
|
||||
Nodes = [ANode, AnotherNode, YetAnother],
|
||||
NewNodes = [ANode, YetAnother, ReplacingNode],
|
||||
[ANode, AnotherNode, YetAnother, _ReplacingNode] = _AllNodes = rt:deploy_nodes(4),
|
||||
NewNodes = Nodes = [ANode, AnotherNode, YetAnother],
|
||||
%% This assignment for `NewNodes' is commented until riak_core
|
||||
%% issue #570 is resolved
|
||||
%% NewNodes = [ANode, YetAnother, ReplacingNode],
|
||||
rt:join(AnotherNode, ANode),
|
||||
rt:join(YetAnother, ANode),
|
||||
rt:wait_until_nodes_agree_about_ownership(Nodes),
|
||||
@ -55,23 +57,33 @@ confirm() ->
|
||||
lager:info("verifying previously written data"),
|
||||
?assertEqual([], rt:systest_read(ANode, 1, 500, ?BUCKET, ?R)),
|
||||
|
||||
lager:info("testing force-replace during resize"),
|
||||
submit_resize(?EXPANDED_SIZE, ANode),
|
||||
%% sleep for a second, yes i know this is nasty but we just care that the resize has
|
||||
%% been submitted and started, we aren't really waiting on a condition
|
||||
timer:sleep(3000),
|
||||
rpc:multicall(Nodes, riak_core_handoff_manager, kill_handoffs, []),
|
||||
ok = rpc:call(ReplacingNode, riak_core, staged_join, [ANode]),
|
||||
rt:wait_until_ring_converged(AllNodes),
|
||||
ok = rpc:call(ANode, riak_core_claimant, force_replace, [AnotherNode, ReplacingNode]),
|
||||
{ok, _, _} = rpc:call(ANode, riak_core_claimant, plan, []),
|
||||
ok = rpc:call(ANode, riak_core_claimant, commit, []),
|
||||
rpc:multicall(AllNodes, riak_core_handoff_manager, set_concurrency, [4]),
|
||||
rt:wait_until_no_pending_changes(NewNodes),
|
||||
assert_ring_size(?EXPANDED_SIZE, NewNodes),
|
||||
lager:info("verifying written data"),
|
||||
test_resize(?START_SIZE, ?EXPANDED_SIZE, ANode, Nodes),
|
||||
lager:info("verifying previously written data"),
|
||||
?assertEqual([], rt:systest_read(ANode, 1, 750, ?BUCKET, ?R)),
|
||||
|
||||
%% This following test code for force-replace is commented until
|
||||
%% riak_core issue #570 is resolved. At that time the preceding 3
|
||||
%% lines should also be removed
|
||||
|
||||
%% lager:info("testing force-replace during resize"),
|
||||
%% submit_resize(?EXPANDED_SIZE, ANode),
|
||||
%% %% sleep for a second, yes i know this is nasty but we just care that the resize has
|
||||
%% %% been submitted and started, we aren't really waiting on a condition
|
||||
%% timer:sleep(3000),
|
||||
%% rpc:multicall(Nodes, riak_core_handoff_manager, kill_handoffs, []),
|
||||
%% Statuses = rpc:multicall(Nodes, riak_core_handoff_manager, status, []),
|
||||
%% lager:info("Handoff statuses: ~p", [Statuses]),
|
||||
%% ok = rpc:call(ReplacingNode, riak_core, staged_join, [ANode]),
|
||||
%% rt:wait_until_ring_converged(AllNodes),
|
||||
%% ok = rpc:call(ANode, riak_core_claimant, force_replace, [AnotherNode, ReplacingNode]),
|
||||
%% {ok, _, _} = rpc:call(ANode, riak_core_claimant, plan, []),
|
||||
%% ok = rpc:call(ANode, riak_core_claimant, commit, []),
|
||||
%% rpc:multicall(AllNodes, riak_core_handoff_manager, set_concurrency, [4]),
|
||||
%% rt:wait_until_no_pending_changes(NewNodes),
|
||||
%% assert_ring_size(?EXPANDED_SIZE, NewNodes),
|
||||
%% lager:info("verifying written data"),
|
||||
%% ?assertEqual([], rt:systest_read(ANode, 1, 750, ?BUCKET, ?R)),
|
||||
|
||||
test_resize(?EXPANDED_SIZE, ?SHRUNK_SIZE, ANode, NewNodes),
|
||||
lager:info("verifying written data"),
|
||||
?assertEqual([], rt:systest_read(ANode, 1, 750, ?BUCKET, ?R)),
|
||||
@ -124,19 +136,19 @@ write_during_resize(_, Start, End) when Start =:= undefined orelse End =:= undef
|
||||
ok;
|
||||
write_during_resize(Node, Start, End) ->
|
||||
Pid = self(),
|
||||
spawn(fun() ->
|
||||
spawn(fun() ->
|
||||
case rt:systest_write(Node, Start, End, ?BUCKET, ?W) of
|
||||
[] ->
|
||||
Pid ! done_writing;
|
||||
Ers ->
|
||||
Pid ! {errors_writing, Ers}
|
||||
end
|
||||
end
|
||||
end).
|
||||
|
||||
verify_write_during_resize(_, Start, End) when Start =:= undefined orelse End =:= undefined ->
|
||||
ok;
|
||||
verify_write_during_resize(Node, Start, End) ->
|
||||
receive
|
||||
receive
|
||||
done_writing ->
|
||||
lager:info("verifying data written during operation"),
|
||||
?assertEqual([], rt:systest_read(Node, Start, End, ?BUCKET, ?R)),
|
||||
@ -144,7 +156,7 @@ verify_write_during_resize(Node, Start, End) ->
|
||||
{errors_writing, Ers} ->
|
||||
lager:error("errors were encountered while writing during operation: ~p", [Ers]),
|
||||
throw(writes_failed)
|
||||
after
|
||||
after
|
||||
10000 ->
|
||||
lager:error("failed to complete writes during operation before timeout"),
|
||||
throw(writes_timedout)
|
||||
@ -169,7 +181,7 @@ assert_ring_size(Size, Node) ->
|
||||
wait_until_extra_vnodes_shutdown([]) ->
|
||||
ok;
|
||||
wait_until_extra_vnodes_shutdown([Node | Nodes]) ->
|
||||
wait_until_extra_vnodes_shutdown(Node),
|
||||
wait_until_extra_vnodes_shutdown(Node),
|
||||
wait_until_extra_vnodes_shutdown(Nodes);
|
||||
wait_until_extra_vnodes_shutdown(Node) ->
|
||||
{ok, R} = rpc:call(Node, riak_core_ring_manager, get_my_ring, []),
|
||||
@ -194,13 +206,8 @@ wait_until_extra_proxies_shutdown(Node) ->
|
||||
StillRunning = [Idx || Idx <- Running, not lists:member(Idx, AllIndexes)],
|
||||
length(StillRunning) =:= 0
|
||||
end,
|
||||
?assertEqual(ok, rt:wait_until(Node, F)).
|
||||
?assertEqual(ok, rt:wait_until(Node, F)).
|
||||
|
||||
running_vnode_proxies(Node) ->
|
||||
Children = rpc:call(Node, supervisor, which_children, [riak_core_vnode_proxy_sup]),
|
||||
[Idx || {{_,Idx},Pid,_,_} <- Children, is_pid(Pid)].
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
80
tests/vnode_util.erl
Normal file
80
tests/vnode_util.erl
Normal file
@ -0,0 +1,80 @@
|
||||
%% -------------------------------------------------------------------
|
||||
%%
|
||||
%% Copyright (c) 2013-2014 Basho Technologies, Inc.
|
||||
%%
|
||||
%% This file is provided to you under the Apache License,
|
||||
%% Version 2.0 (the "License"); you may not use this file
|
||||
%% except in compliance with the License. You may obtain
|
||||
%% a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing,
|
||||
%% software distributed under the License is distributed on an
|
||||
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
%% KIND, either express or implied. See the License for the
|
||||
%% specific language governing permissions and limitations
|
||||
%% under the License.
|
||||
%%
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
-module(vnode_util).
|
||||
-compile(export_all).
|
||||
|
||||
load(Nodes) ->
|
||||
rt:load_modules_on_nodes([?MODULE], Nodes),
|
||||
ok.
|
||||
|
||||
suspend_vnode(Node, Idx) ->
|
||||
lager:info("Suspending vnode ~p/~p", [Node, Idx]),
|
||||
Pid = rpc:call(Node, ?MODULE, remote_suspend_vnode, [Idx], infinity),
|
||||
Pid.
|
||||
|
||||
remote_suspend_vnode(Idx) ->
|
||||
Parent = self(),
|
||||
Pid = spawn(fun() ->
|
||||
{ok, Pid} = riak_core_vnode_manager:get_vnode_pid(Idx, riak_kv_vnode),
|
||||
erlang:suspend_process(Pid, []),
|
||||
Parent ! suspended,
|
||||
receive resume ->
|
||||
io:format("Resuming vnode :: ~p/~p~n", [node(), Idx]),
|
||||
erlang:resume_process(Pid)
|
||||
end
|
||||
end),
|
||||
receive suspended -> ok end,
|
||||
Pid.
|
||||
|
||||
resume_vnode(Pid) ->
|
||||
Pid ! resume.
|
||||
|
||||
kill_vnode({VIdx, VNode}) ->
|
||||
lager:info("Killing vnode: ~p", [VIdx]),
|
||||
Pid = vnode_pid(VNode, VIdx),
|
||||
rpc:call(VNode, erlang, exit, [Pid, kill]),
|
||||
ok = rt:wait_until(fun() ->
|
||||
vnode_pid(VNode, VIdx) /= Pid
|
||||
end).
|
||||
|
||||
vnode_pid(Node, Partition) ->
|
||||
{ok, Pid} = rpc:call(Node, riak_core_vnode_manager, get_vnode_pid,
|
||||
[Partition, riak_kv_vnode]),
|
||||
Pid.
|
||||
|
||||
rebuild_vnode({VIdx, VNode}) ->
|
||||
lager:info("Rebuild AAE tree: ~p", [VIdx]),
|
||||
rebuild_aae_tree(VNode, VIdx).
|
||||
|
||||
rebuild_aae_tree(Node, Partition) ->
|
||||
{ok, Pid} = rpc:call(Node, riak_kv_vnode, hashtree_pid, [Partition]),
|
||||
Info = rpc:call(Node, riak_kv_entropy_info, compute_tree_info, []),
|
||||
{_, Built} = lists:keyfind(Partition, 1, Info),
|
||||
lager:info("Forcing rebuild of AAE tree for: ~b", [Partition]),
|
||||
lager:info("Tree originally built at: ~p", [Built]),
|
||||
rpc:call(Node, riak_kv_index_hashtree, clear, [Pid]),
|
||||
ok = rt:wait_until(fun() ->
|
||||
NewInfo = rpc:call(Node, riak_kv_entropy_info, compute_tree_info, []),
|
||||
{_, NewBuilt} = lists:keyfind(Partition, 1, NewInfo),
|
||||
NewBuilt > Built
|
||||
end),
|
||||
lager:info("Tree successfully rebuilt"),
|
||||
ok.
|
Loading…
Reference in New Issue
Block a user