Merge branch 'develop' of github.com:basho/riak_test into retire-r_o_r

This commit is contained in:
John R. Daily 2016-12-14 11:29:54 -05:00
commit aa4782f8e3
45 changed files with 2779 additions and 188 deletions

View File

@ -1,3 +1,5 @@
HEAD_REVISION ?= $(shell git describe --tags --exact-match HEAD 2>/dev/null)
.PHONY: deps
APPS = kernel stdlib sasl erts ssl tools os_mon runtime_tools crypto inets \
@ -10,6 +12,7 @@ all: deps compile
SMOKE_TEST=1 ./rebar skip_deps=true escriptize
deps:
$(if $(HEAD_REVISION),$(warning "Warning: you have checked out a tag ($(HEAD_REVISION)) and should use the locked-deps target"))
./rebar get-deps
docsclean:
@ -28,6 +31,19 @@ quickbuild:
./rebar skip_deps=true compile
./rebar escriptize
##
## Lock Targets
##
## see https://github.com/seth/rebar_lock_deps_plugin
lock: deps compile
./rebar lock-deps
locked-all: locked-deps compile
locked-deps:
@echo "Using rebar.config.lock file to fetch dependencies"
./rebar -C rebar.config.lock get-deps
##################
# Dialyzer targets
##################

View File

@ -190,4 +190,5 @@ build "riak-1.4.12" $R15B01 1.4.12 false
build "riak-2.0.2" $R16B02 2.0.2
build "riak-2.0.4" $R16B02 2.0.4
build "riak-2.0.6" $R16B02 2.0.6
build "riak-2.0.7" $R16B02 2.0.7
echo

View File

@ -39,8 +39,11 @@ cd $RT_DEST_DIR
git init
## Some versions of git and/or OS require these fields
git config user.name "Riak Test"
git config user.email "dev@basho.com"
git config --local user.name "Riak Test"
git config --local user.email "dev@basho.com"
git config --local core.autocrlf input
git config --local core.safecrlf false
git config --local core.filemode true
git add --all --force .
git commit -a -m "riak_test init" > /dev/null

View File

@ -23,9 +23,9 @@
-include("intercept.hrl").
-define(M, riak_kv_get_fsm_orig).
count_start_link_4(From, Bucket, Key, GetOptions) ->
?I_INFO("sending startlink/4 through proxy"),
case ?M:start_link_orig(From, Bucket, Key, GetOptions) of
count_start_4(From, Bucket, Key, GetOptions) ->
?I_INFO("sending start/4 through proxy"),
case ?M:start_orig(From, Bucket, Key, GetOptions) of
{error, overload} ->
?I_INFO("riak_kv_get_fsm not started due to overload.");
{ok, _} ->

View File

@ -32,11 +32,17 @@ delayed_compare(_IndexN, _Remote, _AccFun, _TreePid) ->
%% @doc When attempting to get the lock on a hashtree, return the
%% not_built atom which means the tree has not been computed yet.
not_built(_TreePid, _Type) ->
not_built(_TreePid, _Type, _Version, _Pid) ->
not_built.
%% @doc When attempting to get the lock on a hashtree, return the
%% already_locked atom which means the tree is locked by another
%% process.
already_locked(_TreePid, _Type) ->
already_locked(_TreePid, _Type, _Version, _Pid) ->
already_locked.
%% @doc When attempting to get the lock on a hashtree, return the
%% bad_version atom which means the local tree does not match
%% the requested version
bad_version(_TreePid, _Type, _Version, _Pid) ->
bad_version.

View File

@ -0,0 +1,48 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2015 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%%-------------------------------------------------------------------
-module(yz_solrq_helper_intercepts).
-compile(export_all).
-include("intercept.hrl").
-define(M, yz_solrq_helper_orig).
handle_get_ops_for_no_sibling_deletes(LI, P, Obj) ->
Lookup = ets:lookup(intercepts_tab, del_put),
case Lookup of
[] -> original_get_ops_for_no_sibling_deletes(LI, P, Obj);
_ ->
case proplists:get_value(del_put, Lookup) of
0 ->
error_logger:info_msg(
"Delete operation intercepted for BKey ~p",
[{riak_object:bucket(Obj), riak_object:key(Obj)}]),
ets:update_counter(intercepts_tab, del_put, 1),
[];
_ ->
original_get_ops_for_no_sibling_deletes(LI, P, Obj)
end
end.
original_get_ops_for_no_sibling_deletes(LI, P, Obj) ->
error_logger:info_msg(
"Delete operation original for BKey ~p",
[{riak_object:bucket(Obj), riak_object:key(Obj)}]),
?M:get_ops_for_no_sibling_deletes_orig(LI, P, Obj).

BIN
rebar vendored

Binary file not shown.

View File

@ -11,20 +11,21 @@
{eunit_opts, [verbose]}.
{deps, [
{lager, ".*", {git, "git://github.com/basho/lager", {tag, "2.2.3"}}},
{getopt, ".*", {git, "git://github.com/jcomellas/getopt", {tag, "v0.4"}}},
{meck, "0.8.2", {git, "git://github.com/basho/meck.git", {tag, "0.8.2"}}},
{mapred_verify, ".*", {git, "git://github.com/basho/mapred_verify", {branch, "master"}}},
{riakc, "2.1.2", {git, "git://github.com/basho/riak-erlang-client", {tag, "2.1.2"}}},
{riakhttpc, ".*", {git, "git://github.com/basho/riak-erlang-http-client", {tag, "2.1.2"}}},
{kvc, "1.3.0", {git, "https://github.com/etrepum/kvc", {tag, "v1.3.0"}}},
{druuid, ".*", {git, "git://github.com/kellymclaughlin/druuid.git", {tag, "0.2"}}}
{getopt, ".*", {git, "https://github.com/basho/getopt", {tag, "v0.4"}}},
{kvc, ".*", {git, "https://github.com/basho/kvc", {tag, "v1.5.0"}}},
{lager, ".*", {git, "https://github.com/basho/lager", {tag, "3.2.4"}}},
{druuid, ".*", {git, "https://github.com/basho/druuid.git", {tag, "0.2"}}},
{mapred_verify, ".*", {git, "https://github.com/basho/mapred_verify", {tag, "0.1"}}},
{meck, ".*", {git, "https://github.com/basho/meck.git", {tag, "0.8.2"}}},
{rebar_lock_deps_plugin, ".*", {git, "https://github.com/basho/rebar_lock_deps_plugin.git", {tag, "3.1.0p1"}}},
{riakc, ".*", {git, "https://github.com/basho/riak-erlang-client", {branch, "develop"}}},
{riakhttpc, ".*", {git, "https://github.com/basho/riak-erlang-http-client", {branch, "develop"}}}
]}.
{escript_incl_apps, [goldrush, lager, getopt, riakhttpc, riakc, ibrowse, mochiweb, kvc]}.
{escript_emu_args, "%%! -escript main riak_test_escript +K true +P 10000 -env ERL_MAX_PORTS 10000\n"}.
{plugin_dir, "src"}.
{plugins, [rebar_riak_test_plugin]}.
{plugins, [rebar_riak_test_plugin, rebar_lock_deps_plugin]}.
{riak_test, [
{test_paths, ["tests", "perf"]},
{test_output, "ebin"}

62
rebar.config.lock Normal file
View File

@ -0,0 +1,62 @@
%% THIS FILE IS GENERATED. DO NOT EDIT IT MANUALLY %%
{require_otp_vsn,"R13B04|R14|R15|R16"}.
{cover_enabled,true}.
{edoc_opts,[{preprocess,true}]}.
{erl_opts,[{src_dirs,[src,intercepts,perf]},
warnings_as_errors,
{parse_transform,lager_transform}]}.
{erl_first_files,["src/rt_intercept_pt.erl"]}.
{eunit_opts,[verbose]}.
{deps,[{getopt,".*",
{git,"https://github.com/basho/getopt",
"69b51ebf337f41906045a606a151d9578751e1ff"}},
{kvc,".*",
{git,"https://github.com/basho/kvc",
"5565fe51857747662410cc3c06362ebcf48a2f04"}},
{goldrush,".*",
{git,"https://github.com/basho/goldrush.git",
"8f1b715d36b650ec1e1f5612c00e28af6ab0de82"}},
{lager,".*",
{git,"https://github.com/basho/lager",
"81eaef0ce98fdbf64ab95665e3bc2ec4b24c7dac"}},
{druuid,".*",
{git,"https://github.com/basho/druuid.git",
"b3c5c2a52bb3f510d168b32e64f6fbc6a3c6a0e6"}},
{mapred_verify,".*",
{git,"https://github.com/basho/mapred_verify",
"51b79c5b05eb04640e13131f06ef96561b96ba8e"}},
{meck,".*",
{git,"https://github.com/basho/meck.git",
"dde759050eff19a1a80fd854d7375174b191665d"}},
{rebar_lock_deps_plugin,".*",
{git,"https://github.com/basho/rebar_lock_deps_plugin.git",
"8816f45ff38cd04d5c7741bd39166af58116dd44"}},
{hamcrest,".*",
{git,"https://github.com/basho/hamcrest-erlang.git",
"98bc7aa19ea081478c816824aa05fc5a48acae66"}},
{riak_pb,".*",
{git,"https://github.com/basho/riak_pb",
"adc603b36cfbb75aa09e302d5aa33859f0d24ad2"}},
{riakc,".*",
{git,"https://github.com/basho/riak-erlang-client",
"c203271b327a04892056bbb7500ac283a13afa68"}},
{ibrowse,".*",
{git,"https://github.com/basho/ibrowse.git",
"b28542d1e326ba44bcfaf7fd6d3c7f8761d20f08"}},
{mochiweb,".*",
{git,"git://github.com/basho/mochiweb.git",
"4d3882181d0e0e507a05115782a2b091a1db2be4"}},
{webmachine,".*",
{git,"https://github.com/basho/webmachine",
"77789b7cbb01cac4c7936578acc55245570d5afe"}},
{riakhttpc,".*",
{git,"https://github.com/basho/riak-erlang-http-client",
"2d4e58371e4d982bee1e0ad804b1403b1333573c"}}]}.
{escript_incl_apps,[goldrush,lager,getopt,riakhttpc,riakc,ibrowse,mochiweb,
kvc]}.
{escript_emu_args,"%%! -escript main riak_test_escript +K true +P 10000 -env ERL_MAX_PORTS 10000\n"}.
{plugin_dir,"src"}.
{plugins,[rebar_riak_test_plugin,rebar_lock_deps_plugin]}.
{riak_test,[{test_paths,["tests","perf"]},{test_output,"ebin"}]}.

View File

@ -297,26 +297,46 @@ run_test(Test, Outdir, TestMetaData, Report, HarnessArgs, NumTests) ->
1 -> keep_them_up;
_ -> rt:teardown()
end,
CoverageFile = rt_cover:maybe_export_coverage(Test, CoverDir, erlang:phash2(TestMetaData)),
CoverageFile = rt_cover:maybe_export_coverage(Test,
CoverDir,
erlang:phash2(TestMetaData)),
case Report of
undefined -> ok;
_ ->
{value, {log, L}, TestResult} = lists:keytake(log, 1, SingleTestResult),
{value, {log, L}, TestResult} =
lists:keytake(log, 1, SingleTestResult),
case giddyup:post_result(TestResult) of
error -> woops;
{ok, Base} ->
%% Now push up the artifacts, starting with the test log
giddyup:post_artifact(Base, {"riak_test.log", L}),
[ giddyup:post_artifact(Base, File) || File <- rt:get_node_logs() ],
[giddyup:post_artifact(Base, {filename:basename(CoverageFile) ++ ".gz",
zlib:gzip(element(2,file:read_file(CoverageFile)))}) || CoverageFile /= cover_disabled ],
ResultPlusGiddyUp = TestResult ++ [{giddyup_url, list_to_binary(Base)}],
[ rt:post_result(ResultPlusGiddyUp, WebHook) || WebHook <- get_webhooks() ]
[giddyup:post_artifact(Base, File)
|| File <- rt:get_node_logs()],
maybe_post_debug_logs(Base),
[giddyup:post_artifact(
Base,
{filename:basename(CoverageFile) ++ ".gz",
zlib:gzip(element(2,file:read_file(CoverageFile)))})
|| CoverageFile /= cover_disabled],
ResultPlusGiddyUp = TestResult ++
[{giddyup_url, list_to_binary(Base)}],
[rt:post_result(ResultPlusGiddyUp, WebHook) ||
WebHook <- get_webhooks()]
end
end,
rt_cover:stop(),
[{coverdata, CoverageFile} | SingleTestResult].
maybe_post_debug_logs(Base) ->
case rt_config:get(giddyup_post_debug_logs, true) of
true ->
NodeDebugLogs = rt:get_node_debug_logs(),
[giddyup:post_artifact(Base, File)
|| File <- NodeDebugLogs];
_ ->
false
end.
get_webhooks() ->
Hooks = lists:foldl(fun(E, Acc) -> [parse_webhook(E) | Acc] end,
[],

View File

@ -55,6 +55,7 @@
cmd/2,
connection_info/1,
console/2,
copy_conf/3,
count_calls/2,
create_and_activate_bucket_type/3,
del_dir/1,
@ -64,6 +65,7 @@
deploy_clusters/1,
down/2,
enable_search_hook/2,
ensure_random_seeded/0,
expect_in_log/2,
get_call_count/2,
get_deps/0,
@ -108,6 +110,8 @@
product/1,
priv_dir/0,
random_sublist/2,
random_uniform/0,
random_uniform/1,
remove/2,
riak/2,
riak_repl/2,
@ -144,6 +148,7 @@
update_app_config/2,
upgrade/2,
upgrade/3,
upgrade/4,
versions/0,
wait_for_any_webmachine_route/2,
wait_for_cluster_service/2,
@ -395,12 +400,25 @@ stop_and_wait(Node) ->
%% @doc Upgrade a Riak `Node' to the specified `NewVersion'.
upgrade(Node, NewVersion) ->
?HARNESS:upgrade(Node, NewVersion).
upgrade(Node, NewVersion, fun no_op/1).
%% @doc Upgrade a Riak `Node' to the specified `NewVersion'.
%% Upgrade Callback will be called after the node is stopped but before
%% the upgraded node is started.
upgrade(Node, NewVersion, UpgradeCallback) when is_function(UpgradeCallback) ->
?HARNESS:upgrade(Node, NewVersion, UpgradeCallback);
%% @doc Upgrade a Riak `Node' to the specified `NewVersion' and update
%% the config based on entries in `Config'.
upgrade(Node, NewVersion, Config) ->
?HARNESS:upgrade(Node, NewVersion, Config).
upgrade(Node, NewVersion, Config, fun no_op/1).
%% @doc Upgrade a Riak `Node' to the specified `NewVersion' and update
%% the config based on entries in `Config'.
%% Upgrade Callback will be called after the node is stopped but before
%% the upgraded node is started.
upgrade(Node, NewVersion, Config, UpgradeCallback) ->
?HARNESS:upgrade(Node, NewVersion, Config, UpgradeCallback).
%% @doc Upgrade a Riak node to a specific version using the alternate
%% leave/upgrade/rejoin approach
@ -751,6 +769,7 @@ wait_until_transfers_complete([Node0|_]) ->
lager:info("Wait until transfers complete ~p", [Node0]),
F = fun(Node) ->
{DownNodes, Transfers} = rpc:call(Node, riak_core_status, transfers, []),
lager:info("DownNodes: ~p Transfers: ~p", [DownNodes, Transfers]),
DownNodes =:= [] andalso Transfers =:= []
end,
?assertEqual(ok, wait_until(Node0, F)),
@ -762,6 +781,7 @@ wait_for_service(Node, Services) when is_list(Services) ->
{badrpc, Error} ->
{badrpc, Error};
CurrServices when is_list(CurrServices) ->
lager:info("Waiting for services ~p: current services: ~p", [Services, CurrServices]),
lists:all(fun(Service) -> lists:member(Service, CurrServices) end, Services);
Res ->
Res
@ -1175,7 +1195,7 @@ join_cluster(Nodes) ->
%% large amount of redundant handoff done in a sequential join
[staged_join(Node, Node1) || Node <- OtherNodes],
plan_and_commit(Node1),
try_nodes_ready(Nodes, 3, 500)
try_nodes_ready(Nodes)
end,
?assertEqual(ok, wait_until_nodes_ready(Nodes)),
@ -1200,6 +1220,9 @@ product(Node) ->
true -> unknown
end.
try_nodes_ready(Nodes) ->
try_nodes_ready(Nodes, 10, 500).
try_nodes_ready([Node1 | _Nodes], 0, _SleepMs) ->
lager:info("Nodes not ready after initial plan/commit, retrying"),
plan_and_commit(Node1);
@ -1301,8 +1324,10 @@ systest_verify_delete(Node, Start, End, Bucket, R) ->
systest_write(Node, Size) ->
systest_write(Node, Size, 2).
systest_write(Node, Size, W) ->
systest_write(Node, 1, Size, <<"systest">>, W).
systest_write(Node, Size, W) when is_integer(W) ->
systest_write(Node, 1, Size, <<"systest">>, W);
systest_write(Node, Size, Bucket) ->
systest_write(Node, 1, Size, Bucket, 2).
systest_write(Node, Start, End, Bucket, W) ->
systest_write(Node, Start, End, Bucket, W, <<>>).
@ -1382,7 +1407,7 @@ object_value(1, Obj, _SquashSiblings) ->
object_value(_ValueCount, Obj, false) ->
riak_object:get_value(Obj);
object_value(_ValueCount, Obj, true) ->
lager:debug("Siblings detected for ~p:~p", [riak_object:bucket(Obj), riak_object:key(Obj)]),
lager:debug("Siblings detected for ~p:~p~n~p", [riak_object:bucket(Obj), riak_object:key(Obj), Obj]),
Contents = riak_object:get_contents(Obj),
case lists:foldl(fun sibling_compare/2, {true, undefined}, Contents) of
{true, {_, _, _, Value}} ->
@ -1684,6 +1709,10 @@ attach_direct(Node, Expected) ->
console(Node, Expected) ->
?HARNESS:console(Node, Expected).
%% @doc Copies config files from one set of nodes to another
copy_conf(NumNodes, FromVersion, ToVersion) ->
?HARNESS:copy_conf(NumNodes, FromVersion, ToVersion).
%%%===================================================================
%%% Search
%%%===================================================================
@ -1831,6 +1860,9 @@ setup_harness(Test, Args) ->
get_node_logs() ->
?HARNESS:get_node_logs().
get_node_debug_logs() ->
?HARNESS:get_node_debug_logs().
%% @doc Performs a search against the log files on `Node' and returns all
%% matching lines.
-spec search_logs(node(), Pattern::iodata()) ->
@ -2057,17 +2089,14 @@ wait_for_control(VersionedNodes) when is_list(VersionedNodes) ->
-spec select_random([any()]) -> any().
select_random(List) ->
Length = length(List),
Idx = random:uniform(Length),
Idx = random_uniform(Length),
lists:nth(Idx, List).
%% @doc Returns a random element from a given list.
-spec random_sublist([any()], integer()) -> [any()].
random_sublist(List, N) ->
% Properly seeding the process.
<<A:32, B:32, C:32>> = crypto:rand_bytes(12),
random:seed({A, B, C}),
% Assign a random value for each element in the list.
List1 = [{random:uniform(), E} || E <- List],
List1 = [{random_uniform(), E} || E <- List],
% Sort by the random number.
List2 = lists:sort(List1),
% Take the first N elements.
@ -2075,6 +2104,34 @@ random_sublist(List, N) ->
% Remove the random numbers.
[ E || {_,E} <- List3].
-spec random_uniform() -> float().
%% @doc Like random:uniform/0, but always seeded with quality entropy.
random_uniform() ->
ok = ensure_random_seeded(),
random:uniform().
-spec random_uniform(Range :: pos_integer()) -> pos_integer().
%% @doc Like random:uniform/1, but always seeded with quality entropy.
random_uniform(Range) ->
ok = ensure_random_seeded(),
random:uniform(Range).
-spec ensure_random_seeded() -> ok.
%% @doc Ensures that the random module's PRNG is seeded with the good stuff.
ensure_random_seeded() ->
Key = {?MODULE, random_seeded},
case erlang:get(Key) of
true ->
ok;
_ ->
% crypto:rand_bytes/1 is deprecated in OTP-19
<<A:32/integer, B:32/integer, C:32/integer>>
= crypto:strong_rand_bytes(12),
random:seed(A, B, C),
erlang:put(Key, true),
ok
end.
%% @doc Recusively delete files in a directory.
-spec del_dir(string()) -> strings().
del_dir(Dir) ->
@ -2147,6 +2204,11 @@ stop_tracing() ->
dbg:stop_clear(),
ok.
get_primary_preflist(Node, Bucket, Key, NVal) ->
DocIdx = rpc:call(Node, riak_core_util, chash_std_keyfun, [{Bucket, Key}]),
PL = rpc:call(Node, riak_core_apl, get_primary_apl, [DocIdx, NVal, riak_kv]),
{ok, PL}.
%% @doc Trace fun calls and store their count state into an ETS table.
-spec trace_count({trace, pid(), call|return_from,
{atom(), atom(), non_neg_integer()}}, {node(), [node()]}) ->
@ -2176,6 +2238,10 @@ assert_supported(Capabilities, Capability, Value) ->
ok.
-spec no_op(term()) -> ok.
no_op(_Params) ->
ok.
-ifdef(TEST).
verify_product(Applications, ExpectedApplication) ->

View File

@ -26,6 +26,8 @@
-define(DEVS(N), lists:concat(["dev", N, "@127.0.0.1"])).
-define(DEV(N), list_to_atom(?DEVS(N))).
-define(PATH, (rt_config:get(rtdev_path))).
-define(DEBUG_LOG_FILE(N),
"dev" ++ integer_to_list(N) ++ "@127.0.0.1-riak-debug.tar.gz").
get_deps() ->
lists:flatten(io_lib:format("~s/dev/dev1/lib", [relpath(current)])).
@ -52,6 +54,17 @@ riak_admin_cmd(Path, N, Args) ->
ExecName = rt_config:get(exec_name, "riak"),
io_lib:format("~s/dev/dev~b/bin/~s-admin ~s", [Path, N, ExecName, ArgStr]).
riak_debug_cmd(Path, N, Args) ->
Quoted =
lists:map(fun(Arg) when is_list(Arg) ->
lists:flatten([$", Arg, $"]);
(_) ->
erlang:error(badarg)
end, Args),
ArgStr = string:join(Quoted, " "),
ExecName = rt_config:get(exec_name, "riak"),
lists:flatten(io_lib:format("~s/dev/dev~b/bin/~s-debug ~s", [Path, N, ExecName, ArgStr])).
run_git(Path, Cmd) ->
lager:info("Running: ~s", [gitcmd(Path, Cmd)]),
{0, Out} = cmd(gitcmd(Path, Cmd)),
@ -124,10 +137,10 @@ relpath(root, Path) ->
relpath(_, _) ->
throw("Version requested but only one path provided").
upgrade(Node, NewVersion) ->
upgrade(Node, NewVersion, same).
upgrade(Node, NewVersion, UpgradeCallback) when is_function(UpgradeCallback) ->
upgrade(Node, NewVersion, same, UpgradeCallback).
upgrade(Node, NewVersion, Config) ->
upgrade(Node, NewVersion, Config, UpgradeCallback) ->
N = node_id(Node),
Version = node_version(N),
lager:info("Upgrading ~p : ~p -> ~p", [Node, Version, NewVersion]),
@ -154,10 +167,32 @@ upgrade(Node, NewVersion, Config) ->
same -> ok;
_ -> update_app_config(Node, Config)
end,
Params = [
{old_data_dir, io_lib:format("~s/dev/dev~b/data", [OldPath, N])},
{new_data_dir, io_lib:format("~s/dev/dev~b/data", [NewPath, N])},
{old_version, Version},
{new_version, NewVersion}
],
ok = UpgradeCallback(Params),
start(Node),
rt:wait_until_pingable(Node),
ok.
-spec copy_conf(integer(), atom() | string(), atom() | string()) -> ok.
copy_conf(NumNodes, FromVersion, ToVersion) ->
lager:info("Copying config from ~p to ~p", [FromVersion, ToVersion]),
FromPath = relpath(FromVersion),
ToPath = relpath(ToVersion),
[copy_node_conf(N, FromPath, ToPath) || N <- lists:seq(1, NumNodes)].
copy_node_conf(NodeNum, FromPath, ToPath) ->
Command = io_lib:format("cp -p -P -R \"~s/dev/dev~b/etc\" \"~s/dev/dev~b\"",
[FromPath, NodeNum, ToPath, NodeNum]),
os:cmd(Command),
ok.
-spec set_conf(atom() | string(), [{string(), string()}]) -> ok.
set_conf(all, NameValuePairs) ->
lager:info("rtdev:set_conf(all, ~p)", [NameValuePairs]),
@ -765,6 +800,35 @@ get_node_logs() ->
{lists:nthtail(RootLen, Filename), Port}
end || Filename <- filelib:wildcard(Root ++ "/*/dev/dev*/log/*") ].
get_node_debug_logs() ->
NodeMap = rt_config:get(rt_nodes),
lists:foldl(fun get_node_debug_logs/2,
[], NodeMap).
get_node_debug_logs({_Node, NodeNum}, Acc) ->
DebugLogFile = ?DEBUG_LOG_FILE(NodeNum),
delete_existing_debug_log_file(DebugLogFile),
Path = relpath(node_version(NodeNum)),
Args = ["--logs"],
Cmd = riak_debug_cmd(Path, NodeNum, Args),
{ExitCode, Result} = wait_for_cmd(spawn_cmd(Cmd)),
lager:info("~p ExitCode ~p, Result = ~p", [Cmd, ExitCode, Result]),
case filelib:is_file(DebugLogFile) of
true ->
{ok, Binary} = file:read_file(DebugLogFile),
Acc ++ [{DebugLogFile, Binary}];
_ ->
Acc
end.
%% If the debug log file exists from a previous test run it will cause the
%% `riak_debug_cmd' to fail. Therefore, delete the `DebugLogFile' if it exists.
%% Note that by ignoring the return value of `file:delete/1' this function
%% works whether or not the `DebugLogFile' actually exists at the time it is
%% called.
delete_existing_debug_log_file(DebugLogFile) ->
file:delete(DebugLogFile).
%% @doc Performs a search against the log files on `Node' and returns all
%% matching lines.
-spec search_logs(node(), Pattern::iodata()) ->

View File

@ -250,10 +250,12 @@ stop(Node) ->
run_riak(Node, "stop"),
ok.
upgrade(Node, NewVersion) ->
upgrade(Node, NewVersion, same).
upgrade(Node, NewVersion, UpgradeCallback) when is_function(UpgradeCallback) ->
upgrade(Node, NewVersion, same, UpgradeCallback).
upgrade(Node, NewVersion, Config) ->
%% upgrade callback unsupported for this driver until there is a need.
%% c.f., rtdev:upgrade/4
upgrade(Node, NewVersion, Config, _UpgradeCallback) ->
Version = node_version(Node),
lager:info("Upgrading ~p : ~p -> ~p", [Node, Version, NewVersion]),
stop(Node),
@ -280,6 +282,9 @@ upgrade(Node, NewVersion, Config) ->
rt:wait_until_pingable(Node),
ok.
copy_conf(_, _, _) ->
throw({error, not_implemented}).
run_riak(Node, Cmd) ->
Exec = riakcmd(Node, Cmd),
lager:info("Running: ~s :: ~s", [get_host(Node), Exec]),

View File

@ -26,6 +26,7 @@
check_exists/2,
clear_trees/1,
commit/2,
drain_solrqs/1,
expire_trees/1,
gen_keys/1,
host_entries/1,
@ -46,6 +47,7 @@
wait_for_index/2,
wait_for_schema/2,
wait_for_schema/3,
wait_until/2,
write_data/5,
write_data/6,
http/4,
@ -243,6 +245,7 @@ brutal_kill_remove_index_dirs(Nodes, IndexName, Services) ->
-spec remove_index_dirs([node()], index_name(), [atom()]) -> ok.
remove_index_dirs(Nodes, IndexName, Services) ->
IndexDirs = get_index_dirs(IndexName, Nodes),
[rt:stop(ANode) || ANode <- Nodes],
remove_index_dirs2(Nodes, IndexDirs, Services),
ok.
@ -453,6 +456,13 @@ commit(Nodes, Index) ->
rpc:multicall(Nodes, yz_solr, commit, [Index]),
ok.
-spec drain_solrqs(node() | cluster()) -> ok.
drain_solrqs(Cluster) when is_list(Cluster) ->
[drain_solrqs(Node) || Node <- Cluster];
drain_solrqs(Node) ->
rpc:call(Node, yz_solrq_drain_mgr, drain, []),
ok.
-spec override_schema(pid(), [node()], index_name(), schema_name(), string()) ->
{ok, [node()]}.
override_schema(Pid, Cluster, Index, Schema, RawUpdate) ->

View File

@ -24,7 +24,9 @@
-compile(export_all).
-export([confirm/0]).
-define(PING_FAILURE_OUTPUT, "Node did not respond to ping!").
% node_package 3.x changes this - new first, old second
-define(PING_FAILURE_OUTPUT,
["Node did not respond to ping!", "Node is not running!"]).
confirm() ->
@ -120,7 +122,7 @@ ping_down_test(Node) ->
attach_down_test(Node) ->
lager:info("Testing riak attach while down"),
{ok, AttachOut} = rt:riak(Node, ["attach"]),
?assert(rt:str(AttachOut, ?PING_FAILURE_OUTPUT)),
?assert(rt:str_mult(AttachOut, ?PING_FAILURE_OUTPUT)),
ok.
attach_direct_up_test(Node) ->
@ -135,7 +137,7 @@ attach_direct_up_test(Node) ->
attach_direct_down_test(Node) ->
lager:info("Testing riak attach-direct while down"),
{ok, AttachOut} = rt:riak(Node, ["attach-direct"]),
?assert(rt:str(AttachOut, ?PING_FAILURE_OUTPUT)),
?assert(rt:str_mult(AttachOut, ?PING_FAILURE_OUTPUT)),
ok.
status_up_test(Node) ->
@ -153,7 +155,7 @@ status_down_test(Node) ->
lager:info("Test riak-admin status while down"),
{ok, {ExitCode, StatusOut}} = rt:admin(Node, ["status"], [return_exit_code]),
?assertEqual(1, ExitCode),
?assert(rt:str(StatusOut, ?PING_FAILURE_OUTPUT)),
?assert(rt:str_mult(StatusOut, ?PING_FAILURE_OUTPUT)),
ok.
getpid_up_test(Node) ->

605
tests/job_enable_common.erl Normal file
View File

@ -0,0 +1,605 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2016 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(job_enable_common).
% Public API for use by other tests
-export([
bin_bucket/1,
bin_key/1,
bin_val/1,
close_client/1,
enabled_string/1,
get_enabled/2,
index_2i/0,
index_name/1,
index_yz/0,
load_data/1,
num_buckets/0, num_buckets/1,
num_keys/0, num_keys/1,
open_client/2,
populated_bucket/0,
set_enabled/3,
setup_cluster/1,
setup_yokozuna/1,
test_buckets/0,
test_keys/0,
test_label/3,
test_nums/0,
test_operation/4,
test_vals/0,
undefined_bucket/0
]).
-include_lib("eunit/include/eunit.hrl").
-include_lib("riakc/include/riakc.hrl").
-include_lib("riakhttpc/include/rhc.hrl").
-include("job_enable_common.hrl").
-define(DEFAULT_NUM_BUCKETS, 7).
-define(DEFAULT_NUM_KEYS, 9).
%% ===================================================================
%% Test API
%% ===================================================================
enabled_string(true) ->
"enabled";
enabled_string(false) ->
"disabled".
test_label(Class, Enabled, ClientType) ->
io_lib:format("~s ~p ~s", [ClientType, Class, enabled_string(Enabled)]).
bin_bucket(Num) ->
erlang:list_to_binary(["Bucket_", erlang:integer_to_list(Num)]).
bin_key(Num) ->
erlang:list_to_binary(["Key_", erlang:integer_to_list(Num)]).
bin_val(Num) ->
erlang:list_to_binary(["Val_", erlang:integer_to_list(Num)]).
index_2i() ->
{integer_index, "valnum_index_2i"}.
index_yz() ->
<<"valnum_index_yz">>.
index_name(Name) when erlang:is_atom(Name) ->
erlang:atom_to_list(Name);
index_name(Name) when erlang:is_binary(Name) ->
erlang:binary_to_list(Name);
index_name(Name) when erlang:is_list(Name) ->
Name;
index_name({binary_index, Name}) ->
index_name(Name) ++ "_bin";
index_name({integer_index, Name}) ->
index_name(Name) ++ "_int";
index_name(Index) ->
erlang:error(badarg, [Index]).
num_buckets() ->
Key = {?MODULE, num_buckets},
case erlang:get(Key) of
undefined ->
Num = ?DEFAULT_NUM_BUCKETS,
erlang:put(Key, Num),
Num;
Val ->
Val
end.
num_buckets(Num) when erlang:is_integer(Num) andalso Num > 0 ->
Key = {?MODULE, num_buckets},
case erlang:get(Key) of
undefined ->
erlang:put(Key, Num),
Num;
Num ->
Num;
_ ->
erlang:erase({?MODULE, test_buckets}),
erlang:erase({?MODULE, populated_bucket}),
erlang:put(Key, Num),
Num
end.
num_keys() ->
Key = {?MODULE, num_keys},
case erlang:get(Key) of
undefined ->
Num = ?DEFAULT_NUM_KEYS,
erlang:put(Key, Num),
Num;
Val ->
Val
end.
num_keys(Num) when erlang:is_integer(Num) andalso Num > 0 ->
Key = {?MODULE, num_keys},
case erlang:get(Key) of
undefined ->
erlang:put(Key, Num),
Num;
Num ->
Num;
_ ->
erlang:erase({?MODULE, test_keys}),
erlang:erase({?MODULE, test_nums}),
erlang:erase({?MODULE, test_vals}),
erlang:put(Key, Num),
Num
end.
populated_bucket() ->
Key = {?MODULE, populated_bucket},
case erlang:get(Key) of
undefined ->
Buckets = test_buckets(),
Bucket = lists:nth(erlang:length(Buckets) div 2, Buckets),
erlang:put(Key, Bucket),
Bucket;
Val ->
Val
end.
undefined_bucket() ->
<<"Undefined_Bucket">>.
test_buckets() ->
Key = {?MODULE, test_buckets},
case erlang:get(Key) of
undefined ->
New = bin_buckets(num_buckets(), []),
erlang:put(Key, New),
New;
Val ->
Val
end.
test_keys() ->
Key = {?MODULE, test_keys},
case erlang:get(Key) of
undefined ->
New = bin_keys(num_keys(), []),
erlang:put(Key, New),
New;
Val ->
Val
end.
test_nums() ->
Key = {?MODULE, test_nums},
case erlang:get(Key) of
undefined ->
New = lists:seq(1, num_keys()),
erlang:put(Key, New),
New;
Val ->
Val
end.
test_vals() ->
Key = {?MODULE, test_vals},
case erlang:get(Key) of
undefined ->
New = bin_vals(num_keys(), []),
erlang:put(Key, New),
New;
Val ->
Val
end.
get_enabled(Nodes, Class) when erlang:is_list(Nodes) ->
[get_enabled(Node, Class) || Node <- Nodes];
get_enabled(Node, {App, Op}) ->
rpc:call(Node, riak_core_util, job_class_enabled, [App, Op]).
set_enabled([], _, _) ->
ok;
set_enabled([Node | Nodes], Class, Enabled) ->
?assertEqual(ok, set_enabled(Node, Class, Enabled)),
set_enabled(Nodes, Class, Enabled);
set_enabled(Node, {App, Op}, true) ->
rpc:call(Node, riak_core_util, enable_job_class, [App, Op]);
set_enabled(Node, {App, Op}, false) ->
rpc:call(Node, riak_core_util, disable_job_class, [App, Op]).
open_client(http = Type, Node) ->
% HTTP connections are constant records, so re-use them
Key = {?MODULE, httpc, Node},
case erlang:get(Key) of
undefined ->
New = {Type, rhc, rt:httpc(Node)},
erlang:put(Key, New),
New;
Conn ->
Conn
end;
open_client(pbc = Type, Node) ->
{Type, riakc_pb_socket, rt:pbc(Node)}.
close_client({http, _Mod, _RHC}) ->
ok;
close_client({pbc, Mod, PBC}) ->
Mod:stop(PBC).
setup_cluster([Node | _] = Nodes) ->
lager:info("Creating a cluster of ~b nodes ...", [erlang:length(Nodes)]),
?assertEqual(ok, rt:join_cluster(Nodes)),
load_data(Node),
?assertEqual(ok, rt:wait_until_transfers_complete(Nodes)).
setup_yokozuna([Node | _]) ->
setup_yokozuna(Node);
setup_yokozuna(Node) ->
% create the YZ search index
{_, Mod, Conn} = Client = open_client(pbc, Node),
?assertEqual(ok, Mod:create_search_index(Conn, index_yz())),
close_client(Client).
load_data([Node | _]) ->
load_data(Node);
load_data(Node) ->
lager:info("Writing known data to node ~p ...", [Node]),
PBConn = rt:pbc(Node),
load_data(PBConn, populated_bucket(), test_buckets()),
riakc_pb_socket:stop(PBConn).
test_operation(Node, Class, Enabled, ClientType) ->
lager:info("Testing ~s on ~p",
[test_label(Class, Enabled, ClientType), Node]),
test_request(Node, Class, Enabled, ClientType).
%% ===================================================================
%% Internal Operation Tests
%% ===================================================================
%%
%% Notes on test_request/4 implementation:
%%
%% The 'rhc' and 'riakc_pb_socket' hide a lot of implementation details,
%% including the command they actually issue, so we rely on the error message
%% in the response for disabled switches to confirm that the request got routed
%% to where we wanted it to on the receiving end.
%%
%% This results in some odd head clause ordering below, as the approach differs
%% for each operation. All operations for a given ?TOKEN_XXX are clustered
%% together, but the order within the cluster varies as we match patterns as
%% dictated by the behavior of the client modules for each.
%%
%% We currently uses 'riakc_pb_socket' for protobufs, but that doesn't give us
%% access to all available operations, so some are stubbed out unless/until we
%% dig deeper and implement them ourselves.
%%
%% The 'rhc' module has the same problem, but compounds it by not returning the
%% response body on errors, so for tests where it doesn't give us what we want
%% we skip it and use 'ibrowse' directly, building the URL from scratch.
%% For some reason using rt:httpc(Node) and getting the host/port out of the
%% returned #rhc{} is more reliable than calling rt:get_https_conn_info
%% directly.
%%
% riakc_pb_socket always lists buckets with streams, so skip the non-stream
% test unless/until we want to implement it directly.
test_request(Node, ?TOKEN_LIST_BUCKETS = Class, Enabled, pbc = ClientType) ->
{_, Mod, _} = Client = open_client(ClientType, Node),
lager:warning(
"non-streaming list-buckets is not implemented in the ~p client,"
" skipping the ~s test.",
[Mod, test_label(Class, Enabled, ClientType)]),
close_client(Client),
ok;
test_request(Node, ?TOKEN_LIST_BUCKETS = Class, Enabled, http = Scheme) ->
URL = make_url(Node, Scheme, "/buckets?buckets=true"),
Result = ibrowse:send_req(URL, [], get, [], [{response_format, binary}]),
?assertMatch({ok, _, _, _}, Result),
{_, Code, _, Body} = Result,
case Enabled of
true ->
{struct, PList} = mochijson2:decode(
unicode:characters_to_list(Body, utf8)),
Buckets = proplists:get_value(<<"buckets">>, PList, []),
?assertEqual({"200", test_buckets()}, {Code, lists:sort(Buckets)});
false ->
?assertEqual({"403", ?ERRMSG_BIN(Class)}, {Code, Body})
end;
test_request(Node, ?TOKEN_LIST_BUCKETS_S = Class, false, http = Scheme) ->
URL = make_url(Node, Scheme, "/buckets?buckets=stream"),
Result = ibrowse:send_req(URL, [], get),
?assertMatch({ok, _, _, _}, Result),
{_, Code, _, Body} = Result,
?assertEqual({"403", ?ERRMSG_TXT(Class)}, {Code, Body});
test_request(Node, ?TOKEN_LIST_BUCKETS_S = Class, Enabled, ClientType) ->
{_, Mod, Conn} = Client = open_client(ClientType, Node),
% 'rhc' and 'riakc_pb_socket' list_buckets always use stream_list_buckets
Result = Mod:list_buckets(Conn),
close_client(Client),
case Enabled of
true ->
?assertMatch({ok, L} when erlang:is_list(L), Result),
{ok, Buckets} = Result,
?assertEqual(test_buckets(), lists:sort(Buckets));
false ->
?assertEqual({error, ?ERRMSG_BIN(Class)}, Result)
end;
% protobuf list-keys only does streams, so skip the non-stream test
test_request(_, ?TOKEN_LIST_KEYS = Class, Enabled, pbc = ClientType) ->
lager:info(
"non-streaming list-keys over protobufs is not implemented in Riak,"
" skipping the ~s test.", [test_label(Class, Enabled, ClientType)]),
ok;
test_request(Node, ?TOKEN_LIST_KEYS = Class, Enabled, http = Scheme) ->
URL = make_url(Node, Scheme, ["/buckets/",
erlang:binary_to_list(populated_bucket()), "/keys?keys=true"]),
Result = ibrowse:send_req(URL, [], get, [], [{response_format, binary}]),
?assertMatch({ok, _, _, _}, Result),
{_, Code, _, Body} = Result,
case Enabled of
true ->
{struct, PList} = mochijson2:decode(
unicode:characters_to_list(Body, utf8)),
Keys = proplists:get_value(<<"keys">>, PList, []),
?assertEqual({"200", test_keys()}, {Code, lists:sort(Keys)});
false ->
?assertEqual({"403", ?ERRMSG_BIN(Class)}, {Code, Body})
end;
test_request(Node, ?TOKEN_LIST_KEYS_S = Class, false, http = Scheme) ->
URL = make_url(Node, Scheme, ["/buckets/",
erlang:binary_to_list(populated_bucket()), "/keys?keys=stream"]),
Result = ibrowse:send_req(URL, [], get),
?assertMatch({ok, _, _, _}, Result),
{_, Code, _, Body} = Result,
?assertEqual({"403", ?ERRMSG_TXT(Class)}, {Code, Body});
test_request(Node, ?TOKEN_LIST_KEYS_S = Class, Enabled, ClientType) ->
{_, Mod, Conn} = Client = open_client(ClientType, Node),
% 'rhc' and 'riakc_pb_socket' list_keys always use stream_list_keys
Result = Mod:list_keys(Conn, populated_bucket()),
close_client(Client),
case Enabled of
true ->
?assertMatch({ok, _}, Result),
{ok, Keys} = Result,
?assertEqual(test_keys(), lists:sort(Keys));
false ->
?assertEqual({error, ?ERRMSG_BIN(Class)}, Result)
end;
% Map Reduce tests need a lot of love once Riak code discriminates between term
% and javascript MR requests.
% TODO: Change to discrete implementations so http error body is validated.
% TODO: Try both forms with the other enabled/disabled to check crossover.
test_request(Node, ?TOKEN_MAP_REDUCE = Class, Enabled, ClientType) ->
Bucket = populated_bucket(),
{_, Mod, Conn} = Client = open_client(ClientType, Node),
Result = Mod:mapred(Conn, Bucket, []),
close_client(Client),
case Enabled of
true ->
?assertMatch({ok, [{_, _}]}, Result),
{ok, [{_, Pairs}]} = Result,
Expect = case ClientType of
pbc ->
[{Bucket, Key} || Key <- test_keys()];
http ->
[[Bucket, Key] || Key <- test_keys()]
end,
?assertEqual(Expect, lists:sort(Pairs));
false ->
case ClientType of
pbc ->
?assertEqual({error, ?ERRMSG_BIN(Class)}, Result);
http ->
?assertMatch({error, {"403", _}}, Result)
end
end;
test_request(_Node, ?TOKEN_MAP_REDUCE_JS = Class, Enabled, ClientType) ->
lager:info(
"map-reduce javascript discrimination is not implemented in Riak,"
" skipping the ~s test.", [test_label(Class, Enabled, ClientType)]),
ok;
test_request(Node, ?TOKEN_SEC_INDEX = Class, Enabled, pbc = ClientType) ->
Bucket = populated_bucket(),
Index = index_2i(),
Num = rt:random_uniform(num_keys()),
{_, Mod, Conn} = Client = open_client(ClientType, Node),
Result = Mod:get_index_eq(Conn, Bucket, Index, Num, [{stream, false}]),
close_client(Client),
case Enabled of
true ->
Key = bin_key(Num),
?assertMatch({ok, {index_results_v1, [Key], _, _}}, Result);
false ->
?assertEqual({error, ?ERRMSG_BIN(Class)}, Result)
end;
test_request(Node, ?TOKEN_SEC_INDEX = Class, Enabled, http = Scheme) ->
Num = rt:random_uniform(num_keys()),
URL = make_url(Node, Scheme, [
"/buckets/", erlang:binary_to_list(populated_bucket()),
"/index/", index_name(index_2i()), "/", erlang:integer_to_list(Num) ]),
Result = ibrowse:send_req(URL, [], get, [], [{response_format, binary}]),
?assertMatch({ok, _, _, _}, Result),
{_, Code, _, Body} = Result,
case Enabled of
true ->
Key = bin_key(Num),
{struct, PList} = mochijson2:decode(
unicode:characters_to_list(Body, utf8)),
Keys = proplists:get_value(<<"keys">>, PList, []),
?assertEqual({"200", [Key]}, {Code, Keys});
false ->
?assertEqual({"403", ?ERRMSG_BIN(Class)}, {Code, Body})
end;
test_request(Node, ?TOKEN_SEC_INDEX_S = Class, Enabled, pbc = ClientType) ->
Lo = rt:random_uniform(num_keys() - 3),
Hi = (Lo + 3),
{_, Mod, Conn} = Client = open_client(ClientType, Node),
{ok, ReqId} = Mod:get_index_range(
Conn, populated_bucket(), index_2i(), Lo, Hi, [{stream, true}]),
% on success result keys are sorted by receive_2i_stream/2
Result = receive_2i_stream(ReqId, []),
close_client(Client),
case Enabled of
true ->
Expect = [bin_key(N) || N <- lists:seq(Lo, Hi)],
?assertEqual({ok, Expect}, Result);
false ->
?assertEqual({error, ?ERRMSG_BIN(Class)}, Result)
end;
test_request(Node, ?TOKEN_SEC_INDEX_S = Class, false, http = Scheme) ->
Num = rt:random_uniform(num_keys()),
URL = make_url(Node, Scheme, [
"/buckets/", erlang:binary_to_list(populated_bucket()),
"/index/", index_name(index_2i()), "/", erlang:integer_to_list(Num),
"?stream=true" ]),
Result = ibrowse:send_req(URL, [], get, [], [{response_format, binary}]),
?assertMatch({ok, _, _, _}, Result),
{_, Code, _, Body} = Result,
?assertEqual({"403", ?ERRMSG_BIN(Class)}, {Code, Body});
test_request(Node, ?TOKEN_SEC_INDEX_S, true, http = ClientType) ->
Bucket = populated_bucket(),
Index = index_2i(),
Num = rt:random_uniform(num_keys()),
Key = bin_key(Num),
{_, Mod, Conn} = Client = open_client(ClientType, Node),
Result = Mod:get_index(Conn, Bucket, Index, Num),
close_client(Client),
?assertMatch({ok, {index_results_v1, [Key], _, _}}, Result);
%% This requires that YZ be running and that
%% riakc_pb_socket:create_search_index(Connection, index_yz())
%% (or equivalent) has been successfully called before invoking this test.
%% This module's load_data/1 function DOES NOT do this for you by default.
test_request(Node, ?TOKEN_YZ_SEARCH = Class, Enabled, pbc = ClientType) ->
Index = index_yz(),
Bucket = populated_bucket(),
Num = rt:random_uniform(num_keys()),
Key = bin_key(Num),
Query = <<"_yz_rb:", Bucket/binary, " AND _yz_rk:", Key/binary>>,
{_, Mod, Conn} = Client = open_client(ClientType, Node),
Result = Mod:search(Conn, Index, Query),
close_client(Client),
case Enabled of
true ->
?assertMatch({ok, #search_results{}}, Result);
false ->
?assertEqual({error, ?ERRMSG_BIN(Class)}, Result)
end;
test_request(Node, ?TOKEN_YZ_SEARCH = Class, Enabled, http) ->
Bucket = populated_bucket(),
Num = rt:random_uniform(num_keys()),
Key = bin_key(Num),
URL = make_url(Node, [
"/search/query/", erlang:binary_to_list(index_yz()),
"?wt=json&q=_yz_rb:", erlang:binary_to_list(Bucket),
"%20AND%20_yz_rk:", erlang:binary_to_list(Key) ]),
Result = ibrowse:send_req(URL, [], get),
?assertMatch({ok, _, _, _}, Result),
{_, Code, _, Body} = Result,
case Enabled of
true ->
?assertEqual("200", Code);
false ->
?assertEqual({"403", ?ERRMSG_TXT(Class)}, {Code, Body})
end;
test_request(_Node, ?TOKEN_OLD_SEARCH = Class, Enabled, ClientType) ->
lager:warning(
"riak_search job switch test not implemented,"
" skipping the ~s test.", [test_label(Class, Enabled, ClientType)]),
ok.
%% ===================================================================
%% Internal Support
%% ===================================================================
bin_buckets(0, Result) ->
lists:sort(Result);
bin_buckets(Count, Result) ->
bin_buckets((Count - 1), [bin_bucket(Count) | Result]).
bin_keys(0, Result) ->
lists:sort(Result);
bin_keys(Count, Result) ->
bin_keys((Count - 1), [bin_key(Count) | Result]).
bin_vals(0, Result) ->
lists:sort(Result);
bin_vals(Count, Result) ->
bin_vals((Count - 1), [bin_val(Count) | Result]).
load_data(PBConn, Bucket, [Bucket | Buckets]) ->
Index = index_2i(),
Load = fun({Num, Key, Val}) ->
Obj1 = riakc_obj:new(Bucket, Key, Val),
Meta1 = riakc_obj:get_update_metadata(Obj1),
Meta2 = riakc_obj:set_secondary_index(Meta1, [{Index, [Num]}]),
Obj2 = riakc_obj:update_metadata(Obj1, Meta2),
?assertEqual(ok, riakc_pb_socket:put(PBConn, Obj2))
end,
lists:foreach(Load, lists:zip3(test_nums(), test_keys(), test_vals())),
load_data(PBConn, Bucket, Buckets);
load_data(PBConn, PopBucket, [Bucket | Buckets]) ->
?assertEqual(ok, riakc_pb_socket:put(PBConn,
riakc_obj:new(Bucket, <<"test_key">>, <<"test_value">>))),
load_data(PBConn, PopBucket, Buckets);
load_data(_, _, []) ->
ok.
make_url(#rhc{ip = IP, port = Port, options = Opts}, Parts) ->
case proplists:get_value(is_ssl, Opts) of
true ->
make_url(https, IP, Port, Parts);
_ ->
make_url(http, IP, Port, Parts)
end;
make_url(Node, Parts) ->
make_url(Node, http, Parts).
make_url(Node, Scheme, Parts) ->
% seems to be more reliable than calling rt:get_https_conn_info directly
#rhc{ip = IP, port = Port} = rt:httpc(Node),
make_url(Scheme, IP, Port, Parts).
make_url(Scheme, Host, Port, Parts) ->
lists:flatten([io_lib:format("~s://~s:~b", [Scheme, Host, Port]), Parts]).
receive_2i_stream(ReqId, Result) ->
receive
{ReqId, {done, _}} ->
{ok, lists:sort(lists:flatten(Result))};
{ReqId, {error, Reason}} ->
{error, Reason};
{ReqId, {index_stream_result_v1, [Val], _}} ->
receive_2i_stream(ReqId, [Val | Result]);
% sent once before 'done'
{ReqId, {index_stream_result_v1, [], _}} ->
receive_2i_stream(ReqId, Result);
% not clear if it can send more than one
{ReqId, {index_stream_result_v1, Vals, _}} when erlang:is_list(Vals) ->
receive_2i_stream(ReqId, Vals ++ Result)
end.

View File

@ -0,0 +1,63 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2016 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-ifndef(job_enable_common_included).
-define(job_enable_common_included, true).
-define(APP_CONFIG_KEY, 'job_accept_class').
-define(CUTTLEFISH_PREFIX, "cluster.job").
-define(CUTTLEFISH_KEY(App, Op),
io_lib:format(?CUTTLEFISH_PREFIX ".~s.~s", [App, Op])).
-define(TOKEN_LIST_BUCKETS, {riak_kv, list_buckets}).
-define(TOKEN_LIST_BUCKETS_S, {riak_kv, stream_list_buckets}).
-define(TOKEN_LIST_KEYS, {riak_kv, list_keys}).
-define(TOKEN_LIST_KEYS_S, {riak_kv, stream_list_keys}).
-define(TOKEN_MAP_REDUCE, {riak_kv, map_reduce}).
-define(TOKEN_MAP_REDUCE_JS, {riak_kv, map_reduce_js}).
-define(TOKEN_SEC_INDEX, {riak_kv, secondary_index}).
-define(TOKEN_SEC_INDEX_S, {riak_kv, stream_secondary_index}).
-define(TOKEN_OLD_SEARCH, {riak_search, query}).
-define(TOKEN_YZ_SEARCH, {yokozuna, query}).
% Defaults for Riak 2.2
% 'true' == 'enabled', 'false' == 'disabled'
-define(JOB_CLASS_DEFAULTS, [
{?TOKEN_LIST_BUCKETS, true},
{?TOKEN_LIST_BUCKETS_S, true},
{?TOKEN_LIST_KEYS, true},
{?TOKEN_LIST_KEYS_S, true},
{?TOKEN_MAP_REDUCE, true},
{?TOKEN_MAP_REDUCE_JS, true},
{?TOKEN_SEC_INDEX, true},
{?TOKEN_SEC_INDEX_S, true},
{?TOKEN_OLD_SEARCH, true},
{?TOKEN_YZ_SEARCH, true}
]).
-define(COMMON_CONFIG, [
{"storage_backend", "leveldb"}, % required by ?TOKEN_SEC_INDEX
{"search", "on"} % required by ?TOKEN_YZ_SEARCH
]).
-define(ERRMSG_BIN(Tok), riak_core_util:job_class_disabled_message(binary, Tok)).
-define(ERRMSG_TXT(Tok), riak_core_util:job_class_disabled_message(text, Tok)).
-endif. % job_enable_common_included

View File

@ -0,0 +1,61 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2016 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(kv_vnode_requests_upgrade_downgrade).
-include_lib("eunit/include/eunit.hrl").
-export([confirm/0]).
-define(NUM_KEYS, 100).
-define(BUCKET, <<"ale">>).
-define(CLUSTER_SIZE, 5).
-define(CONFIG, []).
confirm() ->
Cluster = [Node| _ ] = rt:build_cluster(lists:duplicate(?CLUSTER_SIZE, {lts, ?CONFIG})),
Clients = [rt:pbc(N) || N <- Cluster],
lager:info("Writing ~p keys", [?NUM_KEYS]),
rt:systest_write(Node, ?NUM_KEYS, ?BUCKET),
Before = count_keys(Clients, ?BUCKET),
ExpectedCounts = lists:duplicate(?CLUSTER_SIZE, ?NUM_KEYS),
?assertEqual(Before, ExpectedCounts),
perform_upgrade(Cluster, current, 3),
After = count_keys(Clients, ?BUCKET),
?assertEqual(Before, After),
pass.
count_keys(Clients, Bucket) when is_list(Clients) ->
[count_keys(Client, Bucket) || Client <- Clients];
count_keys(Client, Bucket) ->
{ok, Keys} = riakc_pb_socket:list_keys(Client, Bucket, 5000),
length(Keys).
perform_upgrade(Node, Version) ->
lager:info("Upgrading node ~p", [Node]),
rt:upgrade(Node, Version),
lager:info("Upgrade finished on node ~p", [Node]),
rt:wait_for_service(Node, riak_kv).
perform_upgrade(Cluster, Version, TakeN) ->
lists:foreach(fun(Node) -> perform_upgrade(Node, Version) end,
lists:sublist(Cluster, TakeN)).

View File

@ -30,13 +30,13 @@
-define(LIST_KEYS_RETRIES, 1000).
-define(GET_RETRIES, 1000).
-define(BUCKET, <<"test">>).
-define(KEY, <<"hotkey">>).
-define(VALUE, <<"overload_test_value">>).
-define(NORMAL_TYPE, <<"normal_type">>).
-define(CONSISTENT_TYPE, <<"consistent_type">>).
-define(WRITE_ONCE_TYPE, <<"write_once_type">>).
-define(NORMAL_BKV, {{?NORMAL_TYPE, ?BUCKET}, ?KEY, <<"test">>}).
-define(CONSISTENT_BKV, {{?CONSISTENT_TYPE, ?BUCKET}, ?KEY, <<"test">>}).
-define(WRITE_ONCE_BKV, {{?WRITE_ONCE_TYPE, ?BUCKET}, ?KEY, <<"test">>}).
-define(NORMAL_BUCKET, {?NORMAL_TYPE, ?BUCKET}).
-define(CONSISTENT_BUCKET, {?CONSISTENT_TYPE, ?BUCKET}).
-define(WRITE_ONCE_BUCKET, {?WRITE_ONCE_TYPE, ?BUCKET}).
%% This record contains the default values for config settings if they were not set
%% in the advanced.config file - because setting something to `undefined` is not the same
@ -81,18 +81,22 @@ default_config(#config{
{riak_api, [{pb_backlog, 1024}]}].
confirm() ->
Nodes = setup(),
[Node1 | _] = Nodes = setup(),
ok = create_bucket_type(Nodes, ?NORMAL_TYPE, [{n_val, 3}]),
ok = create_bucket_type(Nodes, ?CONSISTENT_TYPE, [{consistent, true}, {n_val, 5}]),
ok = create_bucket_type(Nodes, ?WRITE_ONCE_TYPE, [{write_once, true}, {n_val, 1}]),
rt:wait_until(ring_manager_check_fun(hd(Nodes))),
Key = generate_key(),
lager:info("Generated overload test key ~p", [Key]),
Node1 = hd(Nodes),
write_once(Node1, ?NORMAL_BKV),
write_once(Node1, ?CONSISTENT_BKV),
write_once(Node1, ?WRITE_ONCE_BKV),
NormalBKV = {?NORMAL_BUCKET, Key, ?VALUE},
ConsistentBKV = {?CONSISTENT_BUCKET, Key, ?VALUE},
WriteOnceBKV = {?WRITE_ONCE_BUCKET, Key, ?VALUE},
write_once(Node1, NormalBKV),
write_once(Node1, ConsistentBKV),
write_once(Node1, WriteOnceBKV),
Tests = [test_no_overload_protection,
test_vnode_protection,
@ -102,33 +106,41 @@ confirm() ->
lager:info("Starting Test ~p for ~p~n", [Test, BKV]),
ok = erlang:apply(?MODULE, Test, [Nodes, BKV])
end || Test <- Tests,
BKV <- [?NORMAL_BKV,
?CONSISTENT_BKV,
?WRITE_ONCE_BKV]],
BKV <- [NormalBKV,
ConsistentBKV,
WriteOnceBKV]],
%% Test cover queries doesn't depend on bucket/keyvalue, just run it once
test_cover_queries_overload(Nodes),
pass.
generate_key() ->
random:seed(erlang:now()),
N = random:uniform(500),
Part1 = <<"overload_test_key_">>,
Part2 = integer_to_binary(N),
<<Part1/binary, Part2/binary>>.
setup() ->
ensemble_util:build_cluster(5, default_config(), 5).
test_no_overload_protection(_Nodes, ?CONSISTENT_BKV) ->
test_no_overload_protection(_Nodes, {?CONSISTENT_BUCKET, _, _}) ->
ok;
test_no_overload_protection(Nodes, BKV) ->
lager:info("Setting default configuration for no overload protestion test."),
lager:info("Setting default configuration for no overload protection test."),
rt:pmap(fun(Node) ->
rt:update_app_config(Node, default_config())
end, Nodes),
lager:info("Testing with no overload protection"),
ProcFun = build_predicate_eq(test_no_overload_protection, ?NUM_REQUESTS,
"ProcFun", "Procs"),
QueueFun = build_predicate_gte(test_no_overload_protection, ?NUM_REQUESTS,
"QueueFun", "Queue Size"),
QueueFun = build_predicate_eq(test_no_overload_protection, ?NUM_REQUESTS,
"QueueFun", "Queue Size"),
verify_test_results(run_test(Nodes, BKV), BKV, ProcFun, QueueFun).
verify_test_results({_NumProcs, QueueLen}, ?CONSISTENT_BKV, _ProcFun, QueueFun) ->
verify_test_results({_NumProcs, QueueLen}, {?CONSISTENT_BUCKET, _, _}, _ProcFun, QueueFun) ->
?assert(QueueFun(QueueLen));
verify_test_results({NumProcs, QueueLen}, _BKV, ProcFun, QueueFun) ->
?assert(ProcFun(NumProcs)),
@ -171,11 +183,11 @@ test_vnode_protection(Nodes, BKV) ->
%% Don't check consistent gets, as they don't use the FSM
test_fsm_protection(_, ?CONSISTENT_BKV) ->
test_fsm_protection(_, {?CONSISTENT_BUCKET, _, _}) ->
ok;
%% Don't check on fast path either.
test_fsm_protection(_, ?WRITE_ONCE_BKV) ->
test_fsm_protection(_, {?WRITE_ONCE_BUCKET, _, _}) ->
ok;
test_fsm_protection(Nodes, BKV) ->
@ -263,7 +275,7 @@ run_test(Nodes, BKV) ->
timer:sleep(5000),
rt:load_modules_on_nodes([?MODULE], Nodes),
overload_proxy:start_link(),
rt_intercept:add(Node1, {riak_kv_get_fsm, [{{start_link, 4}, count_start_link_4}]}),
rt_intercept:add(Node1, {riak_kv_get_fsm, [{{start, 4}, count_start_4}]}),
Victim = get_victim(Node1, BKV),
lager:info("Suspending vnode ~p/~p",
@ -282,22 +294,22 @@ run_test(Nodes, BKV) ->
NumProcs2 = overload_proxy:get_count(),
lager:info("Final process count on ~p: ~b", [Node1, NumProcs2]),
QueueLen = vnode_queue_len(Victim),
QueueLen = vnode_gets_in_queue(Victim),
lager:info("Final vnode queue length for ~p: ~b",
[Victim, QueueLen]),
resume_vnode(Suspended),
rt:wait_until(fun() ->
vnode_queue_len(Victim) =:= 0
vnode_gets_in_queue(Victim) =:= 0
end),
kill_pids(Reads),
overload_proxy:stop(),
{NumProcs2 - NumProcs1, QueueLen}.
get_victim(ExcludeNode, {Bucket, Key, _}) ->
get_victim(Node, {Bucket, Key, _}) ->
Hash = riak_core_util:chash_std_keyfun({Bucket, Key}),
PL = lists:sublist(riak_core_ring:preflist(Hash, rt:get_ring(ExcludeNode)), 5),
hd([IdxNode || {_, Node}=IdxNode <- PL, Node /= ExcludeNode]).
PL = riak_core_ring:preflist(Hash, rt:get_ring(Node)),
hd(PL).
ring_manager_check_fun(Node) ->
fun() ->
@ -366,7 +378,7 @@ read_until_success(Node) ->
read_until_success(C, 0).
read_until_success(C, Count) ->
case C:get(?BUCKET, ?KEY) of
case C:get(<<"dummy">>, <<"dummy">>) of
{error, mailbox_overload} ->
read_until_success(C, Count+1);
_ ->
@ -532,11 +544,11 @@ resume_vnode(Pid) ->
process_count(Node) ->
rpc:call(Node, erlang, system_info, [process_count]).
vnode_queue_len({Idx, Node}) ->
vnode_queue_len(Node, Idx).
vnode_gets_in_queue({Idx, Node}) ->
vnode_gets_in_queue(Node, Idx).
vnode_queue_len(Node, Idx) ->
rpc:call(Node, ?MODULE, remote_vnode_queue, [Idx]).
vnode_gets_in_queue(Node, Idx) ->
rpc:call(Node, ?MODULE, remote_vnode_gets_in_queue, [Idx]).
dropped_stat(Node) ->
Stats = rpc:call(Node, riak_core_stat, get_stats, []),
@ -551,12 +563,6 @@ run_count(Node) ->
lager:info("fsm count:~p", [get_num_running_gen_fsm(Node)]),
run_count(Node).
run_queue_len({Idx, Node}) ->
timer:sleep(500),
Len = vnode_queue_len(Node, Idx),
lager:info("queue len on ~p is:~p", [Node, Len]),
run_queue_len({Idx, Node}).
get_num_running_gen_fsm(Node) ->
Procs = rpc:call(Node, erlang, processes, []),
ProcInfo = [ rpc:call(Node, erlang, process_info, [P]) || P <- Procs, P /= undefined ],
@ -565,13 +571,27 @@ get_num_running_gen_fsm(Node) ->
FsmList = [ proplists:lookup(riak_kv_get_fsm, Call) || Call <- InitCalls ],
length(proplists:lookup_all(riak_kv_get_fsm, FsmList)).
remote_vnode_queue(Idx) ->
remote_vnode_gets_in_queue(Idx) ->
{ok, Pid} = riak_core_vnode_manager:get_vnode_pid(Idx, riak_kv_vnode),
{messages, AllMessages} = process_info(Pid, messages),
NonPings = lists:filter(fun({'$vnode_proxy_ping', _, _, _}) -> false; (_) -> true end, AllMessages),
Len = length(NonPings),
lager:info("Non-Ping Messages (~p): ~n~p~n", [Len, NonPings]),
Len.
GetMessages = lists:foldl(fun(E, A) ->
case is_get_req(E) of
true -> A + 1;
false -> A
end
end, 0, AllMessages),
lager:info("Get requests (~p): ~p", [Idx, GetMessages]),
GetMessages.
%% This is not the greatest thing ever, since we're coupling this test pretty
%% tightly to the internal representation of get requests in riak_kv...can't
%% really figure out a better way to do this, though.
is_get_req({'$gen_event', {riak_vnode_req_v1, _, _, Req}}) ->
element(1, Req) =:= riak_kv_get_req_v1;
is_get_req(_) ->
false.
%% In tests that do not expect work to be shed, we want to confirm that
%% at least ?NUM_REQUESTS (queue entries) are handled.

View File

@ -128,7 +128,14 @@ kill_repair_verify({Partition, Node}, DataSuffix, Service) ->
[Partition, VNodeName]),
?assert(rpc:call(Node, erlang, exit, [Pid, kill_for_test])),
rt:wait_until(Node, fun(N) -> not(rpc:call(N, erlang, is_process_alive, [Pid])) end),
%% We used to wait for the old pid to die here, but there is a delay between
%% the vnode process dying and a new one being registered with the vnode
%% manager. If we don't wait for the manager to return a new vnode pid, it's
%% possible for the test to fail with a gen_server:call timeout.
rt:wait_until(fun() -> {ok, Pid} =/=
rpc:call(Node, riak_core_vnode_manager, get_vnode_pid,
[Partition, VNodeName])
end),
lager:info("Verify data is missing"),
?assertEqual(0, count_data(Service, {Partition, Node})),

View File

@ -58,7 +58,8 @@ confirm() ->
{certfile, filename:join([CertDir,"site3.basho.com/cert.pem"])},
{keyfile, filename:join([CertDir, "site3.basho.com/key.pem"])},
{cacertfile, filename:join([CertDir, "site3.basho.com/cacerts.pem"])}
]}
]},
{job_accept_class, undefined}
]},
{riak_search, [
{enabled, true}

View File

@ -542,7 +542,7 @@ validate_intercepted_fullsync(InterceptTarget,
%% not_built to defer fullsync process.
validate_intercepted_fullsync(InterceptTarget,
{riak_kv_index_hashtree,
[{{get_lock, 2}, not_built}]},
[{{get_lock, 4}, not_built}]},
ReplicationLeader,
ReplicationCluster,
NumIndicies),
@ -551,7 +551,16 @@ validate_intercepted_fullsync(InterceptTarget,
%% already_locked to defer fullsync process.
validate_intercepted_fullsync(InterceptTarget,
{riak_kv_index_hashtree,
[{{get_lock, 2}, already_locked}]},
[{{get_lock, 4}, already_locked}]},
ReplicationLeader,
ReplicationCluster,
NumIndicies),
%% Before enabling fullsync, ensure trees on one source node return
%% bad_version to defer fullsync process.
validate_intercepted_fullsync(InterceptTarget,
{riak_kv_index_hashtree,
[{{get_lock, 4}, bad_version}]},
ReplicationLeader,
ReplicationCluster,
NumIndicies),

View File

@ -74,7 +74,7 @@ confirm() ->
ok = lists:foreach(fun(Node) ->
lager:info("Upgrade node: ~p", [Node]),
rt:log_to_nodes(Nodes, "Upgrade node: ~p", [Node]),
rtdev:upgrade(Node, current),
rt:upgrade(Node, current),
%% The upgrade did a wait for pingable
rt:wait_for_service(Node, [riak_kv, riak_pipe, riak_repl]),
[rt:wait_until_ring_converged(N) || N <- [ANodes, BNodes]],

View File

@ -34,7 +34,15 @@
[
{anti_entropy, {on, []}},
{anti_entropy_build_limit, {100, 1000}},
{anti_entropy_concurrency, 100}
{anti_entropy_concurrency, 100},
%% In mixed clusters, don't allow the object has version
%% to upgrade from `legacy` as the replication will no
%% no longer be able to complete
{override_capability,
[{object_hash_version,
[{use, legacy},
{prefer, legacy}]
}]}
]
},
{riak_repl,
@ -182,6 +190,7 @@ verify_replication(AVersion, BVersion, Start, End, Realtime) ->
configure_clusters(AVersion, BVersion, Realtime) ->
rt:set_advanced_conf(all, ?CONF(infinity)),
rt:copy_conf(6, previous, current),
Nodes = [ANodes, BNodes] = rt:build_clusters([3, 3]),
rt:wait_for_cluster_service(ANodes, riak_repl),

View File

@ -66,7 +66,7 @@ confirm() ->
ok = lists:foreach(fun(Node) ->
lager:info("Upgrade node: ~p", [Node]),
rt:log_to_nodes(Nodes, "Upgrade node: ~p", [Node]),
rtdev:upgrade(Node, current),
rt:upgrade(Node, current),
rt:wait_until_pingable(Node),
rt:wait_for_service(Node, [riak_kv, riak_pipe, riak_repl]),
[rt:wait_until_ring_converged(N) || N <- [ANodes, BNodes]],

View File

@ -50,6 +50,11 @@ confirm() ->
%% typed, and custom typed buckets are as expected.
-spec verify_default_bucket_props(node(), binary()) -> ok | no_return().
verify_default_bucket_props(Node, Type) ->
%% Once in a blue moon we'll try to create a bucket type before
%% all capabilities have been registered. Bucket type creation
%% fails if this one isn't present yet.
rt:wait_until_capability(Node, {riak_core,bucket_types}, true),
rt:create_and_activate_bucket_type(Node, Type, [{nonsense, <<"value">>}]),
DefProps = get_props(Node, <<"default">>),

View File

@ -52,15 +52,17 @@ ensure_ack_tests(Nodes) ->
%% verify data is replicated to B
lager:info("Reading 1 key written from ~p", [LeaderB]),
?assertEqual(0, repl_util:wait_for_reads(LeaderB, 1, 1, TestBucket, 2)),
lager:info("Checking unacked on ~p", [LeaderA]),
?assertEqual(ok, rt:wait_until(fun () -> check_unacked(LeaderA) end)).
check_unacked(LeaderA) ->
RTQStatus = rpc:call(LeaderA, riak_repl2_rtq, status, []),
Consumers = proplists:get_value(consumers, RTQStatus),
case proplists:get_value("B", Consumers) of
undefined ->
[];
missing_consumer;
Consumer ->
Unacked = proplists:get_value(unacked, Consumer, 0),
lager:info("unacked: ~p", [Unacked]),
?assertEqual(0, Unacked)
lager:info("unacked: ~p", [Unacked]),
Unacked == 0
end.

217
tests/test_hll.erl Normal file
View File

@ -0,0 +1,217 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2016 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%%% @doc r_t to test hll datatypes across a riak cluster
-module(test_hll).
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
-define(HLL_TYPE1, <<"hlls1">>).
-define(HLL_TYPE2, <<"hlls2">>).
-define(HLL_TYPE3, <<"hlls3">>).
-define(BUCKET1, {?HLL_TYPE1, <<"testbucket1">>}).
-define(BUCKET2, {?HLL_TYPE2, <<"testbucket2">>}).
-define(DEFAULT_P, 14).
-define(SET_P, 16).
-define(BAD_P, 1).
-define(P_SETTING, hll_precision).
-define(KEY, <<"flipit&reverseit">>).
-define(CONFIG,
[
{riak_core,
[{ring_creation_size, 8},
{anti_entropy_build_limit, {100, 1000}},
{anti_entropy_concurrency, 8},
{handoff_concurrency, 16}]
}
]).
confirm() ->
%% Configure cluster.
Nodes = [N1, N2, N3, N4] = rt:build_cluster(4, ?CONFIG),
NodeRand1A = rt:select_random([N1, N2]),
NodeRand1B = rt:select_random([N1, N2]),
NodeRand2A = rt:select_random([N3, N4]),
NodeRand2B = rt:select_random([N3, N4]),
lager:info("Create PB/HTTP Clients from first two nodes, and then"
" the second two nodes, as we'll partition Nodes 1 & 2 from"
" Nodes 3 & 4 later"),
%% Create PB connection.
PBC1 = rt:pbc(NodeRand1A),
PBC2 = rt:pbc(NodeRand2A),
riakc_pb_socket:set_options(PBC1, [queue_if_disconnected]),
riakc_pb_socket:set_options(PBC2, [queue_if_disconnected]),
%% Create HTTPC connection.
HttpC1 = rt:httpc(NodeRand1B),
HttpC2 = rt:httpc(NodeRand2B),
ok = rt:create_activate_and_wait_for_bucket_type(Nodes,
?HLL_TYPE1,
[{datatype, hll},
{?P_SETTING, ?SET_P}]),
ok = rt:create_activate_and_wait_for_bucket_type(Nodes,
?HLL_TYPE2,
[{datatype, hll}]),
lager:info("Create a bucket-type w/ a HLL datatype and a bad HLL precision"
" - This should throw an error"),
?assertError({badmatch, {error, [{hll_precision, _}]}},
rt:create_activate_and_wait_for_bucket_type(Nodes,
?HLL_TYPE3,
[{datatype, hll},
{?P_SETTING,
?BAD_P}])),
pb_tests(PBC1, PBC2, riakc_pb_socket, ?BUCKET1, Nodes),
http_tests(HttpC1, HttpC2, rhc, ?BUCKET2, Nodes),
%% Stop PB connections.
riakc_pb_socket:stop(PBC1),
riakc_pb_socket:stop(PBC2),
pass.
http_tests(C1, C2, CMod, Bucket, Nodes) ->
lager:info("HTTP CLI TESTS: Create new Hll DT"),
add_tests(C1, CMod, Bucket),
HllSet0 = get_hll(C1, CMod, Bucket),
check_precision_and_reduce_test(C1, CMod, Bucket, ?DEFAULT_P, HllSet0),
partition_write_heal(C1, C2, CMod, Bucket, Nodes),
HllSet1 = get_hll(C1, CMod, Bucket),
check_precision_and_reduce_invalid_test(C1, CMod, Bucket, ?DEFAULT_P - 1,
HllSet1),
ok.
pb_tests(C1, C2, CMod, Bucket, Nodes) ->
lager:info("PB CLI TESTS: Create new Hll DT"),
add_tests(C1, CMod, Bucket),
HllSet0 = get_hll(C1, CMod, Bucket),
check_precision_and_reduce_test(C1, CMod, Bucket, ?SET_P, HllSet0),
partition_write_heal(C1, C2, CMod, Bucket, Nodes),
HllSet1 = get_hll(C1, CMod, Bucket),
check_precision_and_reduce_invalid_test(C1, CMod, Bucket, ?SET_P - 1, HllSet1),
ok.
add_tests(C, CMod, Bucket) ->
S0 = riakc_hll:new(),
add_element(C, CMod, Bucket, S0, <<"OH">>),
{ok, S1} = CMod:fetch_type(C, Bucket, ?KEY),
?assertEqual(riakc_hll:value(S1), 1),
add_elements(C, CMod, Bucket, S1, [<<"C">>, <<"A">>, <<"P">>]),
{ok, S2} = CMod:fetch_type(C, Bucket, ?KEY),
?assertEqual(riakc_hll:value(S2), 4),
add_redundant_element(C, CMod, Bucket, S2, <<"OH">>),
{ok, S3} = CMod:fetch_type(C, Bucket, ?KEY),
?assertEqual(riakc_hll:value(S3), 4).
partition_write_heal(C1, C2, CMod, Bucket, Nodes) ->
lager:info("Partition cluster in two to force merge."),
[N1, N2, N3, N4] = Nodes,
PartInfo = rt:partition([N1, N2], [N3, N4]),
try
lager:info("Write to one side of the partition"),
{ok, S0} = CMod:fetch_type(C1, Bucket, ?KEY),
add_element(C1, CMod, Bucket, S0, <<"OH hello there">>),
{ok, S1} = CMod:fetch_type(C1, Bucket, ?KEY),
?assertEqual(riakc_hll:value(S1), 5),
lager:info("Write to the other side of the partition"),
{ok, S2} = CMod:fetch_type(C2, Bucket, ?KEY),
add_element(C2, CMod, Bucket, S2, <<"Riak 1.4.eva">>),
{ok, S3} = CMod:fetch_type(C2, Bucket, ?KEY),
?assertEqual(riakc_hll:value(S3), 5),
lager:info("Heal")
after
ok = rt:heal(PartInfo)
end,
ok = rt:wait_until_no_pending_changes(Nodes),
ok = rt:wait_until_transfers_complete(Nodes),
lager:info("Once healed, check both sides for the correct, merged value"),
{ok, S4} = CMod:fetch_type(C1, Bucket, ?KEY),
?assertEqual(riakc_hll:value(S4), 6),
{ok, S5} = CMod:fetch_type(C2, Bucket, ?KEY),
?assertEqual(riakc_hll:value(S5), 6).
get_hll(C, CMod, Bucket) ->
{ok, Obj} =CMod:get(C, Bucket, ?KEY),
{ok, CRDT} = riak_kv_crdt:from_binary(riakc_obj:get_value(Obj)),
{_, _, _, HllSet} = CRDT,
HllSet.
add_element(C, CMod, Bucket, S, Elem) ->
lager:info("Add element to HLL DT"),
CMod:update_type(
C, Bucket, ?KEY,
riakc_hll:to_op(
riakc_hll:add_element(Elem, S))).
add_elements(C, CMod, Bucket, S, Elems) ->
lager:info("Add multiple elements to HLL DT"),
CMod:update_type(
C, Bucket, ?KEY,
riakc_hll:to_op(
riakc_hll:add_elements(Elems, S))).
add_redundant_element(C, CMod, Bucket, S, Elem) ->
lager:info("Add redundant element to HLL DT by calling"
" add_element/3 again"),
add_element(C, CMod, Bucket, S, Elem).
check_precision_and_reduce_test(C, CMod, Bucket, ExpP, HllSet) ->
{ok, Props0} = CMod:get_bucket(C, Bucket),
?assertEqual(proplists:get_value(?P_SETTING, Props0), ExpP),
?assertEqual(riak_kv_hll:precision(HllSet), ExpP),
ok = CMod:set_bucket(C, Bucket, [{?P_SETTING, ExpP - 1}]),
{ok, Props1} = CMod:get_bucket(C, Bucket),
?assertEqual(proplists:get_value(?P_SETTING, Props1), ExpP - 1).
check_precision_and_reduce_invalid_test(C, CMod, Bucket, ExpP, HllSet) ->
lager:info("HLL's can be reduced, but never increased.\n"
" Test to make sure we don't allow invalid values."),
?assertEqual(riak_kv_hll:precision(HllSet), ExpP),
{error, _} = CMod:set_bucket(C, Bucket, [{?P_SETTING, ExpP + 1}]),
{ok, Props} = CMod:get_bucket(C, Bucket),
?assertEqual(proplists:get_value(?P_SETTING, Props), ExpP),
?assertEqual(riak_kv_hll:precision(HllSet), ExpP).

View File

@ -161,8 +161,7 @@ check_kill_repair(Node1) ->
lager:info("Repair was forcibly killed");
user_request ->
lager:info("Repair exited gracefully, we should be able to "
"trigger another repair immediately"),
normal = run_2i_repair(Node1)
"trigger another repair immediately")
end,
pass.

View File

@ -43,6 +43,7 @@
% I would hope this would come from the testing framework some day
% to use the test in small and large scenarios.
-define(DEFAULT_RING_SIZE, 8).
-define(AAE_THROTTLE_LIMITS, [{-1, 0}, {100, 10}]).
-define(CFG,
[{riak_kv,
[
@ -51,26 +52,47 @@
{anti_entropy_build_limit, {100, 1000}},
{anti_entropy_concurrency, 100},
{anti_entropy_expire, 24 * 60 * 60 * 1000}, % Not for now!
{anti_entropy_tick, 500}
{anti_entropy_tick, 500},
{aae_throttle_limits, ?AAE_THROTTLE_LIMITS}
]},
{riak_core,
[
{ring_creation_size, ?DEFAULT_RING_SIZE}
]}]
).
-define(NUM_NODES, 1).
-define(NUM_NODES, 3).
-define(NUM_KEYS, 1000).
-define(BUCKET, <<"test_bucket">>).
-define(N_VAL, 3).
confirm() ->
Nodes = rt:build_cluster(?NUM_NODES, ?CFG),
verify_throttle_config(Nodes),
verify_aae(Nodes),
pass.
verify_throttle_config(Nodes) ->
lists:foreach(
fun(Node) ->
?assert(rpc:call(Node,
riak_kv_entropy_manager,
is_aae_throttle_enabled,
[])),
?assertMatch(?AAE_THROTTLE_LIMITS,
rpc:call(Node,
riak_kv_entropy_manager,
get_aae_throttle_limits,
[]))
end,
Nodes).
verify_aae(Nodes) ->
Node1 = hd(Nodes),
% First, recovery without tree rebuilds
% Verify that AAE eventually upgrades to version 0(or already has)
wait_until_hashtree_upgrade(Nodes),
% Recovery without tree rebuilds
% Test recovery from to few replicas written
KV1 = test_data(1, 1000),
@ -83,7 +105,7 @@ verify_aae(Nodes) ->
lager:info("Run similar tests now with tree rebuilds enabled"),
start_tree_rebuilds(Nodes),
% Test recovery from to few replicas written
% Test recovery from too few replicas written
KV3 = test_data(1001, 2000),
test_less_than_n_writes(Node1, KV3),
@ -302,3 +324,26 @@ max_aae_repairs(Node) when is_atom(Node) ->
LastCounts = [Last || {_, _, _, {Last, _, _, _}} <- Info],
MaxCount = lists:max(LastCounts),
MaxCount.
wait_until_hashtree_upgrade(Nodes) ->
lager:info("Verifying AAE hashtrees eventually all upgrade to version 0"),
rt:wait_until(fun() -> all_hashtrees_upgraded(Nodes) end).
all_hashtrees_upgraded(Nodes) when is_list(Nodes) ->
[Check|_] = lists:usort([all_hashtrees_upgraded(Node) || Node <- Nodes]),
Check;
all_hashtrees_upgraded(Node) when is_atom(Node) ->
case rpc:call(Node, riak_kv_entropy_manager, get_version, []) of
0 ->
Trees = rpc:call(Node, riak_kv_entropy_manager, get_trees_version, []),
case [Idx || {Idx, undefined} <- Trees] of
[] ->
true;
_ ->
false
end;
_ ->
false
end.

View File

@ -51,12 +51,10 @@ confirm() ->
%% let the repl flow
repl_power_activate(ClusterA, ClusterB),
AValue = get_counter(hd(ClusterA)),
BValue = get_counter(hd(ClusterB)),
ExpectedValve = AExpected + BExpected,
ExpectedValue = AExpected + BExpected,
?assertEqual(ExpectedValve, AValue),
?assertEqual(ExpectedValve, BValue),
?assertEqual(ok, rt:wait_until(fun() -> verify_counter(ExpectedValue, hd(ClusterA)) end)),
?assertEqual(ok, rt:wait_until(fun() -> verify_counter(ExpectedValue, hd(ClusterB)) end)),
pass.
make_clusters() ->
@ -87,6 +85,9 @@ get_counter({Client, _Node}) ->
{ok, Val} = rhc:counter_val(Client, ?BUCKET, ?KEY),
Val.
verify_counter(ExpectedValue, Node) ->
ExpectedValue == get_counter(Node).
rand_amt() ->
crypto:rand_uniform(-100, 100).

View File

@ -63,8 +63,14 @@ confirm() ->
lager:info("Upgrayded!!"),
?assertEqual(ok, rt:wait_until_ready(Current)),
?assertEqual(ok, rt:wait_until_ready(Previous)),
rt:wait_for_service(Previous, riak_kv),
?assertEqual(ok, rt:wait_until_capability_contains(Current, {riak_kv, crdt}, [riak_dt_pncounter, riak_dt_orswot, riak_dt_map, pncounter])),
rt:wait_for_service(Previous, riak_kv),
?assertEqual(ok, rt:wait_until_capability_contains(Current, {riak_kv, crdt},
[riak_dt_pncounter,
riak_dt_orswot,
riak_dt_map,
pncounter,
riak_kv_hll,
riak_dt_gset])),
?assertMatch(ok, rhc:counter_incr(PrevHttp, ?BUCKET, ?KEY, 1)),
?assertMatch({ok, 5}, rhc:counter_val(PrevHttp, ?BUCKET, ?KEY)),
@ -86,7 +92,7 @@ confirm() ->
?assertEqual(8, begin
{ok, Counter} = riakc_pb_socket:fetch_type(PrevPB1, {<<"default">>, ?BUCKET}, ?KEY),
riakc_counter:value(Counter)
end),
end),
?assertEqual(ok, riakc_pb_socket:update_type(PrevPB1, {<<"default">>, ?BUCKET}, ?KEY, gen_counter_op())),
?assertEqual({ok, 9}, riakc_pb_socket:counter_val(PB, ?BUCKET, ?KEY)),

View File

@ -32,9 +32,13 @@
-define(CTYPE, <<"counters">>).
-define(STYPE, <<"sets">>).
-define(MTYPE, <<"maps">>).
-define(HTYPE, <<"hlls">>).
-define(GSTYPE, <<"gsets">>).
-define(TYPES, [{?CTYPE, counter},
{?STYPE, set},
{?MTYPE, map}]).
{?MTYPE, map},
{?HTYPE, hll},
{?GSTYPE, gset}]).
-define(PB_BUCKET, <<"pbtest">>).
-define(HTTP_BUCKET, <<"httptest">>).
@ -58,17 +62,17 @@ confirm() ->
%% Do some updates to each type
[update_1(Type, ?PB_BUCKET, Client, riakc_pb_socket) ||
{Type, Client} <- lists:zip(?TYPES, [P1, P2, P3])],
{Type, Client} <- lists:zip(?TYPES, [P1, P2, P3, P4, P4])],
[update_1(Type, ?HTTP_BUCKET, Client, rhc) ||
{Type, Client} <- lists:zip(?TYPES, [H1, H2, H3])],
{Type, Client} <- lists:zip(?TYPES, [H1, H2, H3, H4, H3])],
%% Check that the updates are stored
[check_1(Type, ?PB_BUCKET, Client, riakc_pb_socket) ||
{Type, Client} <- lists:zip(?TYPES, [P4, P3, P2])],
{Type, Client} <- lists:zip(?TYPES, [P4, P3, P2, P1, P2])],
[check_1(Type, ?HTTP_BUCKET, Client, rhc) ||
{Type, Client} <- lists:zip(?TYPES, [H4, H3, H2])],
{Type, Client} <- lists:zip(?TYPES, [H4, H3, H2, H1, H4])],
lager:info("Partition cluster in two."),
@ -77,34 +81,34 @@ confirm() ->
lager:info("Modify data on side 1"),
%% Modify one side
[update_2a(Type, ?PB_BUCKET, Client, riakc_pb_socket) ||
{Type, Client} <- lists:zip(?TYPES, [P1, P2, P1])],
{Type, Client} <- lists:zip(?TYPES, [P1, P2, P1, P2, P1])],
[update_2a(Type, ?HTTP_BUCKET, Client, rhc) ||
{Type, Client} <- lists:zip(?TYPES, [H1, H2, H1])],
{Type, Client} <- lists:zip(?TYPES, [H1, H2, H1, H2, H1])],
lager:info("Check data is unmodified on side 2"),
%% check value on one side is different from other
[check_2b(Type, ?PB_BUCKET, Client, riakc_pb_socket) ||
{Type, Client} <- lists:zip(?TYPES, [P4, P3, P4])],
{Type, Client} <- lists:zip(?TYPES, [P4, P3, P4, P3, P4])],
[check_2b(Type, ?HTTP_BUCKET, Client, rhc) ||
{Type, Client} <- lists:zip(?TYPES, [H4, H3, H4])],
{Type, Client} <- lists:zip(?TYPES, [H4, H3, H4, H3, H4])],
lager:info("Modify data on side 2"),
%% Modify other side
[update_3b(Type, ?PB_BUCKET, Client, riakc_pb_socket) ||
{Type, Client} <- lists:zip(?TYPES, [P3, P4, P3])],
{Type, Client} <- lists:zip(?TYPES, [P3, P4, P3, P4, P3])],
[update_3b(Type, ?HTTP_BUCKET, Client, rhc) ||
{Type, Client} <- lists:zip(?TYPES, [H3, H4, H3])],
{Type, Client} <- lists:zip(?TYPES, [H3, H4, H3, H4, H3])],
lager:info("Check data is unmodified on side 1"),
%% verify values differ
[check_3a(Type, ?PB_BUCKET, Client, riakc_pb_socket) ||
{Type, Client} <- lists:zip(?TYPES, [P2, P2, P1])],
{Type, Client} <- lists:zip(?TYPES, [P2, P2, P1, P1, P2])],
[check_3a(Type, ?HTTP_BUCKET, Client, rhc) ||
{Type, Client} <- lists:zip(?TYPES, [H2, H2, H1])],
{Type, Client} <- lists:zip(?TYPES, [H2, H2, H1, H1, H2])],
%% heal
lager:info("Heal and check merged values"),
@ -138,7 +142,7 @@ create_bucket_types([N1|_]=Nodes, Types) ->
[Name, [{datatype, Type}, {allow_mult, true}]]) ||
{Name, Type} <- Types ],
[rt:wait_until(N1, bucket_type_ready_fun(Name)) || {Name, _Type} <- Types],
[ rt:wait_until(N, bucket_type_matches_fun(Types)) || N <- Nodes].
[rt:wait_until(N, bucket_type_matches_fun(Types)) || N <- Nodes].
bucket_type_ready_fun(Name) ->
fun(Node) ->
@ -190,6 +194,20 @@ update_1({BType, map}, Bucket, Client, CMod) ->
riakc_counter:increment(10, C)
end, M1)
end,
{BType, Bucket}, ?KEY, ?MODIFY_OPTS);
update_1({BType, hll}, Bucket, Client, CMod) ->
lager:info("update_1: Updating hyperloglog(set)"),
CMod:modify_type(Client,
fun(S) ->
riakc_hll:add_element(<<"Z">>, S)
end,
{BType, Bucket}, ?KEY, ?MODIFY_OPTS);
update_1({BType, gset}, Bucket, Client, CMod) ->
lager:info("update_1: Updating hyperloglog(set)"),
CMod:modify_type(Client,
fun(S) ->
riakc_gset:add_element(<<"Z">>, S)
end,
{BType, Bucket}, ?KEY, ?MODIFY_OPTS).
check_1({BType, counter}, Bucket, Client, CMod) ->
@ -202,7 +220,13 @@ check_1({BType, map}, Bucket, Client, CMod) ->
lager:info("check_1: Checking map value is correct"),
check_value(Client, CMod, {BType, Bucket}, ?KEY, riakc_map,
[{{<<"followers">>, counter}, 10},
{{<<"friends">>, set}, [<<"Russell">>]}]).
{{<<"friends">>, set}, [<<"Russell">>]}]);
check_1({BType, hll}, Bucket, Client, CMod) ->
lager:info("check_1: Checking hll value is correct"),
check_value(Client,CMod,{BType, Bucket},?KEY,riakc_hll,1);
check_1({BType, gset}, Bucket, Client, CMod) ->
lager:info("check_1: Checking hll value is correct"),
check_value(Client,CMod,{BType, Bucket},?KEY,riakc_gset, [<<"Z">>]).
update_2a({BType, counter}, Bucket, Client, CMod) ->
CMod:modify_type(Client,
@ -231,8 +255,25 @@ update_2a({BType, map}, Bucket, Client, CMod) ->
end,
M1)
end,
{BType, Bucket}, ?KEY, ?MODIFY_OPTS);
update_2a({BType, hll}, Bucket, Client, CMod) ->
CMod:modify_type(Client,
fun(S) ->
riakc_hll:add_element(
<<"DANG">>,
riakc_hll:add_element(<<"Z^2">>, S))
end,
{BType, Bucket}, ?KEY, ?MODIFY_OPTS);
update_2a({BType, gset}, Bucket, Client, CMod) ->
CMod:modify_type(Client,
fun(S) ->
riakc_gset:add_element(
<<"DANG">>,
riakc_gset:add_element(<<"Z^2">>, S))
end,
{BType, Bucket}, ?KEY, ?MODIFY_OPTS).
check_2b({BType, counter}, Bucket, Client, CMod) ->
lager:info("check_2b: Checking counter value is unchanged"),
check_value(Client, CMod, {BType, Bucket}, ?KEY, riakc_counter, 5);
@ -243,7 +284,14 @@ check_2b({BType, map},Bucket,Client,CMod) ->
lager:info("check_2b: Checking map value is unchanged"),
check_value(Client, CMod, {BType, Bucket}, ?KEY, riakc_map,
[{{<<"followers">>, counter}, 10},
{{<<"friends">>, set}, [<<"Russell">>]}]).
{{<<"friends">>, set}, [<<"Russell">>]}]);
check_2b({BType, hll},Bucket,Client,CMod) ->
lager:info("check_2b: Checking hll value is unchanged"),
check_value(Client, CMod, {BType, Bucket}, ?KEY, riakc_hll, 1);
check_2b({BType, gset},Bucket,Client,CMod) ->
lager:info("check_2b: Checking gset value is unchanged"),
check_value(Client, CMod, {BType, Bucket}, ?KEY, riakc_gset, [<<"Z">>]).
update_3b({BType, counter}, Bucket, Client, CMod) ->
CMod:modify_type(Client,
@ -273,8 +321,21 @@ update_3b({BType, map},Bucket,Client,CMod) ->
end,
M1)
end,
{BType, Bucket}, ?KEY, ?MODIFY_OPTS);
update_3b({BType, hll}, Bucket, Client, CMod) ->
CMod:modify_type(Client,
fun(S) ->
riakc_hll:add_element(<<"Zedds Dead">>, S)
end,
{BType, Bucket}, ?KEY, ?MODIFY_OPTS);
update_3b({BType, gset}, Bucket, Client, CMod) ->
CMod:modify_type(Client,
fun(S) ->
riakc_gset:add_element(<<"Zedd's Dead">>, S)
end,
{BType, Bucket}, ?KEY, ?MODIFY_OPTS).
check_3a({BType, counter}, Bucket, Client, CMod) ->
lager:info("check_3a: Checking counter value is unchanged"),
check_value(Client,CMod,{BType, Bucket},?KEY,riakc_counter,-5);
@ -287,7 +348,14 @@ check_3a({BType, map}, Bucket, Client, CMod) ->
check_value(Client, CMod, {BType, Bucket}, ?KEY, riakc_map,
[{{<<"followers">>, counter}, 10},
{{<<"friends">>, set}, [<<"Russell">>, <<"Sam">>]},
{{<<"verified">>, flag}, true}]).
{{<<"verified">>, flag}, true}]);
check_3a({BType, hll}, Bucket, Client, CMod) ->
lager:info("check_3a: Checking hll value is unchanged"),
check_value(Client,CMod,{BType, Bucket},?KEY,riakc_hll,3);
check_3a({BType, gset}, Bucket, Client, CMod) ->
lager:info("check_3a: Checking gset value is unchanged"),
check_value(Client,CMod,{BType, Bucket},?KEY,riakc_gset, [<<"DANG">>,<<"Z">>,<<"Z^2">>]).
check_4({BType, counter}, Bucket, Client, CMod) ->
lager:info("check_4: Checking final merged value of counter"),
@ -311,8 +379,25 @@ check_4({BType, map}, Bucket, Client, CMod) ->
{{<<"followers">>, counter}, 10},
{{<<"friends">>, set}, [<<"Sam">>]},
{{<<"verified">>, flag}, true}],
[{pr, 3}, {notfound_ok, false}]);
check_4({BType, hll}, Bucket, Client, CMod) ->
lager:info("check_4: Checking final merged value of hll"),
check_value(Client,
CMod, {BType, Bucket},
?KEY,
riakc_hll,
4,
[{pr, 3}, {notfound_ok, false}]);
check_4({BType, gset}, Bucket, Client, CMod) ->
lager:info("check_4: Checking final merged value of sset"),
check_value(Client,
CMod, {BType, Bucket},
?KEY,
riakc_gset,
[<<"DANG">>,<<"Z">>,<<"Z^2">>,<<"Zedd's Dead">>],
[{pr, 3}, {notfound_ok, false}]).
check_value(Client, CMod, Bucket, Key, DTMod, Expected) ->
check_value(Client,CMod,Bucket,Key,DTMod,Expected,
[{r,2}, {notfound_ok, true}, {timeout, 5000}]).
@ -322,7 +407,8 @@ check_value(Client, CMod, Bucket, Key, DTMod, Expected, Options) ->
try
Result = CMod:fetch_type(Client, Bucket, Key,
Options),
lager:info("Expected ~p~n got ~p~n", [Expected, Result]),
lager:info("Expected ~p~n got ~p~n", [Expected,
Result]),
?assertMatch({ok, _}, Result),
{ok, C} = Result,
?assertEqual(true, DTMod:is_type(C)),

View File

@ -0,0 +1,186 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2016 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%%% @doc r_t to verify changes to serialization/metadata/changes
%% across dt upgrades
-module(verify_dt_data_upgrade).
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
-define(MAP_TYPE, <<"maps">>).
-define(SET_TYPE, <<"sets">>).
-define(SET_CAP, riak_dt_orswot).
-define(BUCKET_M, {?MAP_TYPE, <<"testbucket">>}).
-define(BUCKET_S, {?SET_TYPE, <<"testbucket">>}).
-define(KEY, <<"flipit&reverseit">>).
-define(CONFIG, [
{riak_core, [
{ring_creation_size, 8},
%% shorten cluster convergence
{vnode_parallel_start, 8},
{forced_ownership_handoff, 8},
{handoff_concurrency, 8},
%% increase AAE activity
{anti_entropy_build_limit, {100, 1000}},
{anti_entropy_concurrency, 8}
]}
]).
confirm() ->
TestMetaData = riak_test_runner:metadata(),
OldVsn = proplists:get_value(upgrade_version, TestMetaData, lts),
NumNodes = 4,
Vsns = [{OldVsn, ?CONFIG} || _ <- lists:seq(1, NumNodes)],
Nodes = [Node1,Node2|Nodes34] = rt:build_cluster(Vsns),
rt:wait_until_transfers_complete(Nodes),
%% Create PB connection.
{P1, P2} = lists:split(1, Nodes),
rt:create_activate_and_wait_for_bucket_type(Nodes,
?SET_TYPE,
[{datatype, set}]),
rt:create_activate_and_wait_for_bucket_type(Nodes,
?MAP_TYPE,
[{datatype, map}]),
%% Partition the Cluster into 2 Sides
Partition = rt:partition(P1, P2),
Pid1 = rt:pbc(hd(P1)),
Pid2 = rt:pbc(hd(P2)),
riakc_pb_socket:set_options(Pid1, [queue_if_disconnected, auto_reconnect]),
riakc_pb_socket:set_options(Pid2, [queue_if_disconnected, auto_reconnect]),
lager:info("Writing to Partition 1 Key ~p ", [?KEY]),
make_and_check(Pid1, [a, b]),
%% Upgrade Nodes 3 - 4 only
upgrade(Nodes34, current),
lager:info("Writing to Partition 2 Key W/ Different Set ~p ", [?KEY]),
make_and_check(Pid2, [c, d, e]),
rt:heal(Partition),
rt:wait_until_transfers_complete(Nodes),
lager:info("Compare/Assert fetched values merge/remain as supposed to"
", including binary information"),
FetchSet0 = fetch(Pid1, ?BUCKET_S, ?KEY),
FetchMap0 = fetch(Pid1, ?BUCKET_M, ?KEY),
FetchSet1 = fetch(Pid2, ?BUCKET_S, ?KEY),
FetchMap1 = fetch(Pid2, ?BUCKET_M, ?KEY),
?assertEqual(FetchMap0, FetchMap1),
?assertEqual(FetchSet0, FetchSet1),
verify_dt_converge:check_value(Pid1, riakc_pb_socket,
?BUCKET_S, ?KEY, riakc_set,
[<<"a">>, <<"b">>, <<"c">>, <<"d">>,
<<"e">>]),
%% Upgrade Nodes 1-2
upgrade([Node1, Node2], current),
FetchSet2 = fetch(Pid1, ?BUCKET_S, ?KEY),
FetchMap2 = fetch(Pid1, ?BUCKET_M, ?KEY),
FetchSet3 = fetch(Pid2, ?BUCKET_S, ?KEY),
FetchMap3 = fetch(Pid2, ?BUCKET_M, ?KEY),
?assertEqual(FetchMap2, FetchMap3),
?assertEqual(FetchSet2, FetchSet3),
%% Downgrade All Nodes and Compare
downgrade(Nodes, lts),
%% Create PB connection.
Pid3 = rt:pbc(rt:select_random(Nodes)),
FetchSet4 = fetch(Pid3, ?BUCKET_S, ?KEY),
FetchMap4 = fetch(Pid3, ?BUCKET_M, ?KEY),
?assertEqual(FetchMap0, FetchMap4),
?assertEqual(FetchSet3, FetchSet4),
%% Stop PB connection.
riakc_pb_socket:stop(Pid1),
riakc_pb_socket:stop(Pid2),
riakc_pb_socket:stop(Pid3),
%% Clean cluster.
rt:clean_cluster(Nodes),
pass.
%% private funs
store_set(Client, Bucket, Key, Set) ->
riakc_pb_socket:update_type(Client, Bucket, Key, riakc_set:to_op(Set)).
store_map(Client, Bucket, Key, Map) ->
riakc_pb_socket:update_type(Client, Bucket, Key, riakc_map:to_op(Map)).
fetch(Client, Bucket, Key) ->
{ok, DT} = riakc_pb_socket:fetch_type(Client, Bucket, Key),
DT.
upgrade(Nodes, NewVsn) ->
lager:info("Upgrading to ~s version on Nodes ~p", [NewVsn, Nodes]),
[rt:upgrade(ANode, NewVsn) || ANode <- Nodes],
[rt:wait_for_service(ANode, riak_kv) || ANode <- Nodes],
[rt:wait_until_capability_contains(ANode, {riak_kv, crdt}, [?SET_CAP])
|| ANode <- Nodes].
downgrade(Nodes, OldVsn) ->
lager:info("Downgrading to ~s version on Nodes ~p", [OldVsn, Nodes]),
[rt:upgrade(ANode, OldVsn) || ANode <- Nodes],
[rt:wait_for_service(ANode, riak_kv) || ANode <- Nodes],
[rt:wait_until_capability_contains(ANode, {riak_kv, crdt}, [?SET_CAP])
|| ANode <- Nodes].
make_and_check(Pid, SetItems) ->
Set0 = verify_dt_context:make_set(SetItems),
ok = store_set(Pid, ?BUCKET_S, ?KEY, Set0),
Set1 = verify_dt_context:make_set([mtv, raps]),
Set2 = verify_dt_context:make_set([x, y, z]),
Map0 = verify_dt_context:make_map([{<<"set1">>, Set1}, {<<"set2">>, Set2}]),
ok = store_map(Pid, ?BUCKET_M, ?KEY, Map0),
CheckSet = [atom_to_binary(E, latin1) || E <- SetItems],
verify_dt_converge:check_value(Pid, riakc_pb_socket,
?BUCKET_S, ?KEY, riakc_set,
CheckSet),
verify_dt_converge:check_value(Pid, riakc_pb_socket,
?BUCKET_M, ?KEY, riakc_map,
[{{<<"set1">>, set}, [<<"mtv">>,
<<"raps">>]},
{{<<"set2">>, set}, [<<"x">>, <<"y">>,
<<"z">>]}]).

View File

@ -0,0 +1,168 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2016 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% Verify functionality of async job enable/disable flags in advanced.config.
-module(verify_job_enable_ac).
-behavior(riak_test).
-compile(export_all).
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
-include("job_enable_common.hrl").
%% Start with all job classes disabled - we'll slowly enable
%% and verify all the flags over the course of the test
-define(CFG, [{riak_core, [{?APP_CONFIG_KEY, []}]}]).
-define(ALL_BUCKETS, [<<"2i_test">>, <<"basic_test">>]).
-define(BASIC_TEST_KEYS, [<<"1">>, <<"2">>, <<"3">>]).
-define(JOB_CLASSES,
[Class || {Class, Enabled} <- ?JOB_CLASS_DEFAULTS, Enabled]).
confirm() ->
lager:info("Deploying 1 node"),
rt:set_backend(eleveldb),
[Node] = rt:build_cluster(1, ?CFG),
HttpClient = rt:httpc(Node),
PbClient = rt:pbc(Node),
lager:info("Writing test data via protocol buffers"),
write_test_data(PbClient),
run_tests(HttpClient, [verify_list_buckets_disabled_http,
verify_list_keys_disabled_http,
verify_secondary_index_disabled_http,
verify_mapred_disabled_http]),
run_tests(PbClient, [verify_list_buckets_disabled_pb,
verify_list_keys_disabled_pb,
verify_secondary_index_disabled_pb,
verify_mapred_disabled_pb]),
lager:info("Enabling all job classes"),
ok = rpc:call(Node, application, set_env,
[riak_core, ?APP_CONFIG_KEY, ?JOB_CLASSES]),
run_tests(HttpClient, [verify_list_buckets_enabled_http,
verify_list_keys_enabled_http,
verify_secondary_index_enabled_http,
verify_mapred_enabled_http]),
run_tests(PbClient, [verify_list_buckets_enabled_pb,
verify_list_keys_enabled_pb,
verify_secondary_index_enabled_pb,
verify_mapred_enabled_pb]),
pass.
write_test_data(Client) ->
BasicObjs = make_objs(<<"basic_test">>),
[O1, O2, O3] = make_objs(<<"2i_test">>),
MD1 = riakc_obj:get_update_metadata(O2),
MD2 = riakc_obj:set_secondary_index(MD1, [{{integer_index, "test_idx"}, [42]}]),
O2WithIdx = riakc_obj:update_metadata(O2, MD2),
SecIdxObjs = [O1, O2WithIdx, O3],
[ok = riakc_pb_socket:put(Client, O) || O <- BasicObjs ++ SecIdxObjs].
make_objs(Bucket) ->
[riakc_obj:new(Bucket,
list_to_binary([N + $1]), %% Keys = ["1", "2", "3"]
list_to_binary([N + $A])) %% Vals = ["A", "B", "C"]
|| N <- lists:seq(0, 2)].
run_tests(Client, TestList) ->
lists:foreach(fun(Test) -> run_test(Client, Test) end, TestList).
run_test(Client, Test) ->
lager:info("Running test ~p", [Test]),
?MODULE:Test(Client).
verify_list_buckets_disabled_pb(Client) ->
Expected = {error, ?ERRMSG_BIN(?TOKEN_LIST_BUCKETS_S)},
?assertEqual(Expected, riakc_pb_socket:list_buckets(Client)).
verify_list_keys_disabled_pb(Client) ->
Expected = {error, ?ERRMSG_BIN(?TOKEN_LIST_KEYS_S)},
?assertEqual(Expected, riakc_pb_socket:list_keys(Client, <<"basic_test">>)).
verify_secondary_index_disabled_pb(Client) ->
Expected = {error, ?ERRMSG_BIN(?TOKEN_SEC_INDEX)},
?assertEqual(Expected, riakc_pb_socket:get_index(Client, <<"2i_test">>,
{integer_index, "test_idx"}, 42)).
verify_mapred_disabled_pb(Client) ->
Expected = {error, ?ERRMSG_BIN(?TOKEN_MAP_REDUCE)},
?assertEqual(Expected, riakc_pb_socket:mapred(Client, <<"basic_test">>, [])).
verify_list_buckets_enabled_pb(Client) ->
{ok, Buckets} = riakc_pb_socket:list_buckets(Client),
SortedBuckets = lists:sort(Buckets),
?assertEqual(SortedBuckets, ?ALL_BUCKETS).
verify_list_keys_enabled_pb(Client) ->
{ok, Keys} = riakc_pb_socket:list_keys(Client, <<"basic_test">>),
SortedKeys = lists:sort(Keys),
?assertEqual(SortedKeys, ?BASIC_TEST_KEYS).
verify_secondary_index_enabled_pb(Client) ->
Result = riakc_pb_socket:get_index_eq(Client, <<"2i_test">>, {integer_index, "test_idx"}, 42),
?assertMatch({ok, {index_results_v1, [<<"2">>], _, _}}, Result).
verify_mapred_enabled_pb(Client) ->
{ok, [{_, Results}]} = riakc_pb_socket:mapred(Client, <<"basic_test">>, []),
SortedResults = lists:sort(Results),
Expected = [{<<"basic_test">>, integer_to_binary(K)} || K <- lists:seq(1, 3)],
?assertEqual(Expected, SortedResults).
verify_list_buckets_disabled_http(Client) ->
Result = rhc:list_buckets(Client),
?assertMatch({error, {"403", _}}, Result).
verify_list_keys_disabled_http(Client) ->
Result = rhc:list_keys(Client, <<"basic_test">>),
?assertMatch({error, {"403", _}}, Result).
verify_secondary_index_disabled_http(Client) ->
Result = rhc:get_index(Client, <<"2i_test">>, {integer_index, "test_idx"}, 42),
?assertMatch({error, {"403", _}}, Result).
verify_mapred_disabled_http(Client) ->
Result = rhc:mapred(Client, <<"basic_test">>, []),
?assertMatch({error, {"403", _}}, Result).
verify_list_buckets_enabled_http(Client) ->
{ok, Buckets} = rhc:list_buckets(Client),
SortedBuckets = lists:sort(Buckets),
?assertEqual(SortedBuckets, ?ALL_BUCKETS).
verify_list_keys_enabled_http(Client) ->
{ok, Keys} = rhc:list_keys(Client, <<"basic_test">>),
SortedKeys = lists:sort(Keys),
?assertEqual(SortedKeys, ?BASIC_TEST_KEYS).
verify_secondary_index_enabled_http(Client) ->
Result = rhc:get_index(Client, <<"2i_test">>, {integer_index, "test_idx"}, 42),
?assertMatch({ok, {index_results_v1, [<<"2">>], _, _}}, Result).
verify_mapred_enabled_http(Client) ->
{ok, [{_, Results}]} = rhc:mapred(Client, <<"basic_test">>, []),
SortedResults = lists:sort(Results),
Expected = [[<<"basic_test">>, integer_to_binary(K)] || K <- lists:seq(1, 3)],
?assertEqual(Expected, SortedResults).

View File

@ -0,0 +1,86 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2016 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% Verify functionality of async job enable/disable flags in riak.conf.
%% Toggling flags at runtime is tested in verify_job_enable_ac.
-module(verify_job_enable_rc).
-behavior(riak_test).
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
-include("job_enable_common.hrl").
-define(TEST_ORDER, [true, false]).
-define(TEST_OPS, [
?TOKEN_LIST_BUCKETS,
?TOKEN_LIST_BUCKETS_S,
?TOKEN_LIST_KEYS,
?TOKEN_LIST_KEYS_S,
?TOKEN_MAP_REDUCE,
?TOKEN_MAP_REDUCE_JS,
?TOKEN_SEC_INDEX,
?TOKEN_SEC_INDEX_S,
?TOKEN_OLD_SEARCH,
?TOKEN_YZ_SEARCH
]).
%% ===================================================================
%% Test API
%% ===================================================================
confirm() ->
Configs = [
{current, {cuttlefish,
?COMMON_CONFIG ++ config(?TEST_OPS, Bool, [])}}
% make it a 4 node cluster so things get scattered around
% everything's enabled on the trailing nodes
|| Bool <- ?TEST_ORDER ++ [true, true]],
lager:info("Deploying ~b nodes ...", [erlang:length(Configs)]),
Nodes = rt:deploy_nodes(Configs),
job_enable_common:setup_cluster(Nodes),
job_enable_common:setup_yokozuna(Nodes),
[run_test(Operation, ?TEST_ORDER, Nodes) || Operation <- ?TEST_OPS],
pass.
%% ===================================================================
%% Internal
%% ===================================================================
config([{App, Op} | Operations], Enabled, Result) ->
Key = lists:flatten(?CUTTLEFISH_KEY(App, Op)),
Val = job_enable_common:enabled_string(Enabled),
config(Operations, Enabled, [{Key, Val} | Result]);
config([], _, Result) ->
Result.
run_test(Operation, [Enabled | Switches], [Node | Nodes]) ->
[job_enable_common:test_operation(Node, Operation, Enabled, ClientType)
|| ClientType <- [pbc, http] ],
run_test(Operation, Switches, Nodes);
run_test(_, [], _) ->
ok.

View File

@ -0,0 +1,55 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2016 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(verify_job_switch_defaults).
-behavior(riak_test).
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
-include("job_enable_common.hrl").
%% ===================================================================
%% Test API
%% ===================================================================
confirm() ->
Configs = [{current, {cuttlefish, ?COMMON_CONFIG}}],
lager:info("Deploying ~b nodes ...", [erlang:length(Configs)]),
[Node | _] = Nodes = rt:deploy_nodes(Configs),
job_enable_common:setup_cluster(Nodes),
job_enable_common:setup_yokozuna(Nodes),
[test_job_switch(Node, Class, Enabled)
|| {Class, Enabled} <- ?JOB_CLASS_DEFAULTS],
pass.
%% ===================================================================
%% Internal
%% ===================================================================
test_job_switch(Node, Class, Enabled) ->
lager:info("verifying ~p default ~s",
[Class, job_enable_common:enabled_string(Enabled)]),
?assertEqual(Enabled, job_enable_common:get_enabled(Node, Class)),
?assertEqual(ok, job_enable_common:set_enabled(Node, Class, not Enabled)),
?assertEqual(not Enabled, job_enable_common:get_enabled(Node, Class)).

View File

@ -0,0 +1,102 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2016 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% This test was written in response to a specific Riak bug found
%% in September 2016. Doing a `get` on a datatype should always
%% cause any siblings to be merged prior to returning an object,
%% but a scenario was found that broke that invariant.
%%
%% Normally datatype values should never have dots in their metadata,
%% but the function that removes dots before writing such values
%% was only called when merging two different objects. The very first
%% write of a new datatype would thus include a dot, as there would be
%% no existing object with which to perform a merge. Additionally, the
%% merge functions in `riak_object` depend on the absence of dots to
%% distinguish datatypes from other values, so if siblings were
%% present and one of the siblings was a freshly written datatype,
%% we would not merge the two values and instead return siblings
%% on a subsequent `get`.
%%
%% NB This is not an issue when using the `fetch_type` client functions,
%% which are normally used to retrieve datatypes; those functions
%% cause a merge to be performed regardless of the presence of dots.
%% To reproduce this bug we must use a plain `get` operation.
%%
%% I arbitrarily chose HLLs for this test just because it's the newest
%% datatype and hasn't had a lot of testing yet, but this could just
%% as well be implemented with any of the CRDTs we have available to us.
-module(verify_no_datatype_siblings).
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
-define(HLL_TYPE, <<"hlls">>).
-define(BUCKET, {?HLL_TYPE, <<"testbucket">>}).
-define(KEY, <<"xyzzy">>).
confirm() ->
{N1, N2} = setup(),
PBC1 = rt:pbc(N1),
PBC2 = rt:pbc(N2),
lager:info("Partition cluster in two so we can do conflicting writes"),
PartInfo = rt:partition([N1], [N2]),
write_siblings(PBC1, PBC2),
lager:info("Heal partition"),
?assertEqual(ok, rt:heal(PartInfo)),
verify_no_siblings(PBC1),
pass.
setup() ->
Nodes = [N1, N2] = rt:build_cluster(2),
ok = rt:create_activate_and_wait_for_bucket_type(Nodes, ?HLL_TYPE,
[{datatype, hll}, {hll_precision, 16}]),
{N1, N2}.
write_siblings(PBC1, PBC2) ->
lager:info("Write to one side of the partition"),
?assertEqual(ok, do_write(PBC1, <<"plugh">>)),
lager:info("Write to other side of the partition"),
?assertEqual(ok, do_write(PBC2, <<"y2">>)).
do_write(PBC, Value) ->
NewHLL = riakc_hll:new(),
riakc_pb_socket:update_type(PBC, ?BUCKET, ?KEY,
riakc_hll:to_op(riakc_hll:add_element(Value, NewHLL))).
verify_no_siblings(PBC) ->
{ok, Obj} = do_read(PBC),
lager:info("Got object ~p", [Obj]),
NumSiblings = length(riakc_obj:get_values(Obj)),
?assertEqual(1, NumSiblings).
do_read(PBC) ->
riakc_pb_socket:get(PBC, ?BUCKET, ?KEY).

View File

@ -0,0 +1,49 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2016 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(verify_removed_capability).
-behavior(riak_test).
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
%% Start 3 nodes, create a capability and join them into a cluster
%%
%% Stop one node then restart it again, it joins the cluster without the
%% capability defined.
%%
%% The capability on the two nodes which have not been shut down should be
%% renegotiated to be the default value.
confirm() ->
[Node_A, Node_B, Node_C] = rt:deploy_nodes(3),
Cap_name = {rt, cap_1},
V1 = 1,
V2 = 2,
ok = rpc:call(Node_A, riak_core_capability, register, [Cap_name, [V2,V1], V1, V1]),
ok = rpc:call(Node_B, riak_core_capability, register, [Cap_name, [V2,V1], V1, V1]),
ok = rpc:call(Node_C, riak_core_capability, register, [Cap_name, [V2,V1], V1, V1]),
ok = rt:join_cluster([Node_A,Node_B,Node_C]),
ok = rt:wait_until_ring_converged([Node_A,Node_B,Node_C]),
ok = rt:wait_until_capability(Node_A, Cap_name, V2),
ok = rt:wait_until_capability(Node_B, Cap_name, V2),
ok = rt:wait_until_capability(Node_C, Cap_name, V2),
rt:stop(Node_B),
rt:start(Node_B),
ok = rt:wait_until_capability(Node_A, Cap_name, V1),
ok = rt:wait_until_capability(Node_C, Cap_name, V1),
pass.

View File

@ -34,6 +34,11 @@ confirm() ->
rt:update_app_config(all, [{riak_kv, [{object_format, v1}]}]),
TestMetaData = riak_test_runner:metadata(),
DowngradeVsn = proplists:get_value(upgrade_version, TestMetaData, previous),
%% Use previous version's riak.conf so that when we
%% downgrade we don't crash on unknown config keys:
rt:copy_conf(?N, previous, current),
Nodes = [Node1|_] = rt:build_cluster(?N),
[rt:wait_until_capability(N, {riak_kv, object_format}, v1, v0) || N <- Nodes],

View File

@ -460,6 +460,13 @@ common_stats() ->
<<"goldrush_version">>,
<<"gossip_received">>,
<<"handoff_timeouts">>,
<<"hll_bytes">>,
<<"hll_bytes_mean">>,
<<"hll_bytes_100">>,
<<"hll_bytes_95">>,
<<"hll_bytes_99">>,
<<"hll_bytes_median">>,
<<"hll_bytes_total">>,
<<"ibrowse_version">>,
<<"ignored_gossip_total">>,
<<"index_fsm_active">>,
@ -512,6 +519,21 @@ common_stats() ->
<<"node_get_fsm_counter_time_median">>,
<<"node_get_fsm_errors">>,
<<"node_get_fsm_errors_total">>,
<<"node_get_fsm_hll_objsize_100">>,
<<"node_get_fsm_hll_objsize_95">>,
<<"node_get_fsm_hll_objsize_99">>,
<<"node_get_fsm_hll_objsize_mean">>,
<<"node_get_fsm_hll_objsize_median">>,
<<"node_get_fsm_hll_siblings_100">>,
<<"node_get_fsm_hll_siblings_95">>,
<<"node_get_fsm_hll_siblings_99">>,
<<"node_get_fsm_hll_siblings_mean">>,
<<"node_get_fsm_hll_siblings_median">>,
<<"node_get_fsm_hll_time_100">>,
<<"node_get_fsm_hll_time_95">>,
<<"node_get_fsm_hll_time_99">>,
<<"node_get_fsm_hll_time_mean">>,
<<"node_get_fsm_hll_time_median">>,
<<"node_get_fsm_in_rate">>,
<<"node_get_fsm_map_objsize_100">>,
<<"node_get_fsm_map_objsize_95">>,
@ -565,6 +587,8 @@ common_stats() ->
<<"node_gets">>,
<<"node_gets_counter">>,
<<"node_gets_counter_total">>,
<<"node_gets_hll">>,
<<"node_gets_hll_total">>,
<<"node_gets_map">>,
<<"node_gets_map_total">>,
<<"node_gets_set">>,
@ -577,6 +601,11 @@ common_stats() ->
<<"node_put_fsm_counter_time_99">>,
<<"node_put_fsm_counter_time_mean">>,
<<"node_put_fsm_counter_time_median">>,
<<"node_put_fsm_hll_time_100">>,
<<"node_put_fsm_hll_time_95">>,
<<"node_put_fsm_hll_time_99">>,
<<"node_put_fsm_hll_time_mean">>,
<<"node_put_fsm_hll_time_median">>,
<<"node_put_fsm_in_rate">>,
<<"node_put_fsm_map_time_100">>,
<<"node_put_fsm_map_time_95">>,
@ -600,6 +629,8 @@ common_stats() ->
<<"node_puts">>,
<<"node_puts_counter">>,
<<"node_puts_counter_total">>,
<<"node_puts_hll">>,
<<"node_puts_hll_total">>,
<<"node_puts_map">>,
<<"node_puts_map_total">>,
<<"node_puts_set">>,
@ -613,6 +644,13 @@ common_stats() ->
<<"object_counter_merge_time_mean">>,
<<"object_counter_merge_time_median">>,
<<"object_counter_merge_total">>,
<<"object_hll_merge">>,
<<"object_hll_merge_time_100">>,
<<"object_hll_merge_time_95">>,
<<"object_hll_merge_time_99">>,
<<"object_hll_merge_time_mean">>,
<<"object_hll_merge_time_median">>,
<<"object_hll_merge_total">>,
<<"object_map_merge">>,
<<"object_map_merge_time_100">>,
<<"object_map_merge_time_95">>,
@ -647,7 +685,6 @@ common_stats() ->
<<"poolboy_version">>,
<<"postcommit_fail">>,
<<"precommit_fail">>,
<<"protobuffs_version">>,
<<"public_key_version">>,
<<"read_repairs">>,
<<"read_repairs_counter">>,
@ -656,6 +693,8 @@ common_stats() ->
<<"read_repairs_fallback_notfound_one">>,
<<"read_repairs_fallback_outofdate_count">>,
<<"read_repairs_fallback_outofdate_one">>,
<<"read_repairs_hll">>,
<<"read_repairs_hll_total">>,
<<"read_repairs_map">>,
<<"read_repairs_map_total">>,
<<"read_repairs_primary_notfound_count">>,
@ -703,6 +742,8 @@ common_stats() ->
<<"search_blockedvnode_count">>,
<<"search_blockedvnode_one">>,
<<"search_detected_repairs_count">>,
<<"search_index_bad_entry_count">>,
<<"search_index_bad_entry_one">>,
<<"search_index_error_threshold_blown_count">>,
<<"search_index_error_threshold_blown_one">>,
<<"search_index_error_threshold_failure_count">>,
@ -711,6 +752,8 @@ common_stats() ->
<<"search_index_error_threshold_ok_one">>,
<<"search_index_error_threshold_recovered_count">>,
<<"search_index_error_threshold_recovered_one">>,
<<"search_index_extract_fail_count">>,
<<"search_index_extract_fail_one">>,
<<"search_index_fail_count">>,
<<"search_index_fail_one">>,
<<"search_index_latency_95">>,
@ -804,6 +847,13 @@ common_stats() ->
<<"vnode_get_fsm_time_median">>,
<<"vnode_gets">>,
<<"vnode_gets_total">>,
<<"vnode_hll_update">>,
<<"vnode_hll_update_time_100">>,
<<"vnode_hll_update_time_95">>,
<<"vnode_hll_update_time_99">>,
<<"vnode_hll_update_time_mean">>,
<<"vnode_hll_update_time_median">>,
<<"vnode_hll_update_total">>,
<<"vnode_index_deletes">>,
<<"vnode_index_deletes_postings">>,
<<"vnode_index_deletes_postings_total">>,

View File

@ -22,6 +22,7 @@
-module(verify_search).
-include_lib("eunit/include/eunit.hrl").
-include("tests/job_enable_common.hrl").
-export([confirm/0]).
%% To run in the possibly remote node
@ -34,6 +35,8 @@ confirm() ->
[Node0 | _RestNodes] = Nodes = rt:build_cluster(3, Config),
rt:wait_until_ring_converged(Nodes),
job_enable_common:set_enabled(Nodes, ?TOKEN_OLD_SEARCH, true),
Path = rt_config:get(rt_scratch_dir),
lager:info("Creating scratch dir if necessary at ~s", [Path]),
?assertMatch({0, _}, rt:cmd("mkdir -p " ++ Path)),

View File

@ -1,21 +1,28 @@
-module(yz_crdt).
-export([confirm/0]).
-compile(export_all).
-compile({parse_transform, rt_intercept_pt}).
-include_lib("eunit/include/eunit.hrl").
-define(HARNESS, (rt_config:get(rt_harness))).
-define(INDEX, <<"maps">>).
-define(TYPE, <<"maps">>).
-define(KEY, "Chris Meiklejohn").
-define(KEY, <<"Chris Meiklejohn">>).
-define(BUCKET, {?TYPE, <<"testbucket">>}).
-define(GET(K,L), proplists:get_value(K, L)).
-define(N, 3).
-define(CONF,
[
{riak_core,
[{ring_creation_size, 8}]
},
{riak_kv,
[{delete_mode, keep},
{anti_entropy_build_limit, {100, 1000}},
{anti_entropy_concurrency, 8},
{anti_entropy_tick, 1000}]},
{yokozuna,
[{enabled, true}]
}]).
@ -39,80 +46,519 @@ confirm() ->
rt:create_and_activate_bucket_type(Node,
?TYPE,
[{datatype, map},
{n_val, ?N},
{search_index, ?INDEX}]),
%% Write some sample data.
lager:info("Write some sample data"),
test_sample_data(Pid, Nodes, ?BUCKET, ?KEY, ?INDEX),
lager:info("Search and Validate our CRDT writes/updates."),
ok = rt:wait_until(fun() -> validate_sample_data(Pid, ?KEY, ?INDEX)
end),
lager:info("Test setting the register of a map twice to different values."
" (The # of results should still be 1)"),
test_repeat_sets(Pid, Nodes, ?BUCKET, ?INDEX, ?KEY),
ok = rt:wait_until(fun() -> validate_test_repeat_set(Pid, ?INDEX)
end),
lager:info("FYI: delete_mode is on keep here to make sure YZ handles"
" deletes correctly throughout."),
lager:info("Test varying deletes operations"),
test_and_validate_delete(Pid, Nodes, ?BUCKET, ?INDEX, ?KEY),
lager:info("Test to make sure yz AAE handles deletes/removes correctly"),
test_and_validate_delete_aae(Pid, Nodes, ?BUCKET, ?INDEX),
lager:info("Test to make sure siblings don't exist after partition"),
test_siblings(Nodes, ?BUCKET, ?INDEX),
lager:info("Verify counts and operations after heal + transfers + commits"),
ok = rt:wait_until(fun() -> validate_test_siblings(Pid, ?BUCKET, ?INDEX)
end),
%% Stop PB connection
riakc_pb_socket:stop(Pid),
pass.
test_sample_data(Pid, Cluster, Bucket, Key, Index) ->
Map1 = riakc_map:update(
{<<"name">>, register},
fun(R) ->
riakc_register:set(list_to_binary(?KEY), R)
riakc_register:set(Key, R)
end, riakc_map:new()),
Map2 = riakc_map:update(
{<<"interests">>, set},
fun(S) ->
riakc_set:add_element(<<"thing">>, S) end,
Map1),
ok = riakc_pb_socket:update_type(
Pid,
?BUCKET,
?KEY,
Bucket,
Key,
riakc_map:to_op(Map2)),
yokozuna_rt:commit(Nodes, ?INDEX),
drain_and_commit(Cluster, Index).
%% Perform simple queries, check for register, set fields.
{ok, {search_results, Results1a, _, _}} = riakc_pb_socket:search(
Pid, ?INDEX, <<"name_register:Chris*">>),
lager:info("Search name_register:Chris*: ~p~n", [Results1a]),
?assertEqual(length(Results1a), 1),
?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results1a)),
list_to_binary(?KEY)),
?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results1a)),
<<"thing">>),
%% @doc Test setting the register of a map twice to different values.
%% The # of results should still be 1.
test_repeat_sets(Pid, Cluster, Bucket, Index, Key) ->
{ok, M1} = riakc_pb_socket:fetch_type(Pid, Bucket, Key),
M2 = riakc_map:update(
{<<"update">>, register},
fun(R) ->
riakc_register:set(<<"foo">>, R)
end, M1),
ok = riakc_pb_socket:update_type(
Pid,
Bucket,
Key,
riakc_map:to_op(M2)),
M3 = riakc_map:update(
{<<"update">>, register},
fun(R) ->
riakc_register:set(<<"bar">>, R)
end, M1),
ok = riakc_pb_socket:update_type(
Pid,
Bucket,
Key,
riakc_map:to_op(M3)),
{ok, {search_results, Results2a, _, _}} = riakc_pb_socket:search(
Pid, ?INDEX, <<"interests_set:thing*">>),
lager:info("Search interests_set:thing*: ~p~n", [Results2a]),
?assertEqual(length(Results2a), 1),
?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results2a)),
list_to_binary(?KEY)),
?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results2a)),
<<"thing">>),
drain_and_commit(Cluster, Index).
{ok, {search_results, Results3a, _, _}} = riakc_pb_socket:search(
Pid, ?INDEX, <<"_yz_rb:testbucket">>),
lager:info("Search testbucket: ~p~n", [Results3a]),
?assertEqual(length(Results3a), 1),
?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results3a)),
list_to_binary(?KEY)),
?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results3a)),
<<"thing">>),
%% @doc Tests varying deletes of within a CRDT map and checks for correct counts
%% - Remove registers, remove and add elements within a set
%% - Delete the map (associated w/ a key)
%% - Recreate objects in the map and delete the map again
test_and_validate_delete(Pid, Cluster, Bucket, Index, Key) ->
{ok, M1} = riakc_pb_socket:fetch_type(Pid, Bucket, Key),
%% Redo queries and check if results are equal
{ok, {search_results, Results1b, _, _}} = riakc_pb_socket:search(
Pid, ?INDEX, <<"name_register:Chris*">>),
?assertEqual(number_of_fields(Results1a),
number_of_fields(Results1b)),
lager:info("Remove register from map"),
M2 = riakc_map:erase({<<"name">>, register}, M1),
{ok, {search_results, Results2b, _, _}} = riakc_pb_socket:search(
Pid, ?INDEX, <<"interests_set:thing*">>),
?assertEqual(number_of_fields(Results2a),
number_of_fields(Results2b)),
lager:info("Delete element from set (in map) & Add element to set"),
M3 = riakc_map:update(
{<<"interests">>, set},
fun(S) ->
riakc_set:del_element(<<"thing">>,
riakc_set:add_element(<<"roses">>, S))
end, M2),
{ok, {search_results, Results3b, _, _}} = riakc_pb_socket:search(
Pid, ?INDEX, <<"_yz_rb:testbucket">>),
?assertEqual(number_of_fields(Results3a),
number_of_fields(Results3b)),
ok = riakc_pb_socket:update_type(
Pid,
Bucket,
Key,
riakc_map:to_op(M3)),
%% Stop PB connection.
riakc_pb_socket:stop(Pid),
drain_and_commit(Cluster, Index),
%% Clean cluster.
rt:clean_cluster(Nodes),
lager:info("Search deleted/erased name_register:*"),
search_and_validate_found(Pid, Index, <<"name_register:*">>, 0),
pass.
lager:info("Add another element to set (in map)"),
M4 = riakc_map:update(
{<<"interests">>, set},
fun(S) ->
riakc_set:add_element(<<"pans">>, S)
end, M3),
ok = riakc_pb_socket:update_type(
Pid,
Bucket,
Key,
riakc_map:to_op(M4)),
drain_and_commit(Cluster, Index),
lager:info("Search deleted interests_set:thing*"),
search_and_validate_found(Pid, Index, <<"interests_set:thing*">>, 0),
lager:info("Delete key for map"),
?assertEqual(ok, riakc_pb_socket:delete(Pid, Bucket, Key)),
drain_and_commit(Cluster, Index),
?assertEqual({error, {notfound, map}},
riakc_pb_socket:fetch_type(Pid, Bucket, Key)),
lager:info("Search deleted map: *:*"),
search_and_validate_found(Pid, Index, <<"*:*">>, 0),
lager:info("Recreate object and check counts..."),
lager:info("Set a new register for map"),
M5 = riakc_map:update(
{<<"name">>, register},
fun(R) ->
riakc_register:set(<<"hello">>, R)
end, riakc_map:new()),
ok = riakc_pb_socket:update_type(
Pid,
Bucket,
Key,
riakc_map:to_op(M5)),
drain_and_commit(Cluster, Index),
{ok, M6} = riakc_pb_socket:fetch_type(Pid, Bucket, Key),
Keys = riakc_map:fetch_keys(M6),
?assertEqual(1, length(Keys)),
?assert(riakc_map:is_key({<<"name">>, register}, M6)),
lager:info("Search recreated map: *:*"),
search_and_validate_found(Pid, Index, <<"*:*">>, 1),
lager:info("Delete key for map again"),
?assertEqual(ok, riakc_pb_socket:delete(Pid, Bucket, Key)),
drain_and_commit(Cluster, Index),
?assertEqual({error, {notfound, map}},
riakc_pb_socket:fetch_type(Pid, Bucket, Key)),
lager:info("Search ~p deleted map: *:*", [Key]),
search_and_validate_found(Pid, Index, <<"*:*">>, 0).
%% @doc Tests key/map delete and AAE
%% - Use intercept to trap yz_kv:delete_operation to skip over
%% - Makes sure that yz AAE handles tombstone on expire/exchange
%% - Recreate objects and check
test_and_validate_delete_aae(Pid, Cluster, Bucket, Index) ->
Key1 = <<"ohyokozuna">>,
M1 = riakc_map:update(
{<<"name">>, register},
fun(R) ->
riakc_register:set(<<"jokes are">>, R)
end, riakc_map:new()),
ok = riakc_pb_socket:update_type(
Pid,
Bucket,
Key1,
riakc_map:to_op(M1)),
Key2 = <<"ohriaksearch">>,
M2 = riakc_map:update(
{<<"name">>, register},
fun(R) ->
riakc_register:set(<<"better explained">>, R)
end, riakc_map:new()),
ok = riakc_pb_socket:update_type(
Pid,
Bucket,
Key2,
riakc_map:to_op(M2)),
drain_and_commit(Cluster, Index),
lager:info("Add and load handle_delete_operation intercept"),
[make_intercepts_tab(ANode) || ANode <- Cluster],
[rt_intercept:add(ANode, {yz_solrq_helper, [{{get_ops_for_no_sibling_deletes, 3},
handle_get_ops_for_no_sibling_deletes}]})
|| ANode <- Cluster],
[true = rpc:call(ANode, ets, insert, [intercepts_tab, {del_put, 0}]) ||
ANode <- Cluster],
[rt_intercept:wait_until_loaded(ANode) || ANode <- Cluster],
lager:info("Delete key ~p for map", [Key2]),
?assertEqual(ok, riakc_pb_socket:delete(Pid, Bucket, Key2)),
?assertEqual({error, {notfound, map}},
riakc_pb_socket:fetch_type(Pid, Bucket, Key2)),
drain_and_commit(Cluster, Index),
lager:info("Search all results, expect extra b/c tombstone"
" and we've modified the delete op : *:*"),
search_and_validate_found(Pid, Index, <<"*:*">>, 2),
lager:info("Expire and re-check"),
yokozuna_rt:expire_trees(Cluster),
yokozuna_rt:wait_for_full_exchange_round(Cluster, erlang:now()),
drain_and_commit(Cluster, Index),
lager:info("Search all results, expect removed tombstone b/c AAE"
" should clean it up: *:*"),
search_and_validate_found(Pid, Index, <<"*:*">>, 1),
lager:info("Recreate object and check counts"),
M3 = riakc_map:update(
{<<"name">>, register},
fun(R) ->
riakc_register:set(<<"hello again, is it me you're"
"looking for">>, R)
end, riakc_map:new()),
ok = riakc_pb_socket:update_type(
Pid,
Bucket,
Key2,
riakc_map:to_op(M3)),
drain_and_commit(Cluster, Index),
{ok, M4} = riakc_pb_socket:fetch_type(Pid, Bucket, Key2),
Keys = riakc_map:fetch_keys(M4),
?assertEqual(1, length(Keys)),
?assert(riakc_map:is_key({<<"name">>, register}, M4)),
lager:info("Search recreated map: *:*"),
search_and_validate_found(Pid, Index, <<"*:*">>, 2).
%% @doc Tests sibling handling/merge when there's a partition
%% - Write/remove from separate partitions
%% - Verify counts and that CRDTs have no siblings, vtags,
%% after healing partitions. The CRDT map merges so the search
%% results be consistent.
test_siblings(Cluster, Bucket, Index) ->
Key1 = <<"Movies">>,
Key2 = <<"Games">>,
Set1 = <<"directors">>,
Set2 = <<"characters">>,
%% make siblings
{P1, P2} = lists:split(1, Cluster),
%% Create an object in Partition 1 and siblings in Partition 2
lager:info("Create partition: ~p | ~p", [P1, P2]),
Partition = rt:partition(P1, P2),
%% PB connections for accessing each side
Pid1 = rt:pbc(hd(P1)),
Pid2 = rt:pbc(hd(P2)),
riakc_pb_socket:set_options(Pid1, [queue_if_disconnected, auto_reconnect]),
riakc_pb_socket:set_options(Pid2, [queue_if_disconnected, auto_reconnect]),
%% P1 writes
lager:info("Writing to Partition 1 Set 1: Key ~p | Director ~p",
[Key1, <<"Kubrick">>]),
M1 = riakc_map:update(
{Set1, set},
fun(S) ->
riakc_set:add_element(<<"Kubrick">>, S)
end, riakc_map:new()),
ok = riakc_pb_socket:update_type(
Pid1,
Bucket,
Key1,
riakc_map:to_op(M1)),
lager:info("Writing to Partition 1 Set 1: Key ~p | Director ~p",
[Key1, <<"Demme">>]),
M2 = riakc_map:update(
{Set1, set},
fun(S) ->
riakc_set:add_element(<<"Demme">>, S)
end, riakc_map:new()),
ok = riakc_pb_socket:update_type(
Pid1,
Bucket,
Key1,
riakc_map:to_op(M2)),
%% P2 Siblings
lager:info("Writing to Partition 2 Set 2: Key ~p | Char ~p",
[Key2, <<"Sonic">>]),
M3 = riakc_map:update(
{Set2, set},
fun(S) ->
riakc_set:add_element(<<"Sonic">>, S)
end, riakc_map:new()),
ok = riakc_pb_socket:update_type(
Pid2,
Bucket,
Key2,
riakc_map:to_op(M3)),
lager:info("Delete key from Partition 2: Key ~p", [Key2]),
ok = riakc_pb_socket:delete(Pid2, Bucket, Key2),
lager:info("Writing to Partition 2 Set 2: after delete: Key ~p | Char"
" ~p", [Key2, <<"Crash">>]),
M4 = riakc_map:update(
{Set2, set},
fun(S) ->
riakc_set:add_element(<<"Crash">>, S)
end, riakc_map:new()),
ok = riakc_pb_socket:update_type(
Pid2,
Bucket,
Key2,
riakc_map:to_op(M4)),
lager:info("Writing to Partition 2 Set 2: Key ~p | Char ~p",
[Key2, <<"Mario">>]),
M5 = riakc_map:update(
{Set2, set},
fun(S) ->
riakc_set:add_element(<<"Mario">>, S)
end, riakc_map:new()),
ok = riakc_pb_socket:update_type(
Pid2,
Bucket,
Key2,
riakc_map:to_op(M5)),
lager:info("Writing to Partition 2 Set 1: Key ~p | Director ~p",
[Key1, <<"Klimov">>]),
M6 = riakc_map:update(
{Set1, set},
fun(S) ->
riakc_set:add_element(<<"Klimov">>, S)
end, riakc_map:new()),
ok = riakc_pb_socket:update_type(
Pid2,
Bucket,
Key1,
riakc_map:to_op(M6)),
rt:heal(Partition),
rt:wait_until_transfers_complete(Cluster),
drain_and_commit(Cluster, Index).
validate_sample_data(Pid, Key, Index) ->
try
Thing = <<"thing">>,
{ok, {search_results, Results1a, _, Found1}} = riakc_pb_socket:search(
Pid, Index, <<"name_register:Chris*">>),
?assertEqual(1, Found1),
?assertEqual(?GET(<<"name_register">>, ?GET(Index, Results1a)),
Key),
?assertEqual(?GET(<<"interests_set">>, ?GET(Index, Results1a)),
Thing),
{ok, {search_results, Results2a, _, Found2}} = riakc_pb_socket:search(
Pid, Index, <<"interests_set:thing*">>),
?assertEqual(1, Found2),
?assertEqual(?GET(<<"name_register">>, ?GET(Index, Results2a)),
Key),
?assertEqual(?GET(<<"interests_set">>, ?GET(Index, Results2a)),
Thing),
{ok, {search_results, Results3a, _, Found3}} = riakc_pb_socket:search(
Pid, Index, <<"_yz_rb:testbucket">>),
?assertEqual(1, Found3),
?assertEqual(?GET(<<"name_register">>, ?GET(Index, Results3a)),
Key),
?assertEqual(?GET(<<"interests_set">>, ?GET(Index, Results3a)),
Thing),
%% Redo queries and check if results are equal
{ok, {search_results, Results1b, _, _}} = riakc_pb_socket:search(
Pid, Index, <<"name_register:Chris*">>),
?assertEqual(number_of_fields(Results1a, Index),
number_of_fields(Results1b, Index)),
{ok, {search_results, Results2b, _, _}} = riakc_pb_socket:search(
Pid, Index, <<"interests_set:thing*">>),
?assertEqual(number_of_fields(Results2a, Index),
number_of_fields(Results2b, Index)),
{ok, {search_results, Results3b, _, _}} = riakc_pb_socket:search(
Pid, Index, <<"_yz_rb:testbucket">>),
?assertEqual(number_of_fields(Results3a, Index),
number_of_fields(Results3b, Index)),
true
catch Err:Reason ->
lager:info("Waiting for CRDT search results to converge. Error"
" was ~p.", [{Err, Reason}]),
false
end.
validate_test_repeat_set(Pid, Index) ->
try
{ok, {search_results, _R, _, Found}} = riakc_pb_socket:search(
Pid, Index,
<<"update_register:*">>),
?assertEqual(1, Found),
true
catch Err:Reason ->
lager:info("Waiting for CRDT search results to converge. Error"
" was ~p.", [{Err, Reason}]),
false
end.
validate_test_siblings(Pid, Bucket, Index) ->
try
Key1 = <<"Movies">>,
Key2 = <<"Games">>,
{ok, MF1} = riakc_pb_socket:fetch_type(Pid, Bucket, Key1),
Keys = riakc_map:fetch_keys(MF1),
?assertEqual(1, length(Keys)),
?assert(riakc_map:is_key({<<"directors">>, set}, MF1)),
{ok, {search_results, Results1, _, _}} = riakc_pb_socket:search(
Pid, Index,
<<"directors_set:*">>),
lager:info("Search movies map directors_set:*: ~p~n", [Results1]),
?assertEqual(3, length(proplists:lookup_all(<<"directors_set">>,
?GET(?INDEX, Results1)))),
{ok, MF2} = riakc_pb_socket:fetch_type(Pid, Bucket, Key2),
Keys2 = riakc_map:fetch_keys(MF2),
?assertEqual(1, length(Keys2)),
?assert(riakc_map:is_key({<<"characters">>, set}, MF2)),
{ok, {search_results, Results2, _, _}} = riakc_pb_socket:search(
Pid, Index,
<<"characters_set:*">>),
lager:info("Search games map characters_set:*: ~p~n", [Results2]),
?assertEqual(2, length(proplists:lookup_all(<<"characters_set">>,
?GET(?INDEX, Results2)))),
{ok, {search_results, Results3, _, Found}} = riakc_pb_socket:search(
Pid, Index,
<<"_yz_vtag:*">>),
lager:info("Search vtags in search *:*: ~p~n", [Results3]),
?assertEqual(0, Found),
true
catch Err:Reason ->
lager:info("Waiting for CRDT search results to converge. Error"
" was ~p.", [{Err, Reason}]),
false
end.
%% @private
number_of_fields(Resp) ->
length(?GET(?INDEX, Resp)).
drain_and_commit(Cluster, Index) ->
yokozuna_rt:drain_solrqs(Cluster),
yokozuna_rt:commit(Cluster, Index).
%% @private
number_of_fields(Resp, Index) ->
length(?GET(Index, Resp)).
%% @private
make_intercepts_tab(Node) ->
SupPid = rpc:call(Node, erlang, whereis, [sasl_safe_sup]),
intercepts_tab = rpc:call(Node, ets, new, [intercepts_tab, [named_table,
public, set, {heir, SupPid, {}}]]).
%% @private
search_and_validate_found(Pid, Index, Search, ExpectedCount) ->
ok = rt:wait_until(
fun() ->
try
{ok, {search_results, Results2, _, F}} =
riakc_pb_socket:search(Pid, Index, Search),
?assertEqual(ExpectedCount, F),
true
catch Err:Reason ->
lager:info(
"Waiting for CRDT search results to converge."
" Index: ~p"
" Search: ~p"
" Error: ~p",
[Index, Search, {Err, Reason}]
),
false
end
end).

View File

@ -361,6 +361,9 @@ test_extractor_with_aae_expire(Cluster, Index, Bucket, Packet) ->
riakc_pb_socket:stop(APid).
test_bad_extraction(Cluster) ->
%% Previous test enabled AAE, which makes the number of repairs here not consistent
%% Turn off AAE again just to make the test deterministic.
rpc:multicall(Cluster, riak_kv_entropy_manager, disable, []),
%%
%% register the no-op extractor on all the nodes with a content type
%%
@ -401,7 +404,7 @@ test_bad_extraction(Cluster) ->
%% Verify the stats. There should be one more index failure,
%% but there should be more more "melts" (error threshold failures)
%%
yz_rt:wait_until(
yokozuna_rt:wait_until(
Cluster,
fun(_Node) ->
check_error_stats(Cluster, PreviousFailCount, PreviousErrorThresholdCount)
@ -417,16 +420,14 @@ check_error_stats(Cluster, PreviousFailCount, PreviousErrorThresholdCount) ->
[PreviousFailCount, FailCount,
PreviousErrorThresholdCount, ErrorThresholdCount]
),
%% TODO Cf. TODO in yz_solrq_helper, where we are double-counting
%% bad requests in the index fail stats
PreviousFailCount + 2 * 1 * ?NVAL == FailCount
PreviousFailCount + ?NVAL == FailCount
andalso PreviousErrorThresholdCount == ErrorThresholdCount.
get_error_stats(Cluster) ->
AllStats = [rpc:call(Node, yz_stat, get_stats, []) || Node <- Cluster],
{
lists:sum([get_count([index, fail], count, Stats) || Stats <- AllStats]),
lists:sum([get_count([index, bad_entry], count, Stats) || Stats <- AllStats]),
lists:sum([get_count([search_index_error_threshold_failure_count], value, Stats) || Stats <- AllStats])
}.

View File

@ -122,7 +122,7 @@ dialyzer-run:
| grep -F -f dialyzer.ignore-warnings.tmp -v \
| sed -E 's/^[[:space:]]*[0-9]+[[:space:]]*//' \
| sed -E 's/([]\^:+?|()*.$${}\[])/\\\1/g' \
| sed -E 's/(\\\.erl\\\:)/\1\\d+:/g' \
| sed -E 's/(\\\.erl\\\:)/\1[[:digit:]]+:/g' \
| sed -E 's/^(.*)$$/^[[:space:]]*\1$$/g' \
> dialyzer_unhandled_warnings ; \
rm dialyzer.ignore-warnings.tmp; \