mirror of
https://github.com/valitydev/riak_test.git
synced 2024-11-06 00:25:22 +00:00
Merge remote-tracking branch 'origin/develop-2.2' into bch-merge-with-2.2
This commit is contained in:
commit
97196318cc
@ -1,44 +0,0 @@
|
||||
%% -------------------------------------------------------------------
|
||||
%%
|
||||
%% Copyright (c) 2015 Basho Technologies, Inc.
|
||||
%%
|
||||
%% This file is provided to you under the Apache License,
|
||||
%% Version 2.0 (the "License"); you may not use this file
|
||||
%% except in compliance with the License. You may obtain
|
||||
%% a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing,
|
||||
%% software distributed under the License is distributed on an
|
||||
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
%% KIND, either express or implied. See the License for the
|
||||
%% specific language governing permissions and limitations
|
||||
%% under the License.
|
||||
%%
|
||||
%%-------------------------------------------------------------------
|
||||
|
||||
%% Example from:
|
||||
%% http://docs.basho.com/riak/latest/dev/search/custom-extractors/#An-Example-Custom-Extractor
|
||||
|
||||
-module(yz_noop_extractor_intercepts).
|
||||
-compile(export_all).
|
||||
-include("intercept.hrl").
|
||||
|
||||
extract_httpheader(Value) ->
|
||||
extract_httpheader(Value, []).
|
||||
|
||||
extract_httpheader(Value, _Opts) ->
|
||||
{ok,
|
||||
{http_request,
|
||||
Method,
|
||||
{absoluteURI, http, Host, undefined, Uri},
|
||||
_Version},
|
||||
_Rest} = erlang:decode_packet(http, Value, []),
|
||||
[{method, Method}, {host, list_to_binary(Host)}, {uri, list_to_binary(Uri)}].
|
||||
|
||||
extract_non_unicode_data(Value) ->
|
||||
extract_non_unicode_data(Value, []).
|
||||
|
||||
extract_non_unicode_data(_Value, _Opts) ->
|
||||
[{blob, <<9147374713>>}].
|
@ -1,48 +0,0 @@
|
||||
%% -------------------------------------------------------------------
|
||||
%%
|
||||
%% Copyright (c) 2015 Basho Technologies, Inc.
|
||||
%%
|
||||
%% This file is provided to you under the Apache License,
|
||||
%% Version 2.0 (the "License"); you may not use this file
|
||||
%% except in compliance with the License. You may obtain
|
||||
%% a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing,
|
||||
%% software distributed under the License is distributed on an
|
||||
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
%% KIND, either express or implied. See the License for the
|
||||
%% specific language governing permissions and limitations
|
||||
%% under the License.
|
||||
%%
|
||||
%%-------------------------------------------------------------------
|
||||
-module(yz_solrq_helper_intercepts).
|
||||
-compile(export_all).
|
||||
|
||||
-include("intercept.hrl").
|
||||
|
||||
-define(M, yz_solrq_helper_orig).
|
||||
|
||||
handle_get_ops_for_no_sibling_deletes(LI, P, Obj) ->
|
||||
Lookup = ets:lookup(intercepts_tab, del_put),
|
||||
case Lookup of
|
||||
[] -> original_get_ops_for_no_sibling_deletes(LI, P, Obj);
|
||||
_ ->
|
||||
case proplists:get_value(del_put, Lookup) of
|
||||
0 ->
|
||||
error_logger:info_msg(
|
||||
"Delete operation intercepted for BKey ~p",
|
||||
[{riak_object:bucket(Obj), riak_object:key(Obj)}]),
|
||||
ets:update_counter(intercepts_tab, del_put, 1),
|
||||
[];
|
||||
_ ->
|
||||
original_get_ops_for_no_sibling_deletes(LI, P, Obj)
|
||||
end
|
||||
end.
|
||||
|
||||
original_get_ops_for_no_sibling_deletes(LI, P, Obj) ->
|
||||
error_logger:info_msg(
|
||||
"Delete operation original for BKey ~p",
|
||||
[{riak_object:bucket(Obj), riak_object:key(Obj)}]),
|
||||
?M:get_ops_for_no_sibling_deletes_orig(LI, P, Obj).
|
@ -1,563 +0,0 @@
|
||||
%% -------------------------------------------------------------------
|
||||
%%
|
||||
%% Copyright (c) 2015 Basho Technologies, Inc.
|
||||
%%
|
||||
%% This file is provided to you under the Apache License,
|
||||
%% Version 2.0 (the "License"); you may not use this file
|
||||
%% except in compliance with the License. You may obtain
|
||||
%% a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing,
|
||||
%% software distributed under the License is distributed on an
|
||||
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
%% KIND, either express or implied. See the License for the
|
||||
%% specific language governing permissions and limitations
|
||||
%% under the License.
|
||||
%%
|
||||
%%-------------------------------------------------------------------
|
||||
-module(yokozuna_rt).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include("yokozuna_rt.hrl").
|
||||
|
||||
-export([brutal_kill_remove_index_dirs/3,
|
||||
check_exists/2,
|
||||
clear_trees/1,
|
||||
commit/2,
|
||||
drain_solrqs/1,
|
||||
expire_trees/1,
|
||||
gen_keys/1,
|
||||
host_entries/1,
|
||||
create_indexed_bucket_type/3,
|
||||
create_indexed_bucket_type/4,
|
||||
override_schema/5,
|
||||
remove_index_dirs/3,
|
||||
rolling_upgrade/2,
|
||||
search/4,
|
||||
search/5,
|
||||
search_expect/5,
|
||||
search_expect/6,
|
||||
search_expect/7,
|
||||
assert_search/6,
|
||||
verify_num_found_query/3,
|
||||
wait_for_aae/1,
|
||||
wait_for_full_exchange_round/2,
|
||||
wait_for_index/2,
|
||||
wait_for_schema/2,
|
||||
wait_for_schema/3,
|
||||
wait_until/2,
|
||||
write_data/5,
|
||||
write_data/6,
|
||||
http/4,
|
||||
http/5,
|
||||
http/6]).
|
||||
|
||||
-type host() :: string().
|
||||
-type portnum() :: integer().
|
||||
-type count() :: non_neg_integer().
|
||||
-type json_string() :: atom | string() | binary().
|
||||
-type search_type() :: solr | yokozuna.
|
||||
-type method() :: get | post | head | options | put | delete | trace |
|
||||
mkcol | propfind | proppatch | lock | unlock | move | copy.
|
||||
-type response() :: {ok, string(), [{string(), string()}], string()|binary()} |
|
||||
{error, term()}.
|
||||
-type cluster() :: [node()].
|
||||
|
||||
|
||||
-define(FMT(S, Args), lists:flatten(io_lib:format(S, Args))).
|
||||
-define(SOFTCOMMIT, 1000).
|
||||
|
||||
-spec host_entries(rt:conn_info()) -> [{host(), portnum()}].
|
||||
host_entries(ClusterConnInfo) ->
|
||||
[riak_http(I) || {_,I} <- ClusterConnInfo].
|
||||
|
||||
%% @doc Generate `SeqMax' keys. Yokozuna supports only UTF-8 compatible keys.
|
||||
-spec gen_keys(pos_integer()) -> list().
|
||||
gen_keys(SeqMax) ->
|
||||
[<<N:64/integer>> || N <- lists:seq(1, SeqMax),
|
||||
not lists:any(
|
||||
fun(E) -> E > 127 end,
|
||||
binary_to_list(<<N:64/integer>>))].
|
||||
|
||||
%% @doc Write `Keys' via the PB inteface to a `Bucket' and have them
|
||||
%% searchable in an `Index'.
|
||||
-spec write_data([node()], pid(), index_name(), bucket(), [binary()]) -> ok.
|
||||
write_data(Cluster, Pid, Index, Bucket, Keys) ->
|
||||
riakc_pb_socket:set_options(Pid, [queue_if_disconnected]),
|
||||
|
||||
create_and_set_index(Cluster, Pid, Bucket, Index),
|
||||
yokozuna_rt:commit(Cluster, Index),
|
||||
|
||||
%% Write keys
|
||||
lager:info("Writing ~p keys", [length(Keys)]),
|
||||
[ok = rt:pbc_write(Pid, Bucket, Key, Key, "text/plain") || Key <- Keys],
|
||||
ok.
|
||||
|
||||
-spec write_data([node()], pid(), index_name(), {schema_name(), raw_schema()},
|
||||
bucket(), [binary()]) -> ok.
|
||||
write_data(Cluster, Pid, Index, {SchemaName, SchemaData},
|
||||
Bucket, Keys) ->
|
||||
riakc_pb_socket:set_options(Pid, [queue_if_disconnected]),
|
||||
|
||||
riakc_pb_socket:create_search_schema(Pid, SchemaName, SchemaData),
|
||||
|
||||
create_and_set_index(Cluster, Pid, Bucket, Index, SchemaName),
|
||||
yokozuna_rt:commit(Cluster, Index),
|
||||
|
||||
%% Write keys
|
||||
lager:info("Writing ~p keys", [length(Keys)]),
|
||||
[ok = rt:pbc_write(Pid, Bucket, Key, Key, "text/plain") || Key <- Keys],
|
||||
ok.
|
||||
|
||||
%% @doc Peform a rolling upgrade of the `Cluster' to a different `Version' based
|
||||
%% on current | previous | legacy.
|
||||
-spec rolling_upgrade([node()], current | previous | legacy) -> ok.
|
||||
rolling_upgrade(Cluster, Version) ->
|
||||
rolling_upgrade(Cluster, Version, same, [riak_kv, yokozuna]).
|
||||
-spec rolling_upgrade([node()], current | previous | legacy, [term()] | same, [atom()]) -> ok.
|
||||
rolling_upgrade(Cluster, Version, UpgradeConfig, WaitForServices) when is_list(Cluster) ->
|
||||
lager:info("Perform rolling upgrade on cluster ~p", [Cluster]),
|
||||
[rolling_upgrade(Node, Version, UpgradeConfig, WaitForServices) || Node <- Cluster],
|
||||
ok;
|
||||
rolling_upgrade(Node, Version, UpgradeConfig, WaitForServices) ->
|
||||
rt:upgrade(Node, Version, UpgradeConfig),
|
||||
[rt:wait_for_service(Node, Service) || Service <- WaitForServices],
|
||||
ok.
|
||||
|
||||
%% @doc Use AAE status to verify that exchange has occurred for all
|
||||
%% partitions since the time this function was invoked.
|
||||
-spec wait_for_aae([node()]) -> ok.
|
||||
wait_for_aae(Cluster) ->
|
||||
lager:info("Wait for AAE to migrate/repair indexes"),
|
||||
wait_for_all_trees(Cluster),
|
||||
wait_for_full_exchange_round(Cluster, erlang:now()),
|
||||
ok.
|
||||
|
||||
%% @doc Wait for all AAE trees to be built.
|
||||
-spec wait_for_all_trees([node()]) -> ok.
|
||||
wait_for_all_trees(Cluster) ->
|
||||
F = fun(Node) ->
|
||||
lager:info("Check if all trees built for node ~p", [Node]),
|
||||
Info = rpc:call(Node, yz_kv, compute_tree_info, []),
|
||||
NotBuilt = [X || {_,undefined}=X <- Info],
|
||||
NotBuilt == []
|
||||
end,
|
||||
wait_until(Cluster, F),
|
||||
ok.
|
||||
|
||||
%% @doc Wait for a full exchange round since `Timestamp'. This means
|
||||
%% that all `{Idx,N}' for all partitions must have exchanged after
|
||||
%% `Timestamp'.
|
||||
-spec wait_for_full_exchange_round([node()], os:now()) -> ok.
|
||||
wait_for_full_exchange_round(Cluster, Timestamp) ->
|
||||
lager:info("wait for full AAE exchange round on cluster ~p", [Cluster]),
|
||||
MoreRecent =
|
||||
fun({_Idx, _, undefined, _RepairStats}) ->
|
||||
false;
|
||||
({_Idx, _, AllExchangedTime, _RepairStats}) ->
|
||||
AllExchangedTime > Timestamp
|
||||
end,
|
||||
AllExchanged =
|
||||
fun(Node) ->
|
||||
Exchanges = rpc:call(Node, yz_kv, compute_exchange_info, []),
|
||||
{_Recent, WaitingFor1} = lists:partition(MoreRecent, Exchanges),
|
||||
WaitingFor2 = [element(1,X) || X <- WaitingFor1],
|
||||
lager:info("Still waiting for AAE of ~p ~p", [Node, WaitingFor2]),
|
||||
[] == WaitingFor2
|
||||
end,
|
||||
wait_until(Cluster, AllExchanged),
|
||||
ok.
|
||||
|
||||
%% @doc Wait for index creation. This is to handle *legacy* versions of yokozuna
|
||||
%% in upgrade tests
|
||||
-spec wait_for_index(list(), index_name()) -> ok.
|
||||
wait_for_index(Cluster, Index) ->
|
||||
IsIndexUp =
|
||||
fun(Node) ->
|
||||
lager:info("Waiting for index ~s to be avaiable on node ~p",
|
||||
[Index, Node]),
|
||||
rpc:call(Node, yz_index, exists, [Index])
|
||||
end,
|
||||
wait_until(Cluster, IsIndexUp),
|
||||
ok.
|
||||
|
||||
%% @see wait_for_schema/3
|
||||
wait_for_schema(Cluster, Name) ->
|
||||
wait_for_schema(Cluster, Name, ignore).
|
||||
|
||||
%% @doc Wait for the schema `Name' to be read by all nodes in
|
||||
%% `Cluster' before returning. If `Content' is binary data when
|
||||
%% verify the schema bytes exactly match `Content'.
|
||||
-spec wait_for_schema([node()], schema_name(), ignore | raw_schema()) -> ok.
|
||||
wait_for_schema(Cluster, Name, Content) ->
|
||||
F = fun(Node) ->
|
||||
lager:info("Attempt to read schema ~s from node ~p",
|
||||
[Name, Node]),
|
||||
{Host, Port} = riak_pb(hd(rt:connection_info([Node]))),
|
||||
{ok, PBConn} = riakc_pb_socket:start_link(Host, Port),
|
||||
R = riakc_pb_socket:get_search_schema(PBConn, Name),
|
||||
riakc_pb_socket:stop(PBConn),
|
||||
case R of
|
||||
{ok, PL} ->
|
||||
case Content of
|
||||
ignore ->
|
||||
Name == proplists:get_value(name, PL);
|
||||
_ ->
|
||||
(Name == proplists:get_value(name, PL)) and
|
||||
(Content == proplists:get_value(content, PL))
|
||||
end;
|
||||
_ ->
|
||||
false
|
||||
end
|
||||
end,
|
||||
wait_until(Cluster, F),
|
||||
ok.
|
||||
|
||||
%% @doc Expire YZ trees
|
||||
-spec expire_trees([node()]) -> ok.
|
||||
expire_trees(Cluster) ->
|
||||
lager:info("Expire all trees"),
|
||||
_ = [ok = rpc:call(Node, yz_entropy_mgr, expire_trees, [])
|
||||
|| Node <- Cluster],
|
||||
|
||||
%% The expire is async so just give it a moment
|
||||
timer:sleep(100),
|
||||
ok.
|
||||
|
||||
%% @doc Expire YZ trees
|
||||
-spec clear_trees([node()]) -> ok.
|
||||
clear_trees(Cluster) ->
|
||||
lager:info("Expire all trees"),
|
||||
_ = [ok = rpc:call(Node, yz_entropy_mgr, clear_trees, [])
|
||||
|| Node <- Cluster],
|
||||
ok.
|
||||
|
||||
brutal_kill_remove_index_dirs(Nodes, IndexName, Services) ->
|
||||
IndexDirs = get_index_dirs(IndexName, Nodes),
|
||||
rt:brutal_kill(hd(Nodes)),
|
||||
[rt:stop(ANode) || ANode <- tl(Nodes)],
|
||||
remove_index_dirs2(Nodes, IndexDirs, Services),
|
||||
ok.
|
||||
|
||||
%% @doc Remove index directories, removing the index.
|
||||
-spec remove_index_dirs([node()], index_name(), [atom()]) -> ok.
|
||||
remove_index_dirs(Nodes, IndexName, Services) ->
|
||||
IndexDirs = get_index_dirs(IndexName, Nodes),
|
||||
[rt:stop(ANode) || ANode <- Nodes],
|
||||
remove_index_dirs2(Nodes, IndexDirs, Services),
|
||||
ok.
|
||||
|
||||
remove_index_dirs2(Nodes, IndexDirs, Services) ->
|
||||
lager:info("Remove index dirs: ~p, on nodes: ~p~n",
|
||||
[IndexDirs, Nodes]),
|
||||
[rt:del_dir(binary_to_list(IndexDir)) || IndexDir <- IndexDirs],
|
||||
[start_and_wait(ANode, Services) || ANode <- Nodes].
|
||||
|
||||
get_index_dirs(IndexName, Nodes) ->
|
||||
IndexDirs = [rpc:call(Node, yz_index, index_dir, [IndexName]) ||
|
||||
Node <- Nodes],
|
||||
IndexDirs.
|
||||
|
||||
start_and_wait(Node, WaitForServices) ->
|
||||
rt:start(Node),
|
||||
[rt:wait_for_service(Node, Service) || Service <- WaitForServices].
|
||||
|
||||
%% @doc Check if index/core exists in metadata, disk via yz_index:exists.
|
||||
-spec check_exists([node()], index_name()) -> ok.
|
||||
check_exists(Nodes, IndexName) ->
|
||||
wait_until(Nodes,
|
||||
fun(N) ->
|
||||
rpc:call(N, yz_index, exists, [IndexName])
|
||||
end).
|
||||
|
||||
-spec verify_num_found_query([node()], index_name(), count()) -> ok.
|
||||
verify_num_found_query(Cluster, Index, ExpectedCount) ->
|
||||
F = fun(Node) ->
|
||||
Pid = rt:pbc(Node),
|
||||
{ok, {_, _, _, NumFound}} = riakc_pb_socket:search(Pid, Index, <<"*:*">>),
|
||||
lager:info("Check Count, Expected: ~p | Actual: ~p~n",
|
||||
[ExpectedCount, NumFound]),
|
||||
ExpectedCount =:= NumFound
|
||||
end,
|
||||
wait_until(Cluster, F),
|
||||
ok.
|
||||
|
||||
%% @doc Brought over from yz_rt in the yokozuna repo - FORNOW.
|
||||
-spec search_expect(node()|[node()], index_name(), string(), string(),
|
||||
non_neg_integer()) -> ok.
|
||||
search_expect(NodeOrNodes, Index, Name, Term, Expect) ->
|
||||
search_expect(NodeOrNodes, yokozuna, Index, Name, Term, Expect).
|
||||
|
||||
-spec search_expect(node()|[node()], search_type(), index_name(),
|
||||
string(), string(), [string()], non_neg_integer()) -> ok.
|
||||
search_expect(Nodes, solr, Index, Name0, Term0, Shards, Expect)
|
||||
when is_list(Shards), length(Shards) > 0, is_list(Nodes) ->
|
||||
Name = quote_unicode(Name0),
|
||||
Term = quote_unicode(Term0),
|
||||
Node = rt:select_random(Nodes),
|
||||
{Host, Port} = solr_hp(Node, Nodes),
|
||||
URL = internal_solr_url(Host, Port, Index, Name, Term, Shards),
|
||||
lager:info("Run solr search ~s", [URL]),
|
||||
Opts = [{response_format, binary}],
|
||||
F = fun(_) ->
|
||||
{ok, "200", _, R} = ibrowse:send_req(URL, [], get, [], Opts,
|
||||
?IBROWSE_TIMEOUT),
|
||||
verify_count_http(Expect, R)
|
||||
end,
|
||||
wait_until(Nodes, F);
|
||||
search_expect(Node, solr=Type, Index, Name, Term, Shards, Expect)
|
||||
when is_list(Shards), length(Shards) > 0 ->
|
||||
search_expect([Node], Type, Index, Name, Term, Shards, Expect).
|
||||
|
||||
-spec search_expect(node()|[node()], search_type(), index_name(),
|
||||
string(), string(), non_neg_integer()) -> ok.
|
||||
search_expect(Nodes, solr=Type, Index, Name, Term, Expect) when is_list(Nodes) ->
|
||||
Node = rt:select_random(Nodes),
|
||||
HP = solr_hp(Node, Nodes),
|
||||
|
||||
%% F could actually be returned in a shared fun, but w/ so much arity,
|
||||
%% just using it twice makes sense.
|
||||
F = fun(_) ->
|
||||
{ok, "200", _, R} = search(Type, HP, Index, Name, Term),
|
||||
verify_count_http(Expect, R)
|
||||
end,
|
||||
|
||||
wait_until(Nodes, F);
|
||||
search_expect(Nodes, yokozuna=Type, Index, Name, Term, Expect)
|
||||
when is_list(Nodes) ->
|
||||
HP = hd(host_entries(rt:connection_info(Nodes))),
|
||||
|
||||
F = fun(_) ->
|
||||
{ok, "200", _, R} = search(Type, HP, Index, Name, Term),
|
||||
verify_count_http(Expect, R)
|
||||
end,
|
||||
|
||||
wait_until(Nodes, F);
|
||||
search_expect(Node, Type, Index, Name, Term, Expect) ->
|
||||
search_expect([Node], Type, Index, Name, Term, Expect).
|
||||
|
||||
assert_search(Pid, Cluster, Index, Search, SearchExpect, Params) ->
|
||||
F = fun(_) ->
|
||||
lager:info("Searching ~p and asserting it exists",
|
||||
[SearchExpect]),
|
||||
case riakc_pb_socket:search(Pid, Index, Search, Params) of
|
||||
{ok,{search_results,[{_Index,Fields}], _Score, Found}} ->
|
||||
?assert(lists:member(SearchExpect, Fields)),
|
||||
case Found of
|
||||
1 -> true;
|
||||
0 -> false
|
||||
end;
|
||||
{ok, {search_results, [], _Score, 0}} ->
|
||||
lager:info("Search has not yet yielded data"),
|
||||
false
|
||||
end
|
||||
end,
|
||||
wait_until(Cluster, F).
|
||||
|
||||
search(HP, Index, Name, Term) ->
|
||||
search(yokozuna, HP, Index, Name, Term).
|
||||
|
||||
search(Type, {Host, Port}, Index, Name, Term) when is_integer(Port) ->
|
||||
search(Type, {Host, integer_to_list(Port)}, Index, Name, Term);
|
||||
|
||||
search(Type, {Host, Port}, Index, Name0, Term0) ->
|
||||
Name = quote_unicode(Name0),
|
||||
Term = quote_unicode(Term0),
|
||||
FmtStr = case Type of
|
||||
solr ->
|
||||
"http://~s:~s/internal_solr/~s/select?q=~s:~s&wt=json";
|
||||
yokozuna ->
|
||||
"http://~s:~s/search/query/~s?q=~s:~s&wt=json"
|
||||
end,
|
||||
URL = ?FMT(FmtStr, [Host, Port, Index, Name, Term]),
|
||||
lager:info("Run search ~s", [URL]),
|
||||
Opts = [{response_format, binary}],
|
||||
ibrowse:send_req(URL, [], get, [], Opts, ?IBROWSE_TIMEOUT).
|
||||
|
||||
%%%===================================================================
|
||||
%%% Private
|
||||
%%%===================================================================
|
||||
|
||||
-spec verify_count_http(count(), json_string()) -> boolean().
|
||||
verify_count_http(Expected, Resp) ->
|
||||
Count = get_count_http(Resp),
|
||||
lager:info("Expected: ~p, Actual: ~p", [Expected, Count]),
|
||||
Expected =:= Count.
|
||||
|
||||
-spec get_count_http(json_string()) -> count().
|
||||
get_count_http(Resp) ->
|
||||
Struct = mochijson2:decode(Resp),
|
||||
kvc:path([<<"response">>, <<"numFound">>], Struct).
|
||||
|
||||
-spec riak_http({node(), rt:interfaces()} | rt:interfaces()) ->
|
||||
{host(), portnum()}.
|
||||
riak_http({_Node, ConnInfo}) ->
|
||||
riak_http(ConnInfo);
|
||||
riak_http(ConnInfo) ->
|
||||
proplists:get_value(http, ConnInfo).
|
||||
|
||||
-spec riak_pb({node(), rt:interfaces()} | rt:interfaces()) ->
|
||||
{host(), portnum()}.
|
||||
riak_pb({_Node, ConnInfo}) ->
|
||||
riak_pb(ConnInfo);
|
||||
riak_pb(ConnInfo) ->
|
||||
proplists:get_value(pb, ConnInfo).
|
||||
|
||||
-spec create_and_set_index([node()], pid(), bucket(), index_name()) -> ok.
|
||||
create_and_set_index(Cluster, Pid, Bucket, Index) ->
|
||||
%% Create a search index and associate with a bucket
|
||||
lager:info("Create a search index ~s and associate it with bucket ~s",
|
||||
[Index, Bucket]),
|
||||
_ = riakc_pb_socket:create_search_index(Pid, Index),
|
||||
%% For possible legacy upgrade reasons or general check around the cluster,
|
||||
%% wrap create index in a wait
|
||||
wait_for_index(Cluster, Index),
|
||||
set_index(Pid, hd(Cluster), Bucket, Index).
|
||||
-spec create_and_set_index([node()], pid(), bucket(), index_name(),
|
||||
schema_name()) -> ok.
|
||||
create_and_set_index(Cluster, Pid, Bucket, Index, Schema) ->
|
||||
%% Create a search index and associate with a bucket
|
||||
lager:info("Create a search index ~s with a custom schema named ~s and " ++
|
||||
"associate it with bucket ~p", [Index, Schema, Bucket]),
|
||||
_ = riakc_pb_socket:create_search_index(Pid, Index, Schema, []),
|
||||
%% For possible legacy upgrade reasons or general check around the cluster,
|
||||
%% wrap create index in a wait
|
||||
wait_for_index(Cluster, Index),
|
||||
set_index(Pid, hd(Cluster), Bucket, Index).
|
||||
|
||||
-spec set_index(pid(), node(), bucket(), index_name()) -> ok.
|
||||
set_index(_Pid, Node, {BucketType, _Bucket}, Index) ->
|
||||
lager:info("Create and activate map-based bucket type ~s and tie it to search_index ~s",
|
||||
[BucketType, Index]),
|
||||
rt:create_and_activate_bucket_type(Node, BucketType, [{search_index, Index}]);
|
||||
set_index(Pid, _Node, Bucket, Index) ->
|
||||
ok = riakc_pb_socket:set_search_index(Pid, Bucket, Index).
|
||||
|
||||
internal_solr_url(Host, Port, Index) ->
|
||||
?FMT("http://~s:~B/internal_solr/~s", [Host, Port, Index]).
|
||||
internal_solr_url(Host, Port, Index, Name, Term, Shards) ->
|
||||
Ss = [internal_solr_url(Host, ShardPort, Index)
|
||||
|| {_, ShardPort} <- Shards],
|
||||
?FMT("http://~s:~B/internal_solr/~s/select?wt=json&q=~s:~s&shards=~s",
|
||||
[Host, Port, Index, Name, Term, string:join(Ss, ",")]).
|
||||
|
||||
quote_unicode(Value) ->
|
||||
mochiweb_util:quote_plus(binary_to_list(
|
||||
unicode:characters_to_binary(Value))).
|
||||
|
||||
-spec commit([node()], index_name()) -> ok.
|
||||
commit(Nodes, Index) ->
|
||||
%% Wait for yokozuna index to trigger, then force a commit
|
||||
timer:sleep(?SOFTCOMMIT),
|
||||
lager:info("Commit search writes to ~s at softcommit (default) ~p",
|
||||
[Index, ?SOFTCOMMIT]),
|
||||
rpc:multicall(Nodes, yz_solr, commit, [Index]),
|
||||
ok.
|
||||
|
||||
-spec drain_solrqs(node() | cluster()) -> ok.
|
||||
drain_solrqs(Cluster) when is_list(Cluster) ->
|
||||
[drain_solrqs(Node) || Node <- Cluster];
|
||||
drain_solrqs(Node) ->
|
||||
rpc:call(Node, yz_solrq_drain_mgr, drain, []),
|
||||
ok.
|
||||
|
||||
-spec override_schema(pid(), [node()], index_name(), schema_name(), string()) ->
|
||||
{ok, [node()]}.
|
||||
override_schema(Pid, Cluster, Index, Schema, RawUpdate) ->
|
||||
lager:info("Overwrite schema with updated schema"),
|
||||
ok = riakc_pb_socket:create_search_schema(Pid, Schema, RawUpdate),
|
||||
yokozuna_rt:wait_for_schema(Cluster, Schema, RawUpdate),
|
||||
[Node|_] = Cluster,
|
||||
{ok, _} = rpc:call(Node, yz_index, reload, [Index]).
|
||||
|
||||
%% @doc Wrapper around `rt:wait_until' to verify `F' against multiple
|
||||
%% nodes. The function `F' is passed one of the `Nodes' as
|
||||
%% argument and must return a `boolean()' delcaring whether the
|
||||
%% success condition has been met or not.
|
||||
-spec wait_until([node()], fun((node()) -> boolean())) -> ok.
|
||||
wait_until(Nodes, F) ->
|
||||
[?assertEqual(ok, rt:wait_until(Node, F)) || Node <- Nodes],
|
||||
ok.
|
||||
|
||||
-spec solr_hp(node(), [node()]) -> {host(), portnum()}.
|
||||
solr_hp(Node, Cluster) ->
|
||||
CI = connection_info(Cluster),
|
||||
solr_http(proplists:get_value(Node, CI)).
|
||||
|
||||
-spec connection_info(list()) -> orddict:orddict().
|
||||
connection_info(Cluster) ->
|
||||
CI = orddict:from_list(rt:connection_info(Cluster)),
|
||||
SolrInfo = orddict:from_list([{Node, [{solr_http, get_yz_conn_info(Node)}]}
|
||||
|| Node <- Cluster]),
|
||||
orddict:merge(fun(_,V1,V2) -> V1 ++ V2 end, CI, SolrInfo).
|
||||
|
||||
-spec solr_http({node(), orddict:orddict()}) -> {host(), portnum()}.
|
||||
solr_http({_Node, ConnInfo}) ->
|
||||
solr_http(ConnInfo);
|
||||
solr_http(ConnInfo) ->
|
||||
proplists:get_value(solr_http, ConnInfo).
|
||||
|
||||
-spec get_yz_conn_info(node()) -> {string(), string()}.
|
||||
get_yz_conn_info(Node) ->
|
||||
{ok, SolrPort} = rpc:call(Node, application, get_env, [yokozuna, solr_port]),
|
||||
%% Currently Yokozuna hardcodes listener to all interfaces
|
||||
{"127.0.0.1", SolrPort}.
|
||||
|
||||
-spec http(method(), string(), list(), binary()|[]) -> response().
|
||||
http(Method, URL, Headers, Body) ->
|
||||
Opts = [],
|
||||
ibrowse:send_req(URL, Headers, Method, Body, Opts, ?IBROWSE_TIMEOUT).
|
||||
|
||||
-spec http(method(), string(), list(), binary()|[], list()|timeout())
|
||||
-> response().
|
||||
http(Method, URL, Headers, Body, Opts) when is_list(Opts) ->
|
||||
ibrowse:send_req(URL, Headers, Method, Body, Opts, ?IBROWSE_TIMEOUT);
|
||||
http(Method, URL, Headers, Body, Timeout) when is_integer(Timeout) ->
|
||||
Opts = [],
|
||||
ibrowse:send_req(URL, Headers, Method, Body, Opts, Timeout).
|
||||
|
||||
-spec http(method(), string(), list(), binary()|[], list(), timeout())
|
||||
-> response().
|
||||
http(Method, URL, Headers, Body, Opts, Timeout) when
|
||||
is_list(Opts) andalso is_integer(Timeout) ->
|
||||
ibrowse:send_req(URL, Headers, Method, Body, Opts, Timeout).
|
||||
|
||||
-spec create_indexed_bucket_type(cluster(), binary(), index_name()) -> ok.
|
||||
create_indexed_bucket_type(Cluster, BucketType, IndexName) ->
|
||||
ok = create_index(Cluster, IndexName),
|
||||
ok = create_bucket_type(Cluster, BucketType, [{search_index, IndexName}]).
|
||||
|
||||
-spec create_indexed_bucket_type(cluster(), binary(), index_name(),
|
||||
schema_name()) -> ok.
|
||||
create_indexed_bucket_type(Cluster, BucketType, IndexName, SchemaName) ->
|
||||
ok = create_index(Cluster, IndexName, SchemaName),
|
||||
ok = create_bucket_type(Cluster, BucketType, [{search_index, IndexName}]).
|
||||
|
||||
-spec create_index(cluster(), index_name()) -> ok.
|
||||
create_index(Cluster, Index) ->
|
||||
Node = select_random(Cluster),
|
||||
lager:info("Creating index ~s [~p]", [Index, Node]),
|
||||
rpc:call(Node, yz_index, create, [Index]),
|
||||
ok = wait_for_index(Cluster, Index).
|
||||
|
||||
-spec create_index(cluster(), index_name(), schema_name()) -> ok.
|
||||
create_index(Cluster, Index, SchemaName) ->
|
||||
Node = select_random(Cluster),
|
||||
lager:info("Creating index ~s with schema ~s [~p]",
|
||||
[Index, SchemaName, Node]),
|
||||
rpc:call(Node, yz_index, create, [Index, SchemaName]),
|
||||
ok = wait_for_index(Cluster, Index).
|
||||
|
||||
select_random(List) ->
|
||||
Length = length(List),
|
||||
Idx = random:uniform(Length),
|
||||
lists:nth(Idx, List).
|
||||
|
||||
-spec create_bucket_type(cluster(), binary(), [term()]) -> ok.
|
||||
create_bucket_type(Cluster, BucketType, Props) ->
|
||||
Node = select_random(Cluster),
|
||||
rt:create_and_activate_bucket_type(Node, BucketType, Props),
|
||||
rt:wait_until_bucket_type_status(BucketType, active, Node),
|
||||
rt:wait_until_bucket_type_visible(Cluster, BucketType).
|
@ -1,170 +0,0 @@
|
||||
%% -------------------------------------------------------------------
|
||||
%%
|
||||
%% Copyright (c) 2014 Basho Technologies, Inc.
|
||||
%%
|
||||
%% This file is provided to you under the Apache License,
|
||||
%% Version 2.0 (the "License"); you may not use this file
|
||||
%% except in compliance with the License. You may obtain
|
||||
%% a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing,
|
||||
%% software distributed under the License is distributed on an
|
||||
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
%% KIND, either express or implied. See the License for the
|
||||
%% specific language governing permissions and limitations
|
||||
%% under the License.
|
||||
%%
|
||||
%%-------------------------------------------------------------------
|
||||
-module(yz_core_properties_create_unload).
|
||||
-compile(export_all).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-define(CFG, [{riak_kv,
|
||||
[
|
||||
%% allow AAE to build trees and exchange rapidly
|
||||
{anti_entropy_build_limit, {1000, 1000}},
|
||||
{anti_entropy_concurrency, 64},
|
||||
{anti_entropy_tick, 1000}
|
||||
]},
|
||||
{yokozuna,
|
||||
[
|
||||
{enabled, true},
|
||||
{anti_entropy_tick, 1000}
|
||||
]}]).
|
||||
-define(INDEX, <<"test_idx_core">>).
|
||||
-define(TYPE, <<"data">>).
|
||||
-define(BUCKET, {?TYPE, <<"test_bkt_core">>}).
|
||||
-define(SEQMAX, 100).
|
||||
|
||||
confirm() ->
|
||||
Cluster = rt:build_cluster(4, ?CFG),
|
||||
rt:wait_for_cluster_service(Cluster, yokozuna),
|
||||
|
||||
%% Generate keys, YZ only supports UTF-8 compatible keys
|
||||
Keys = [<<N:64/integer>> || N <- lists:seq(1, ?SEQMAX),
|
||||
not lists:any(fun(E) -> E > 127 end,
|
||||
binary_to_list(<<N:64/integer>>))],
|
||||
KeyCount = length(Keys),
|
||||
|
||||
%% Randomly select a subset of the test nodes to remove
|
||||
%% core.properties from
|
||||
RandNodes = rt:random_sublist(Cluster, 3),
|
||||
|
||||
%% Select one of the modified nodes as a client endpoint
|
||||
Node = rt:select_random(RandNodes),
|
||||
Pid = rt:pbc(Node),
|
||||
riakc_pb_socket:set_options(Pid, [queue_if_disconnected]),
|
||||
|
||||
%% Create a search index and associate with a bucket
|
||||
lager:info("Create and set Index ~p for Bucket ~p~n", [?INDEX, ?BUCKET]),
|
||||
_ = riakc_pb_socket:create_search_index(Pid, ?INDEX),
|
||||
yokozuna_rt:wait_for_index(Cluster, ?INDEX),
|
||||
|
||||
ok = rt:create_and_activate_bucket_type(Node,
|
||||
?TYPE,
|
||||
[{search_index, ?INDEX}]),
|
||||
|
||||
rt:wait_until_bucket_type_visible(Cluster, ?TYPE),
|
||||
|
||||
%% Write keys and wait for soft commit
|
||||
lager:info("Writing ~p keys", [KeyCount]),
|
||||
[ok = rt:pbc_write(Pid, ?BUCKET, Key, Key, "text/plain") || Key <- Keys],
|
||||
yokozuna_rt:commit(Cluster, ?INDEX),
|
||||
|
||||
yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount),
|
||||
|
||||
|
||||
test_core_props_removal(Cluster, RandNodes, KeyCount, Pid),
|
||||
test_remove_index_dirs(Cluster, RandNodes, KeyCount, Pid),
|
||||
test_remove_segment_infos_and_rebuild(Cluster, RandNodes, KeyCount, Pid),
|
||||
test_brutal_kill_and_delete_index_dirs(Cluster, RandNodes, KeyCount, Pid),
|
||||
|
||||
riakc_pb_socket:stop(Pid),
|
||||
|
||||
pass.
|
||||
|
||||
test_core_props_removal(Cluster, RandNodes, KeyCount, Pid) ->
|
||||
lager:info("Remove core.properties file in each index data dir"),
|
||||
remove_core_props(RandNodes, ?INDEX),
|
||||
|
||||
yokozuna_rt:check_exists(Cluster, ?INDEX),
|
||||
|
||||
lager:info("Write one more piece of data"),
|
||||
ok = rt:pbc_write(Pid, ?BUCKET, <<"foo">>, <<"foo">>, "text/plain"),
|
||||
yokozuna_rt:commit(Cluster, ?INDEX),
|
||||
|
||||
yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount+1).
|
||||
|
||||
test_remove_index_dirs(Cluster, RandNodes, KeyCount, Pid) ->
|
||||
lager:info("Remove index directories on each node and let them recreate/reindex"),
|
||||
yokozuna_rt:remove_index_dirs(RandNodes, ?INDEX, [riak_kv, yokozuna]),
|
||||
|
||||
yokozuna_rt:check_exists(Cluster, ?INDEX),
|
||||
|
||||
yokozuna_rt:expire_trees(Cluster),
|
||||
yokozuna_rt:wait_for_aae(Cluster),
|
||||
|
||||
lager:info("Write second piece of data"),
|
||||
ok = rt:pbc_write(Pid, ?BUCKET, <<"food">>, <<"foody">>, "text/plain"),
|
||||
yokozuna_rt:commit(Cluster, ?INDEX),
|
||||
|
||||
yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount+2).
|
||||
|
||||
test_remove_segment_infos_and_rebuild(Cluster, RandNodes, KeyCount, Pid) ->
|
||||
lager:info("Remove segment info files in each index data dir"),
|
||||
remove_segment_infos(RandNodes, ?INDEX),
|
||||
|
||||
lager:info("To fix, we remove index directories on each node and let them recreate/reindex"),
|
||||
|
||||
yokozuna_rt:remove_index_dirs(RandNodes, ?INDEX, [riak_kv, yokozuna]),
|
||||
|
||||
yokozuna_rt:check_exists(Cluster, ?INDEX),
|
||||
|
||||
yokozuna_rt:expire_trees(Cluster),
|
||||
yokozuna_rt:wait_for_aae(Cluster),
|
||||
|
||||
lager:info("Write third piece of data"),
|
||||
ok = rt:pbc_write(Pid, ?BUCKET, <<"baz">>, <<"bar">>, "text/plain"),
|
||||
yokozuna_rt:commit(Cluster, ?INDEX),
|
||||
|
||||
yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount+3).
|
||||
|
||||
test_brutal_kill_and_delete_index_dirs(Cluster, RandNodes, KeyCount, Pid) ->
|
||||
lager:info("Remove index directories on each node and let them recreate/reindex"),
|
||||
yokozuna_rt:brutal_kill_remove_index_dirs(RandNodes, ?INDEX, [riak_kv, yokozuna]),
|
||||
|
||||
yokozuna_rt:check_exists(Cluster, ?INDEX),
|
||||
|
||||
yokozuna_rt:expire_trees(Cluster),
|
||||
yokozuna_rt:wait_for_aae(Cluster),
|
||||
|
||||
lager:info("Write fourth piece of data"),
|
||||
ok = rt:pbc_write(Pid, ?BUCKET, <<"food">>, <<"foody">>, "text/plain"),
|
||||
yokozuna_rt:commit(Cluster, ?INDEX),
|
||||
|
||||
yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount + 4).
|
||||
|
||||
%% @doc Remove core properties file on nodes.
|
||||
remove_core_props(Nodes, IndexName) ->
|
||||
IndexDirs = [rpc:call(Node, yz_index, index_dir, [IndexName]) ||
|
||||
Node <- Nodes],
|
||||
PropsFiles = [filename:join([IndexDir, "core.properties"]) ||
|
||||
IndexDir <- IndexDirs],
|
||||
lager:info("Remove core.properties files: ~p, on nodes: ~p~n",
|
||||
[PropsFiles, Nodes]),
|
||||
[file:delete(PropsFile) || PropsFile <- PropsFiles],
|
||||
ok.
|
||||
|
||||
%% @doc Remove lucence segment info files to check if reindexing will occur
|
||||
%% on re-creation/re-indexing.
|
||||
remove_segment_infos(Nodes, IndexName) ->
|
||||
IndexDirs = [rpc:call(Node, yz_index, index_dir, [IndexName]) ||
|
||||
Node <- Nodes],
|
||||
SiPaths = [binary_to_list(filename:join([IndexDir, "data/index/*.si"])) ||
|
||||
IndexDir <- IndexDirs],
|
||||
SiFiles = lists:append([filelib:wildcard(Path) || Path <- SiPaths]),
|
||||
lager:info("Remove segment info files: ~p, on in dirs: ~p~n",
|
||||
[SiFiles, IndexDirs]),
|
||||
[file:delete(SiFile) || SiFile <- SiFiles].
|
@ -1,564 +0,0 @@
|
||||
-module(yz_crdt).
|
||||
|
||||
-compile(export_all).
|
||||
-compile({parse_transform, rt_intercept_pt}).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-define(HARNESS, (rt_config:get(rt_harness))).
|
||||
-define(INDEX, <<"maps">>).
|
||||
-define(TYPE, <<"maps">>).
|
||||
-define(KEY, <<"Chris Meiklejohn">>).
|
||||
-define(BUCKET, {?TYPE, <<"testbucket">>}).
|
||||
-define(GET(K,L), proplists:get_value(K, L)).
|
||||
-define(N, 3).
|
||||
|
||||
-define(CONF,
|
||||
[
|
||||
{riak_core,
|
||||
[{ring_creation_size, 8}]
|
||||
},
|
||||
{riak_kv,
|
||||
[{delete_mode, keep},
|
||||
{anti_entropy_build_limit, {100, 1000}},
|
||||
{anti_entropy_concurrency, 8},
|
||||
{anti_entropy_tick, 1000}]},
|
||||
{yokozuna,
|
||||
[{enabled, true}]
|
||||
}]).
|
||||
|
||||
confirm() ->
|
||||
rt:set_advanced_conf(all, ?CONF),
|
||||
|
||||
%% Configure cluster.
|
||||
Nodes = rt:build_cluster(5, ?CONF),
|
||||
|
||||
Node = rt:select_random(Nodes),
|
||||
|
||||
%% Create PB connection.
|
||||
Pid = rt:pbc(Node),
|
||||
riakc_pb_socket:set_options(Pid, [queue_if_disconnected]),
|
||||
|
||||
%% Create index.
|
||||
riakc_pb_socket:create_search_index(Pid, ?INDEX, <<"_yz_default">>, []),
|
||||
|
||||
%% Create bucket type for maps.
|
||||
rt:create_and_activate_bucket_type(Node,
|
||||
?TYPE,
|
||||
[{datatype, map},
|
||||
{n_val, ?N},
|
||||
{search_index, ?INDEX}]),
|
||||
|
||||
lager:info("Write some sample data"),
|
||||
test_sample_data(Pid, Nodes, ?BUCKET, ?KEY, ?INDEX),
|
||||
lager:info("Search and Validate our CRDT writes/updates."),
|
||||
ok = rt:wait_until(fun() -> validate_sample_data(Pid, ?KEY, ?INDEX)
|
||||
end),
|
||||
|
||||
lager:info("Test setting the register of a map twice to different values."
|
||||
" (The # of results should still be 1)"),
|
||||
test_repeat_sets(Pid, Nodes, ?BUCKET, ?INDEX, ?KEY),
|
||||
ok = rt:wait_until(fun() -> validate_test_repeat_set(Pid, ?INDEX)
|
||||
end),
|
||||
|
||||
lager:info("FYI: delete_mode is on keep here to make sure YZ handles"
|
||||
" deletes correctly throughout."),
|
||||
lager:info("Test varying deletes operations"),
|
||||
test_and_validate_delete(Pid, Nodes, ?BUCKET, ?INDEX, ?KEY),
|
||||
|
||||
lager:info("Test to make sure yz AAE handles deletes/removes correctly"),
|
||||
test_and_validate_delete_aae(Pid, Nodes, ?BUCKET, ?INDEX),
|
||||
|
||||
lager:info("Test to make sure siblings don't exist after partition"),
|
||||
test_siblings(Nodes, ?BUCKET, ?INDEX),
|
||||
lager:info("Verify counts and operations after heal + transfers + commits"),
|
||||
ok = rt:wait_until(fun() -> validate_test_siblings(Pid, ?BUCKET, ?INDEX)
|
||||
end),
|
||||
|
||||
%% Stop PB connection
|
||||
riakc_pb_socket:stop(Pid),
|
||||
|
||||
pass.
|
||||
|
||||
test_sample_data(Pid, Cluster, Bucket, Key, Index) ->
|
||||
Map1 = riakc_map:update(
|
||||
{<<"name">>, register},
|
||||
fun(R) ->
|
||||
riakc_register:set(Key, R)
|
||||
end, riakc_map:new()),
|
||||
Map2 = riakc_map:update(
|
||||
{<<"interests">>, set},
|
||||
fun(S) ->
|
||||
riakc_set:add_element(<<"thing">>, S) end,
|
||||
Map1),
|
||||
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid,
|
||||
Bucket,
|
||||
Key,
|
||||
riakc_map:to_op(Map2)),
|
||||
|
||||
drain_and_commit(Cluster, Index).
|
||||
|
||||
%% @doc Test setting the register of a map twice to different values.
|
||||
%% The # of results should still be 1.
|
||||
test_repeat_sets(Pid, Cluster, Bucket, Index, Key) ->
|
||||
{ok, M1} = riakc_pb_socket:fetch_type(Pid, Bucket, Key),
|
||||
M2 = riakc_map:update(
|
||||
{<<"update">>, register},
|
||||
fun(R) ->
|
||||
riakc_register:set(<<"foo">>, R)
|
||||
end, M1),
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid,
|
||||
Bucket,
|
||||
Key,
|
||||
riakc_map:to_op(M2)),
|
||||
M3 = riakc_map:update(
|
||||
{<<"update">>, register},
|
||||
fun(R) ->
|
||||
riakc_register:set(<<"bar">>, R)
|
||||
end, M1),
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid,
|
||||
Bucket,
|
||||
Key,
|
||||
riakc_map:to_op(M3)),
|
||||
|
||||
drain_and_commit(Cluster, Index).
|
||||
|
||||
%% @doc Tests varying deletes of within a CRDT map and checks for correct counts
|
||||
%% - Remove registers, remove and add elements within a set
|
||||
%% - Delete the map (associated w/ a key)
|
||||
%% - Recreate objects in the map and delete the map again
|
||||
test_and_validate_delete(Pid, Cluster, Bucket, Index, Key) ->
|
||||
{ok, M1} = riakc_pb_socket:fetch_type(Pid, Bucket, Key),
|
||||
|
||||
lager:info("Remove register from map"),
|
||||
M2 = riakc_map:erase({<<"name">>, register}, M1),
|
||||
|
||||
lager:info("Delete element from set (in map) & Add element to set"),
|
||||
M3 = riakc_map:update(
|
||||
{<<"interests">>, set},
|
||||
fun(S) ->
|
||||
riakc_set:del_element(<<"thing">>,
|
||||
riakc_set:add_element(<<"roses">>, S))
|
||||
end, M2),
|
||||
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid,
|
||||
Bucket,
|
||||
Key,
|
||||
riakc_map:to_op(M3)),
|
||||
|
||||
drain_and_commit(Cluster, Index),
|
||||
|
||||
lager:info("Search deleted/erased name_register:*"),
|
||||
search_and_validate_found(Pid, Index, <<"name_register:*">>, 0),
|
||||
|
||||
lager:info("Add another element to set (in map)"),
|
||||
M4 = riakc_map:update(
|
||||
{<<"interests">>, set},
|
||||
fun(S) ->
|
||||
riakc_set:add_element(<<"pans">>, S)
|
||||
end, M3),
|
||||
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid,
|
||||
Bucket,
|
||||
Key,
|
||||
riakc_map:to_op(M4)),
|
||||
|
||||
drain_and_commit(Cluster, Index),
|
||||
|
||||
lager:info("Search deleted interests_set:thing*"),
|
||||
search_and_validate_found(Pid, Index, <<"interests_set:thing*">>, 0),
|
||||
|
||||
lager:info("Delete key for map"),
|
||||
?assertEqual(ok, riakc_pb_socket:delete(Pid, Bucket, Key)),
|
||||
|
||||
drain_and_commit(Cluster, Index),
|
||||
|
||||
?assertEqual({error, {notfound, map}},
|
||||
riakc_pb_socket:fetch_type(Pid, Bucket, Key)),
|
||||
|
||||
lager:info("Search deleted map: *:*"),
|
||||
search_and_validate_found(Pid, Index, <<"*:*">>, 0),
|
||||
|
||||
lager:info("Recreate object and check counts..."),
|
||||
|
||||
lager:info("Set a new register for map"),
|
||||
M5 = riakc_map:update(
|
||||
{<<"name">>, register},
|
||||
fun(R) ->
|
||||
riakc_register:set(<<"hello">>, R)
|
||||
end, riakc_map:new()),
|
||||
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid,
|
||||
Bucket,
|
||||
Key,
|
||||
riakc_map:to_op(M5)),
|
||||
|
||||
drain_and_commit(Cluster, Index),
|
||||
|
||||
{ok, M6} = riakc_pb_socket:fetch_type(Pid, Bucket, Key),
|
||||
Keys = riakc_map:fetch_keys(M6),
|
||||
?assertEqual(1, length(Keys)),
|
||||
?assert(riakc_map:is_key({<<"name">>, register}, M6)),
|
||||
|
||||
lager:info("Search recreated map: *:*"),
|
||||
search_and_validate_found(Pid, Index, <<"*:*">>, 1),
|
||||
|
||||
lager:info("Delete key for map again"),
|
||||
?assertEqual(ok, riakc_pb_socket:delete(Pid, Bucket, Key)),
|
||||
|
||||
drain_and_commit(Cluster, Index),
|
||||
|
||||
?assertEqual({error, {notfound, map}},
|
||||
riakc_pb_socket:fetch_type(Pid, Bucket, Key)),
|
||||
|
||||
lager:info("Search ~p deleted map: *:*", [Key]),
|
||||
search_and_validate_found(Pid, Index, <<"*:*">>, 0).
|
||||
|
||||
%% @doc Tests key/map delete and AAE
|
||||
%% - Use intercept to trap yz_kv:delete_operation to skip over
|
||||
%% - Makes sure that yz AAE handles tombstone on expire/exchange
|
||||
%% - Recreate objects and check
|
||||
test_and_validate_delete_aae(Pid, Cluster, Bucket, Index) ->
|
||||
Key1 = <<"ohyokozuna">>,
|
||||
M1 = riakc_map:update(
|
||||
{<<"name">>, register},
|
||||
fun(R) ->
|
||||
riakc_register:set(<<"jokes are">>, R)
|
||||
end, riakc_map:new()),
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid,
|
||||
Bucket,
|
||||
Key1,
|
||||
riakc_map:to_op(M1)),
|
||||
|
||||
Key2 = <<"ohriaksearch">>,
|
||||
M2 = riakc_map:update(
|
||||
{<<"name">>, register},
|
||||
fun(R) ->
|
||||
riakc_register:set(<<"better explained">>, R)
|
||||
end, riakc_map:new()),
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid,
|
||||
Bucket,
|
||||
Key2,
|
||||
riakc_map:to_op(M2)),
|
||||
|
||||
drain_and_commit(Cluster, Index),
|
||||
|
||||
lager:info("Add and load handle_delete_operation intercept"),
|
||||
|
||||
[make_intercepts_tab(ANode) || ANode <- Cluster],
|
||||
|
||||
[rt_intercept:add(ANode, {yz_solrq_helper, [{{get_ops_for_no_sibling_deletes, 3},
|
||||
handle_get_ops_for_no_sibling_deletes}]})
|
||||
|| ANode <- Cluster],
|
||||
[true = rpc:call(ANode, ets, insert, [intercepts_tab, {del_put, 0}]) ||
|
||||
ANode <- Cluster],
|
||||
[rt_intercept:wait_until_loaded(ANode) || ANode <- Cluster],
|
||||
|
||||
lager:info("Delete key ~p for map", [Key2]),
|
||||
?assertEqual(ok, riakc_pb_socket:delete(Pid, Bucket, Key2)),
|
||||
?assertEqual({error, {notfound, map}},
|
||||
riakc_pb_socket:fetch_type(Pid, Bucket, Key2)),
|
||||
|
||||
drain_and_commit(Cluster, Index),
|
||||
|
||||
lager:info("Search all results, expect extra b/c tombstone"
|
||||
" and we've modified the delete op : *:*"),
|
||||
search_and_validate_found(Pid, Index, <<"*:*">>, 2),
|
||||
|
||||
lager:info("Expire and re-check"),
|
||||
yokozuna_rt:expire_trees(Cluster),
|
||||
yokozuna_rt:wait_for_full_exchange_round(Cluster, erlang:now()),
|
||||
drain_and_commit(Cluster, Index),
|
||||
|
||||
lager:info("Search all results, expect removed tombstone b/c AAE"
|
||||
" should clean it up: *:*"),
|
||||
search_and_validate_found(Pid, Index, <<"*:*">>, 1),
|
||||
|
||||
lager:info("Recreate object and check counts"),
|
||||
|
||||
M3 = riakc_map:update(
|
||||
{<<"name">>, register},
|
||||
fun(R) ->
|
||||
riakc_register:set(<<"hello again, is it me you're"
|
||||
"looking for">>, R)
|
||||
end, riakc_map:new()),
|
||||
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid,
|
||||
Bucket,
|
||||
Key2,
|
||||
riakc_map:to_op(M3)),
|
||||
|
||||
drain_and_commit(Cluster, Index),
|
||||
|
||||
{ok, M4} = riakc_pb_socket:fetch_type(Pid, Bucket, Key2),
|
||||
Keys = riakc_map:fetch_keys(M4),
|
||||
?assertEqual(1, length(Keys)),
|
||||
?assert(riakc_map:is_key({<<"name">>, register}, M4)),
|
||||
|
||||
lager:info("Search recreated map: *:*"),
|
||||
search_and_validate_found(Pid, Index, <<"*:*">>, 2).
|
||||
|
||||
%% @doc Tests sibling handling/merge when there's a partition
|
||||
%% - Write/remove from separate partitions
|
||||
%% - Verify counts and that CRDTs have no siblings, vtags,
|
||||
%% after healing partitions. The CRDT map merges so the search
|
||||
%% results be consistent.
|
||||
test_siblings(Cluster, Bucket, Index) ->
|
||||
Key1 = <<"Movies">>,
|
||||
Key2 = <<"Games">>,
|
||||
Set1 = <<"directors">>,
|
||||
Set2 = <<"characters">>,
|
||||
|
||||
%% make siblings
|
||||
{P1, P2} = lists:split(1, Cluster),
|
||||
|
||||
%% Create an object in Partition 1 and siblings in Partition 2
|
||||
lager:info("Create partition: ~p | ~p", [P1, P2]),
|
||||
Partition = rt:partition(P1, P2),
|
||||
|
||||
%% PB connections for accessing each side
|
||||
Pid1 = rt:pbc(hd(P1)),
|
||||
Pid2 = rt:pbc(hd(P2)),
|
||||
|
||||
riakc_pb_socket:set_options(Pid1, [queue_if_disconnected, auto_reconnect]),
|
||||
riakc_pb_socket:set_options(Pid2, [queue_if_disconnected, auto_reconnect]),
|
||||
|
||||
%% P1 writes
|
||||
lager:info("Writing to Partition 1 Set 1: Key ~p | Director ~p",
|
||||
[Key1, <<"Kubrick">>]),
|
||||
M1 = riakc_map:update(
|
||||
{Set1, set},
|
||||
fun(S) ->
|
||||
riakc_set:add_element(<<"Kubrick">>, S)
|
||||
end, riakc_map:new()),
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid1,
|
||||
Bucket,
|
||||
Key1,
|
||||
riakc_map:to_op(M1)),
|
||||
|
||||
lager:info("Writing to Partition 1 Set 1: Key ~p | Director ~p",
|
||||
[Key1, <<"Demme">>]),
|
||||
M2 = riakc_map:update(
|
||||
{Set1, set},
|
||||
fun(S) ->
|
||||
riakc_set:add_element(<<"Demme">>, S)
|
||||
end, riakc_map:new()),
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid1,
|
||||
Bucket,
|
||||
Key1,
|
||||
riakc_map:to_op(M2)),
|
||||
|
||||
%% P2 Siblings
|
||||
lager:info("Writing to Partition 2 Set 2: Key ~p | Char ~p",
|
||||
[Key2, <<"Sonic">>]),
|
||||
M3 = riakc_map:update(
|
||||
{Set2, set},
|
||||
fun(S) ->
|
||||
riakc_set:add_element(<<"Sonic">>, S)
|
||||
end, riakc_map:new()),
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid2,
|
||||
Bucket,
|
||||
Key2,
|
||||
riakc_map:to_op(M3)),
|
||||
|
||||
lager:info("Delete key from Partition 2: Key ~p", [Key2]),
|
||||
ok = riakc_pb_socket:delete(Pid2, Bucket, Key2),
|
||||
|
||||
lager:info("Writing to Partition 2 Set 2: after delete: Key ~p | Char"
|
||||
" ~p", [Key2, <<"Crash">>]),
|
||||
M4 = riakc_map:update(
|
||||
{Set2, set},
|
||||
fun(S) ->
|
||||
riakc_set:add_element(<<"Crash">>, S)
|
||||
end, riakc_map:new()),
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid2,
|
||||
Bucket,
|
||||
Key2,
|
||||
riakc_map:to_op(M4)),
|
||||
|
||||
lager:info("Writing to Partition 2 Set 2: Key ~p | Char ~p",
|
||||
[Key2, <<"Mario">>]),
|
||||
M5 = riakc_map:update(
|
||||
{Set2, set},
|
||||
fun(S) ->
|
||||
riakc_set:add_element(<<"Mario">>, S)
|
||||
end, riakc_map:new()),
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid2,
|
||||
Bucket,
|
||||
Key2,
|
||||
riakc_map:to_op(M5)),
|
||||
|
||||
lager:info("Writing to Partition 2 Set 1: Key ~p | Director ~p",
|
||||
[Key1, <<"Klimov">>]),
|
||||
M6 = riakc_map:update(
|
||||
{Set1, set},
|
||||
fun(S) ->
|
||||
riakc_set:add_element(<<"Klimov">>, S)
|
||||
end, riakc_map:new()),
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid2,
|
||||
Bucket,
|
||||
Key1,
|
||||
riakc_map:to_op(M6)),
|
||||
|
||||
rt:heal(Partition),
|
||||
rt:wait_until_transfers_complete(Cluster),
|
||||
|
||||
drain_and_commit(Cluster, Index).
|
||||
|
||||
validate_sample_data(Pid, Key, Index) ->
|
||||
try
|
||||
Thing = <<"thing">>,
|
||||
|
||||
{ok, {search_results, Results1a, _, Found1}} = riakc_pb_socket:search(
|
||||
Pid, Index, <<"name_register:Chris*">>),
|
||||
?assertEqual(1, Found1),
|
||||
|
||||
?assertEqual(?GET(<<"name_register">>, ?GET(Index, Results1a)),
|
||||
Key),
|
||||
?assertEqual(?GET(<<"interests_set">>, ?GET(Index, Results1a)),
|
||||
Thing),
|
||||
|
||||
{ok, {search_results, Results2a, _, Found2}} = riakc_pb_socket:search(
|
||||
Pid, Index, <<"interests_set:thing*">>),
|
||||
?assertEqual(1, Found2),
|
||||
?assertEqual(?GET(<<"name_register">>, ?GET(Index, Results2a)),
|
||||
Key),
|
||||
?assertEqual(?GET(<<"interests_set">>, ?GET(Index, Results2a)),
|
||||
Thing),
|
||||
|
||||
{ok, {search_results, Results3a, _, Found3}} = riakc_pb_socket:search(
|
||||
Pid, Index, <<"_yz_rb:testbucket">>),
|
||||
?assertEqual(1, Found3),
|
||||
?assertEqual(?GET(<<"name_register">>, ?GET(Index, Results3a)),
|
||||
Key),
|
||||
?assertEqual(?GET(<<"interests_set">>, ?GET(Index, Results3a)),
|
||||
Thing),
|
||||
|
||||
%% Redo queries and check if results are equal
|
||||
{ok, {search_results, Results1b, _, _}} = riakc_pb_socket:search(
|
||||
Pid, Index, <<"name_register:Chris*">>),
|
||||
?assertEqual(number_of_fields(Results1a, Index),
|
||||
number_of_fields(Results1b, Index)),
|
||||
|
||||
{ok, {search_results, Results2b, _, _}} = riakc_pb_socket:search(
|
||||
Pid, Index, <<"interests_set:thing*">>),
|
||||
?assertEqual(number_of_fields(Results2a, Index),
|
||||
number_of_fields(Results2b, Index)),
|
||||
|
||||
{ok, {search_results, Results3b, _, _}} = riakc_pb_socket:search(
|
||||
Pid, Index, <<"_yz_rb:testbucket">>),
|
||||
?assertEqual(number_of_fields(Results3a, Index),
|
||||
number_of_fields(Results3b, Index)),
|
||||
|
||||
true
|
||||
catch Err:Reason ->
|
||||
lager:info("Waiting for CRDT search results to converge. Error"
|
||||
" was ~p.", [{Err, Reason}]),
|
||||
false
|
||||
end.
|
||||
|
||||
validate_test_repeat_set(Pid, Index) ->
|
||||
try
|
||||
{ok, {search_results, _R, _, Found}} = riakc_pb_socket:search(
|
||||
Pid, Index,
|
||||
<<"update_register:*">>),
|
||||
?assertEqual(1, Found),
|
||||
|
||||
true
|
||||
catch Err:Reason ->
|
||||
lager:info("Waiting for CRDT search results to converge. Error"
|
||||
" was ~p.", [{Err, Reason}]),
|
||||
false
|
||||
end.
|
||||
|
||||
validate_test_siblings(Pid, Bucket, Index) ->
|
||||
try
|
||||
Key1 = <<"Movies">>,
|
||||
Key2 = <<"Games">>,
|
||||
|
||||
{ok, MF1} = riakc_pb_socket:fetch_type(Pid, Bucket, Key1),
|
||||
Keys = riakc_map:fetch_keys(MF1),
|
||||
?assertEqual(1, length(Keys)),
|
||||
?assert(riakc_map:is_key({<<"directors">>, set}, MF1)),
|
||||
{ok, {search_results, Results1, _, _}} = riakc_pb_socket:search(
|
||||
Pid, Index,
|
||||
<<"directors_set:*">>),
|
||||
lager:info("Search movies map directors_set:*: ~p~n", [Results1]),
|
||||
?assertEqual(3, length(proplists:lookup_all(<<"directors_set">>,
|
||||
?GET(?INDEX, Results1)))),
|
||||
|
||||
{ok, MF2} = riakc_pb_socket:fetch_type(Pid, Bucket, Key2),
|
||||
Keys2 = riakc_map:fetch_keys(MF2),
|
||||
?assertEqual(1, length(Keys2)),
|
||||
?assert(riakc_map:is_key({<<"characters">>, set}, MF2)),
|
||||
{ok, {search_results, Results2, _, _}} = riakc_pb_socket:search(
|
||||
Pid, Index,
|
||||
<<"characters_set:*">>),
|
||||
lager:info("Search games map characters_set:*: ~p~n", [Results2]),
|
||||
?assertEqual(2, length(proplists:lookup_all(<<"characters_set">>,
|
||||
?GET(?INDEX, Results2)))),
|
||||
|
||||
{ok, {search_results, Results3, _, Found}} = riakc_pb_socket:search(
|
||||
Pid, Index,
|
||||
<<"_yz_vtag:*">>),
|
||||
lager:info("Search vtags in search *:*: ~p~n", [Results3]),
|
||||
?assertEqual(0, Found),
|
||||
true
|
||||
catch Err:Reason ->
|
||||
lager:info("Waiting for CRDT search results to converge. Error"
|
||||
" was ~p.", [{Err, Reason}]),
|
||||
false
|
||||
end.
|
||||
|
||||
%% @private
|
||||
drain_and_commit(Cluster, Index) ->
|
||||
yokozuna_rt:drain_solrqs(Cluster),
|
||||
yokozuna_rt:commit(Cluster, Index).
|
||||
|
||||
%% @private
|
||||
number_of_fields(Resp, Index) ->
|
||||
length(?GET(Index, Resp)).
|
||||
|
||||
%% @private
|
||||
make_intercepts_tab(Node) ->
|
||||
SupPid = rpc:call(Node, erlang, whereis, [sasl_safe_sup]),
|
||||
intercepts_tab = rpc:call(Node, ets, new, [intercepts_tab, [named_table,
|
||||
public, set, {heir, SupPid, {}}]]).
|
||||
|
||||
%% @private
|
||||
search_and_validate_found(Pid, Index, Search, ExpectedCount) ->
|
||||
ok = rt:wait_until(
|
||||
fun() ->
|
||||
try
|
||||
{ok, {search_results, Results2, _, F}} =
|
||||
riakc_pb_socket:search(Pid, Index, Search),
|
||||
?assertEqual(ExpectedCount, F),
|
||||
true
|
||||
catch Err:Reason ->
|
||||
lager:info(
|
||||
"Waiting for CRDT search results to converge."
|
||||
" Index: ~p"
|
||||
" Search: ~p"
|
||||
" Error: ~p",
|
||||
[Index, Search, {Err, Reason}]
|
||||
),
|
||||
false
|
||||
end
|
||||
end).
|
||||
|
@ -1,98 +0,0 @@
|
||||
%% -------------------------------------------------------------------
|
||||
%%
|
||||
%% Copyright (c) 2015 Basho Technologies, Inc.
|
||||
%%
|
||||
%% This file is provided to you under the Apache License,
|
||||
%% Version 2.0 (the "License"); you may not use this file
|
||||
%% except in compliance with the License. You may obtain
|
||||
%% a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing,
|
||||
%% software distributed under the License is distributed on an
|
||||
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
%% KIND, either express or implied. See the License for the
|
||||
%% specific language governing permissions and limitations
|
||||
%% under the License.
|
||||
%%
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Test that checks to make sure that default bucket_types
|
||||
%% do not lose data when expiring/clearing AAE trees when
|
||||
%% trees are rebuilt for comparison.
|
||||
%% @end
|
||||
|
||||
|
||||
-module(yz_default_bucket_type_upgrade).
|
||||
-compile(export_all).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("riakc/include/riakc.hrl").
|
||||
|
||||
-define(N, 3).
|
||||
-define(YZ_CAP, {yokozuna, handle_legacy_default_bucket_type_aae}).
|
||||
-define(INDEX, <<"test_upgrade_idx">>).
|
||||
-define(BUCKET, <<"test_upgrade_bucket">>).
|
||||
-define(SEQMAX, 2000).
|
||||
-define(CFG,
|
||||
[{riak_core,
|
||||
[
|
||||
{ring_creation_size, 16},
|
||||
{default_bucket_props,
|
||||
[
|
||||
{n_val, ?N},
|
||||
{allow_mult, true},
|
||||
{dvv_enabled, true}
|
||||
]}
|
||||
]},
|
||||
{riak_kv,
|
||||
[
|
||||
{anti_entropy_build_limit, {100, 1000}},
|
||||
{anti_entropy_concurrency, 8}
|
||||
]
|
||||
},
|
||||
{yokozuna,
|
||||
[
|
||||
{anti_entropy_tick, 1000},
|
||||
{enabled, true}
|
||||
]}
|
||||
]).
|
||||
|
||||
confirm() ->
|
||||
%% This test explicitly requires an upgrade from 2.0.5 to test a
|
||||
%% new capability
|
||||
OldVsn = "2.0.5",
|
||||
|
||||
[_, Node|_] = Cluster = rt:build_cluster(lists:duplicate(4, {OldVsn, ?CFG})),
|
||||
rt:wait_for_cluster_service(Cluster, yokozuna),
|
||||
|
||||
[rt:assert_capability(ANode, ?YZ_CAP, {unknown_capability, ?YZ_CAP}) || ANode <- Cluster],
|
||||
|
||||
GenKeys = yokozuna_rt:gen_keys(?SEQMAX),
|
||||
KeyCount = length(GenKeys),
|
||||
lager:info("KeyCount ~p", [KeyCount]),
|
||||
|
||||
OldPid = rt:pbc(Node),
|
||||
|
||||
yokozuna_rt:write_data(Cluster, OldPid, ?INDEX, ?BUCKET, GenKeys),
|
||||
yokozuna_rt:commit(Cluster, ?INDEX),
|
||||
|
||||
yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount),
|
||||
|
||||
%% Upgrade
|
||||
yokozuna_rt:rolling_upgrade(Cluster, current),
|
||||
|
||||
[rt:assert_capability(ANode, ?YZ_CAP, v1) || ANode <- Cluster],
|
||||
[rt:assert_supported(rt:capability(ANode, all), ?YZ_CAP, [v1, v0]) || ANode <- Cluster],
|
||||
|
||||
yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount),
|
||||
|
||||
lager:info("Write one more piece of data"),
|
||||
Pid = rt:pbc(Node),
|
||||
ok = rt:pbc_write(Pid, ?BUCKET, <<"foo">>, <<"foo">>, "text/plain"),
|
||||
yokozuna_rt:commit(Cluster, ?INDEX),
|
||||
|
||||
yokozuna_rt:expire_trees(Cluster),
|
||||
yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount + 1),
|
||||
|
||||
pass.
|
@ -1,117 +0,0 @@
|
||||
-module(yz_ensemble).
|
||||
-compile(export_all).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-define(CFG,
|
||||
[
|
||||
{riak_core,
|
||||
[
|
||||
{ring_creation_size, 8}
|
||||
]},
|
||||
{yokozuna,
|
||||
[
|
||||
{enabled, true}
|
||||
]}
|
||||
]).
|
||||
|
||||
confirm() ->
|
||||
NumNodes = 3,
|
||||
NVal = 3,
|
||||
ConfigB = ensemble_util:fast_config(NVal),
|
||||
Config = ConfigB ++ [{yokozuna, [{enabled, true}]}],
|
||||
lager:info("Building cluster and waiting for ensemble to stablize"),
|
||||
Nodes = build_cluster_with_yz_support(NumNodes, Config, NVal),
|
||||
rt:wait_for_cluster_service(Nodes, yokozuna),
|
||||
vnode_util:load(Nodes),
|
||||
Node = hd(Nodes),
|
||||
|
||||
lager:info("Creating/activating 'strong' bucket type"),
|
||||
rt:create_and_activate_bucket_type(Node, <<"strong">>,
|
||||
[{consistent, true}, {n_val, NVal}]),
|
||||
|
||||
Bucket = {<<"strong">>, <<"test">>},
|
||||
Index = <<"testi">>,
|
||||
create_index(Node, Index),
|
||||
set_bucket_props(Node, Bucket, Index),
|
||||
|
||||
verify_ensemble_delete_support(Nodes, Bucket, Index),
|
||||
|
||||
pass.
|
||||
|
||||
|
||||
%% @private
|
||||
%% @doc Populates then deletes from SC bucket
|
||||
verify_ensemble_delete_support(Cluster, Bucket, Index) ->
|
||||
%% Yz only supports UTF-8 compatible keys
|
||||
Keys = [<<N:64/integer>> || N <- lists:seq(1,2000),
|
||||
not lists:any(fun(E) -> E > 127 end,binary_to_list(<<N:64/integer>>))],
|
||||
|
||||
PBC = rt:pbc(hd(Cluster)),
|
||||
|
||||
lager:info("Writing ~p keys", [length(Keys)]),
|
||||
[ok = rt:pbc_write(PBC, Bucket, Key, Key, "text/plain") || Key <- Keys],
|
||||
yokozuna_rt:commit(Cluster, Index),
|
||||
|
||||
%% soft commit wait, then check that last key is indexed
|
||||
lager:info("Search for keys to verify they exist"),
|
||||
LKey = lists:last(Keys),
|
||||
rt:wait_until(fun() ->
|
||||
{M, _} = riakc_pb_socket:search(PBC, Index, query_value(LKey)),
|
||||
ok == M
|
||||
end),
|
||||
[{ok, _} =
|
||||
riakc_pb_socket:search(PBC, Index, query_value(Key)) || Key <- Keys],
|
||||
|
||||
lager:info("Deleting keys"),
|
||||
[riakc_pb_socket:delete(PBC, Bucket, Key) || Key <- Keys],
|
||||
yokozuna_rt:commit(Cluster, Index),
|
||||
rt:wait_until(fun() ->
|
||||
case riakc_pb_socket:search(PBC, Index, query_value(LKey)) of
|
||||
{ok,{search_results,Res,_,_}} ->
|
||||
lager:info("RES: ~p ~p~n", [Res, LKey]),
|
||||
Res == [];
|
||||
S ->
|
||||
lager:info("OTHER: ~p ~p~n", [S, LKey]),
|
||||
false
|
||||
end
|
||||
end),
|
||||
[ {ok,{search_results,[],_,_}} =
|
||||
riakc_pb_socket:search(PBC, Index, query_value(Key)) || Key <- Keys],
|
||||
|
||||
ok.
|
||||
|
||||
|
||||
%% @private
|
||||
%% @doc build a cluster from ensemble_util + yz support
|
||||
%%
|
||||
%% NOTE: There's a timing issue that causes join_cluster to hang the r_t
|
||||
%% node when adding yokozuna and ensemble support. Waiting for yokozuna
|
||||
%% to load on each node allows join_cluster to complete consistently
|
||||
build_cluster_with_yz_support(Num, Config, NVal) ->
|
||||
Nodes = rt:deploy_nodes(Num, Config),
|
||||
[rt:wait_for_cluster_service([N], yokozuna) || N <- Nodes],
|
||||
Node = hd(Nodes),
|
||||
rt:join_cluster(Nodes),
|
||||
ensemble_util:wait_until_cluster(Nodes),
|
||||
ensemble_util:wait_for_membership(Node),
|
||||
ensemble_util:wait_until_stable(Node, NVal),
|
||||
Nodes.
|
||||
|
||||
%% @private
|
||||
%% @doc Builds a simple riak key query
|
||||
query_value(Value) ->
|
||||
V2 = iolist_to_binary(re:replace(Value, "\"", "%22")),
|
||||
V3 = iolist_to_binary(re:replace(V2, "\\\\", "%5C")),
|
||||
<<"_yz_rk:\"",V3/binary,"\"">>.
|
||||
|
||||
%% pulled from yz_rt
|
||||
|
||||
%% @private
|
||||
create_index(Node, Index) ->
|
||||
lager:info("Creating index ~s [~p]", [Index, Node]),
|
||||
ok = rpc:call(Node, yz_index, create, [Index]).
|
||||
|
||||
%% @private
|
||||
set_bucket_props(Node, Bucket, Index) ->
|
||||
Props = [{search_index, Index}],
|
||||
rpc:call(Node, riak_core_bucket, set_bucket, [Bucket, Props]).
|
@ -1,441 +0,0 @@
|
||||
%% -------------------------------------------------------------------
|
||||
%%
|
||||
%% Copyright (c) 2015 Basho Technologies, Inc.
|
||||
%%
|
||||
%% This file is provided to you under the Apache License,
|
||||
%% Version 2.0 (the "License"); you may not use this file
|
||||
%% except in compliance with the License. You may obtain
|
||||
%% a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing,
|
||||
%% software distributed under the License is distributed on an
|
||||
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
%% KIND, either express or implied. See the License for the
|
||||
%% specific language governing permissions and limitations
|
||||
%% under the License.
|
||||
%%
|
||||
%%-------------------------------------------------------------------
|
||||
|
||||
%% @doc Test that checks if we're caching the extractor map and that
|
||||
%% creating custom extractors is doable via protobufs.
|
||||
%% @end
|
||||
|
||||
-module(yz_extractors).
|
||||
-compile(export_all).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("riakc/include/riakc.hrl").
|
||||
|
||||
-define(FMT(S, Args), lists:flatten(io_lib:format(S, Args))).
|
||||
-define(TYPE1, <<"extractors_in_paradise">>).
|
||||
-define(TYPE2, <<"extractors_in_paradiso">>).
|
||||
-define(INDEX1, <<"test_idx1">>).
|
||||
-define(BUCKET1, {?TYPE1, <<"test_bkt1">>}).
|
||||
-define(INDEX2, <<"test_idx2">>).
|
||||
-define(BUCKET2, {?TYPE2, <<"test_bkt2">>}).
|
||||
-define(TYPE3, <<"type3">>).
|
||||
-define(BUCKET3, {?TYPE3, <<"test_bkt3">>}).
|
||||
-define(INDEX3, <<"test_idx3">>).
|
||||
-define(SCHEMANAME, <<"test">>).
|
||||
-define(TEST_SCHEMA,
|
||||
<<"<schema name=\"test\" version=\"1.5\">
|
||||
<fields>
|
||||
<dynamicField name=\"*_foo_register\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<dynamicField name=\"*\" type=\"ignored\"/>
|
||||
|
||||
<field name=\"_yz_id\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" required=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_ed\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_pn\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_fpn\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_vtag\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_rt\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_rk\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_rb\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_err\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"text\" type=\"text_general\" indexed=\"true\" stored=\"false\" multiValued=\"true\"/>
|
||||
<field name=\"age\" type=\"int\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"host\" type=\"string\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"blob\" type=\"binary\" indexed=\"false\" stored=\"true\" multiValued=\"false\"/>
|
||||
</fields>
|
||||
<uniqueKey>_yz_id</uniqueKey>
|
||||
<types>
|
||||
<fieldType name=\"ignored\" indexed=\"false\" stored=\"false\" multiValued=\"false\" class=\"solr.StrField\" />
|
||||
|
||||
<fieldType name=\"_yz_str\" class=\"solr.StrField\" sortMissingLast=\"true\" />
|
||||
<fieldType name=\"string\" class=\"solr.StrField\" sortMissingLast=\"true\" />
|
||||
<fieldtype name=\"binary\" class=\"solr.BinaryField\"/>
|
||||
<fieldType name=\"int\" class=\"solr.TrieIntField\" precisionStep=\"0\" positionIncrementGap=\"0\" />
|
||||
<fieldType name=\"text_general\" class=\"solr.TextField\" positionIncrementGap=\"100\">
|
||||
<analyzer type=\"index\">
|
||||
<tokenizer class=\"solr.StandardTokenizerFactory\"/>
|
||||
<filter class=\"solr.StopFilterFactory\" ignoreCase=\"true\" words=\"stopwords.txt\" enablePositionIncrements=\"true\" />
|
||||
<filter class=\"solr.LowerCaseFilterFactory\"/>
|
||||
</analyzer>
|
||||
<analyzer type=\"query\">
|
||||
<tokenizer class=\"solr.StandardTokenizerFactory\"/>
|
||||
<filter class=\"solr.StopFilterFactory\" ignoreCase=\"true\" words=\"stopwords.txt\" enablePositionIncrements=\"true\" />
|
||||
<filter class=\"solr.SynonymFilterFactory\" synonyms=\"synonyms.txt\" ignoreCase=\"true\" expand=\"true\"/>
|
||||
<filter class=\"solr.LowerCaseFilterFactory\"/>
|
||||
</analyzer>
|
||||
</fieldType>
|
||||
</types>
|
||||
</schema>">>).
|
||||
-define(TEST_SCHEMA_UPGRADE,
|
||||
<<"<schema name=\"test\" version=\"1.5\">
|
||||
<fields>
|
||||
<dynamicField name=\"*_foo_register\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<dynamicField name=\"*\" type=\"ignored\"/>
|
||||
|
||||
<field name=\"_yz_id\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" required=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_ed\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_pn\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_fpn\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_vtag\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_rt\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_rk\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_rb\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_err\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"text\" type=\"text_general\" indexed=\"true\" stored=\"false\" multiValued=\"true\"/>
|
||||
<field name=\"age\" type=\"int\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"host\" type=\"string\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"blob\" type=\"binary\" indexed=\"false\" stored=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"method\" type=\"string\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
</fields>
|
||||
<uniqueKey>_yz_id</uniqueKey>
|
||||
<types>
|
||||
<fieldType name=\"ignored\" indexed=\"false\" stored=\"false\" multiValued=\"false\" class=\"solr.StrField\" />
|
||||
|
||||
<fieldType name=\"_yz_str\" class=\"solr.StrField\" sortMissingLast=\"true\" />
|
||||
<fieldType name=\"string\" class=\"solr.StrField\" sortMissingLast=\"true\" />
|
||||
<fieldtype name=\"binary\" class=\"solr.BinaryField\"/>
|
||||
<fieldType name=\"int\" class=\"solr.TrieIntField\" precisionStep=\"0\" positionIncrementGap=\"0\" />
|
||||
<fieldType name=\"text_general\" class=\"solr.TextField\" positionIncrementGap=\"100\">
|
||||
<analyzer type=\"index\">
|
||||
<tokenizer class=\"solr.StandardTokenizerFactory\"/>
|
||||
<filter class=\"solr.StopFilterFactory\" ignoreCase=\"true\" words=\"stopwords.txt\" enablePositionIncrements=\"true\" />
|
||||
<filter class=\"solr.LowerCaseFilterFactory\"/>
|
||||
</analyzer>
|
||||
<analyzer type=\"query\">
|
||||
<tokenizer class=\"solr.StandardTokenizerFactory\"/>
|
||||
<filter class=\"solr.StopFilterFactory\" ignoreCase=\"true\" words=\"stopwords.txt\" enablePositionIncrements=\"true\" />
|
||||
<filter class=\"solr.SynonymFilterFactory\" synonyms=\"synonyms.txt\" ignoreCase=\"true\" expand=\"true\"/>
|
||||
<filter class=\"solr.LowerCaseFilterFactory\"/>
|
||||
</analyzer>
|
||||
</fieldType>
|
||||
</types>
|
||||
</schema>">>).
|
||||
-define(YZ_CAP, {yokozuna, extractor_map_in_cmd}).
|
||||
-define(GET_MAP_RING_MFA, {yz_extractor, get_map, 1}).
|
||||
-define(GET_MAP_MFA, {yz_extractor, get_map, 0}).
|
||||
-define(GET_MAP_READTHROUGH_MFA, {yz_extractor, get_map_read_through, 0}).
|
||||
-define(YZ_META_EXTRACTORS, {yokozuna, extractors}).
|
||||
-define(YZ_EXTRACTOR_MAP, yokozuna_extractor_map).
|
||||
-define(NEW_EXTRACTOR, {"application/httpheader", yz_noop_extractor}).
|
||||
-define(EXTRACTOR_CT, element(1, ?NEW_EXTRACTOR)).
|
||||
-define(EXTRACTOR_MOD, element(2, ?NEW_EXTRACTOR)).
|
||||
-define(DEFAULT_MAP, [{default, yz_noop_extractor},
|
||||
{"application/json",yz_json_extractor},
|
||||
{"application/riak_counter", yz_dt_extractor},
|
||||
{"application/riak_map", yz_dt_extractor},
|
||||
{"application/riak_set", yz_dt_extractor},
|
||||
{"application/xml",yz_xml_extractor},
|
||||
{"text/plain",yz_text_extractor},
|
||||
{"text/xml",yz_xml_extractor}
|
||||
]).
|
||||
-define(EXTRACTMAPEXPECT, lists:sort(?DEFAULT_MAP ++ [?NEW_EXTRACTOR])).
|
||||
-define(SEQMAX, 20).
|
||||
-define(NVAL, 3).
|
||||
-define(CFG,
|
||||
[
|
||||
{riak_kv,
|
||||
[
|
||||
%% allow AAE to build trees and exchange rapidly
|
||||
{anti_entropy_build_limit, {100, 1000}},
|
||||
{anti_entropy_concurrency, 8},
|
||||
{anti_entropy_tick, 1000},
|
||||
%% but start with AAE turned off so as not to interfere with earlier parts of the test
|
||||
{anti_entropy, {off, []}}
|
||||
]},
|
||||
{yokozuna,
|
||||
[
|
||||
{enabled, true}
|
||||
]}
|
||||
]).
|
||||
|
||||
confirm() ->
|
||||
%% This test explicitly requires an upgrade from 2.0.5 to test a
|
||||
%% new capability
|
||||
OldVsn = "2.0.5",
|
||||
|
||||
[_, Node|_] = Cluster = rt:build_cluster(lists:duplicate(4, {OldVsn, ?CFG})),
|
||||
rt:wait_for_cluster_service(Cluster, yokozuna),
|
||||
|
||||
[rt:assert_capability(ANode, ?YZ_CAP, {unknown_capability, ?YZ_CAP}) || ANode <- Cluster],
|
||||
|
||||
OldPid = rt:pbc(Node),
|
||||
|
||||
GenKeys = yokozuna_rt:gen_keys(?SEQMAX),
|
||||
KeyCount = length(GenKeys),
|
||||
|
||||
rt:count_calls(Cluster, [?GET_MAP_RING_MFA, ?GET_MAP_MFA]),
|
||||
|
||||
yokozuna_rt:write_data(Cluster, OldPid, ?INDEX1,
|
||||
{?SCHEMANAME, ?TEST_SCHEMA}, ?BUCKET1, GenKeys),
|
||||
|
||||
ok = rt:stop_tracing(),
|
||||
|
||||
{ok, BProps} = riakc_pb_socket:get_bucket(OldPid, ?BUCKET1),
|
||||
N = proplists:get_value(n_val, BProps),
|
||||
|
||||
riakc_pb_socket:stop(OldPid),
|
||||
|
||||
PrevGetMapRingCC = rt:get_call_count(Cluster, ?GET_MAP_RING_MFA),
|
||||
PrevGetMapCC = rt:get_call_count(Cluster, ?GET_MAP_MFA),
|
||||
?assertEqual(KeyCount * N, PrevGetMapRingCC),
|
||||
?assertEqual(KeyCount * N, PrevGetMapCC),
|
||||
|
||||
%% test query count
|
||||
yokozuna_rt:verify_num_found_query(Cluster, ?INDEX1, KeyCount),
|
||||
|
||||
{RingVal1, MDVal1} = get_ring_and_cmd_vals(Node, ?YZ_META_EXTRACTORS,
|
||||
?YZ_EXTRACTOR_MAP),
|
||||
|
||||
?assertEqual(undefined, MDVal1),
|
||||
%% In previous version, Ring only gets map metadata if a non-default
|
||||
%% extractor is registered
|
||||
?assertEqual(undefined, RingVal1),
|
||||
|
||||
?assertEqual(?DEFAULT_MAP, get_map(Node)),
|
||||
|
||||
%% %% Custom Register
|
||||
ExtractMap = register_extractor(Node, ?EXTRACTOR_CT, ?EXTRACTOR_MOD),
|
||||
|
||||
?assertEqual(?EXTRACTMAPEXPECT, ExtractMap),
|
||||
|
||||
%% Upgrade
|
||||
yokozuna_rt:rolling_upgrade(Cluster, current),
|
||||
|
||||
[rt:wait_until_ready(ANode) || ANode <- Cluster],
|
||||
|
||||
[rt:assert_capability(ANode, ?YZ_CAP, true) || ANode <- Cluster],
|
||||
[rt:assert_supported(rt:capability(ANode, all), ?YZ_CAP, [true, false]) ||
|
||||
ANode <- Cluster],
|
||||
|
||||
%% test query count again
|
||||
yokozuna_rt:verify_num_found_query(Cluster, ?INDEX1, KeyCount),
|
||||
|
||||
Pid = rt:pbc(Node),
|
||||
|
||||
rt:count_calls(Cluster, [?GET_MAP_RING_MFA, ?GET_MAP_MFA,
|
||||
?GET_MAP_READTHROUGH_MFA]),
|
||||
|
||||
yokozuna_rt:write_data(Cluster, Pid, ?INDEX2, {?SCHEMANAME, ?TEST_SCHEMA},
|
||||
?BUCKET2, GenKeys),
|
||||
yokozuna_rt:commit(Cluster, ?INDEX2),
|
||||
|
||||
ok = rt:stop_tracing(),
|
||||
|
||||
riakc_pb_socket:stop(Pid),
|
||||
|
||||
CurrGetMapRingCC = rt:get_call_count(Cluster, ?GET_MAP_RING_MFA),
|
||||
CurrGetMapCC = rt:get_call_count(Cluster, ?GET_MAP_MFA),
|
||||
CurrGetMapRTCC = rt:get_call_count(Cluster, ?GET_MAP_READTHROUGH_MFA),
|
||||
|
||||
lager:info("Number of calls to get the map from the ring - current: ~p~n, previous: ~p~n",
|
||||
[CurrGetMapRingCC, PrevGetMapRingCC]),
|
||||
?assert(CurrGetMapRingCC < PrevGetMapRingCC),
|
||||
lager:info("Number of calls to get the map - current: ~p~n, previous: ~p~n",
|
||||
[CurrGetMapCC, PrevGetMapCC]),
|
||||
?assert(CurrGetMapCC =< PrevGetMapCC),
|
||||
lager:info("Number of calls to get_map_read_through/0: ~p~n, Number of calls to get_map/0: ~p~n",
|
||||
[CurrGetMapRTCC, CurrGetMapCC]),
|
||||
?assert(CurrGetMapRTCC =< CurrGetMapCC),
|
||||
|
||||
{_RingVal2, MDVal2} = get_ring_and_cmd_vals(Node, ?YZ_META_EXTRACTORS,
|
||||
?YZ_EXTRACTOR_MAP),
|
||||
|
||||
?assertEqual(?EXTRACTMAPEXPECT, MDVal2),
|
||||
?assertEqual(?EXTRACTMAPEXPECT, get_map(Node)),
|
||||
|
||||
Packet = <<"GET http://www.google.com HTTP/1.1\n">>,
|
||||
test_extractor_works(Cluster, Packet),
|
||||
test_extractor_with_aae_expire(Cluster, ?INDEX2, ?BUCKET2, Packet),
|
||||
test_bad_extraction(Cluster),
|
||||
|
||||
pass.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Private
|
||||
%%%===================================================================
|
||||
|
||||
get_ring_and_cmd_vals(Node, Prefix, Key) ->
|
||||
Ring = rpc:call(Node, yz_misc, get_ring, [transformed]),
|
||||
MDVal = metadata_get(Node, Prefix, Key),
|
||||
RingVal = ring_meta_get(Node, Key, Ring),
|
||||
{RingVal, MDVal}.
|
||||
|
||||
metadata_get(Node, Prefix, Key) ->
|
||||
rpc:call(Node, riak_core_metadata, get, [Prefix, Key, []]).
|
||||
|
||||
ring_meta_get(Node, Key, Ring) ->
|
||||
rpc:call(Node, riak_core_ring, get_meta, [Key, Ring]).
|
||||
|
||||
register_extractor(Node, MimeType, Mod) ->
|
||||
rpc:call(Node, yz_extractor, register, [MimeType, Mod]).
|
||||
|
||||
get_map(Node) ->
|
||||
rpc:call(Node, yz_extractor, get_map, []).
|
||||
|
||||
verify_extractor(Node, PacketData, Mod) ->
|
||||
rpc:call(Node, yz_extractor, run, [PacketData, Mod]).
|
||||
|
||||
bucket_url({Host,Port}, {BType, BName}, Key) ->
|
||||
?FMT("http://~s:~B/types/~s/buckets/~s/keys/~s",
|
||||
[Host, Port, BType, BName, Key]).
|
||||
|
||||
test_extractor_works(Cluster, Packet) ->
|
||||
[rt_intercept:add(ANode, {yz_noop_extractor,
|
||||
[{{extract, 1}, extract_httpheader}]}) ||
|
||||
ANode <- Cluster],
|
||||
[rt_intercept:wait_until_loaded(ANode) || ANode <- Cluster],
|
||||
|
||||
ExpectedExtraction = [{method, 'GET'},
|
||||
{host, <<"www.google.com">>},
|
||||
{uri, <<"/">>}],
|
||||
?assertEqual(ExpectedExtraction,
|
||||
verify_extractor(rt:select_random(Cluster), Packet, ?EXTRACTOR_MOD)).
|
||||
|
||||
test_extractor_with_aae_expire(Cluster, Index, Bucket, Packet) ->
|
||||
%% Now make sure we register extractor across all nodes
|
||||
[register_extractor(ANode, ?EXTRACTOR_CT, ?EXTRACTOR_MOD) ||
|
||||
ANode <- Cluster],
|
||||
|
||||
Key = <<"google">>,
|
||||
|
||||
{Host, Port} = rt:select_random(yokozuna_rt:host_entries(
|
||||
rt:connection_info(
|
||||
Cluster))),
|
||||
URL = bucket_url({Host, Port}, Bucket,
|
||||
mochiweb_util:quote_plus(Key)),
|
||||
|
||||
CT = ?EXTRACTOR_CT,
|
||||
{ok, "204", _, _} = yokozuna_rt:http(
|
||||
put, URL, [{"Content-Type", CT}], Packet),
|
||||
|
||||
yokozuna_rt:commit(Cluster, Index),
|
||||
|
||||
ANode = rt:select_random(Cluster),
|
||||
yokozuna_rt:search_expect(ANode, Index, <<"host">>,
|
||||
<<"www*">>, 1),
|
||||
|
||||
rpc:multicall(Cluster, riak_kv_entropy_manager, enable, []),
|
||||
|
||||
yokozuna_rt:expire_trees(Cluster),
|
||||
yokozuna_rt:wait_for_full_exchange_round(Cluster, erlang:now()),
|
||||
|
||||
yokozuna_rt:search_expect(ANode, Index, <<"host">>,
|
||||
<<"www*">>, 1),
|
||||
|
||||
APid = rt:pbc(rt:select_random(Cluster)),
|
||||
yokozuna_rt:override_schema(APid, Cluster, Index, ?SCHEMANAME,
|
||||
?TEST_SCHEMA_UPGRADE),
|
||||
|
||||
{ok, "200", RHeaders, _} = yokozuna_rt:http(get, URL, [{"Content-Type", CT}],
|
||||
[], []),
|
||||
VC = proplists:get_value("X-Riak-Vclock", RHeaders),
|
||||
|
||||
{ok, "204", _, _} = yokozuna_rt:http(
|
||||
put, URL, [{"Content-Type", CT}, {"X-Riak-Vclock", VC}],
|
||||
Packet),
|
||||
yokozuna_rt:commit(Cluster, Index),
|
||||
|
||||
yokozuna_rt:search_expect(ANode, Index, <<"method">>,
|
||||
<<"GET">>, 1),
|
||||
|
||||
yokozuna_rt:expire_trees(Cluster),
|
||||
yokozuna_rt:wait_for_full_exchange_round(Cluster, erlang:now()),
|
||||
|
||||
yokozuna_rt:search_expect(ANode, Index, <<"method">>,
|
||||
<<"GET">>, 1),
|
||||
riakc_pb_socket:stop(APid).
|
||||
|
||||
test_bad_extraction(Cluster) ->
|
||||
%% Previous test enabled AAE, which makes the number of repairs here not consistent
|
||||
%% Turn off AAE again just to make the test deterministic.
|
||||
rpc:multicall(Cluster, riak_kv_entropy_manager, disable, []),
|
||||
%%
|
||||
%% register the no-op extractor on all the nodes with a content type
|
||||
%%
|
||||
[register_extractor(ANode, "application/bad-extractor", yz_noop_extractor) ||
|
||||
ANode <- Cluster],
|
||||
%%
|
||||
%% Set up the intercepts so that they extract non-unicode data
|
||||
%%
|
||||
[rt_intercept:add(ANode, {yz_noop_extractor,
|
||||
[{{extract, 1}, extract_non_unicode_data}]}) ||
|
||||
ANode <- Cluster],
|
||||
[rt_intercept:wait_until_loaded(ANode) || ANode <- Cluster],
|
||||
%%
|
||||
%% create and wire up the bucket to the Solr index/core
|
||||
%%
|
||||
yokozuna_rt:create_indexed_bucket_type(Cluster, ?TYPE3, ?INDEX3, ?SCHEMANAME),
|
||||
%%
|
||||
%% Grab the stats before
|
||||
%%
|
||||
{PreviousFailCount, PreviousErrorThresholdCount} = get_error_stats(Cluster),
|
||||
%%
|
||||
%% Put some data into Riak. This should cause the intercepted no-op
|
||||
%% extractor to generate an object to be written into Solr that contains
|
||||
%% non-unicode data.
|
||||
{Host, Port} = rt:select_random(
|
||||
yokozuna_rt:host_entries(rt:connection_info(Cluster))),
|
||||
Key = <<"test_bad_extraction">>,
|
||||
URL = bucket_url({Host, Port}, ?BUCKET3, Key),
|
||||
Headers = [{"Content-Type", "application/bad-extractor"}],
|
||||
Data = <<"blahblahblahblah">>,
|
||||
{ok, "204", _, _} = yokozuna_rt:http(put, URL, Headers, Data),
|
||||
%%
|
||||
%% The put should pass, but because it's "bad data", there should
|
||||
%% be no data in Riak.
|
||||
%%
|
||||
yokozuna_rt:verify_num_found_query(Cluster, ?INDEX3, 0),
|
||||
%%
|
||||
%% Verify the stats. There should be one more index failure,
|
||||
%% but there should be more more "melts" (error threshold failures)
|
||||
%%
|
||||
yokozuna_rt:wait_until(
|
||||
Cluster,
|
||||
fun(_Node) ->
|
||||
check_error_stats(Cluster, PreviousFailCount, PreviousErrorThresholdCount)
|
||||
end
|
||||
),
|
||||
ok.
|
||||
|
||||
check_error_stats(Cluster, PreviousFailCount, PreviousErrorThresholdCount) ->
|
||||
{FailCount, ErrorThresholdCount} = get_error_stats(Cluster),
|
||||
lager:info(
|
||||
"PreviousFailCount: ~p FailCount: ~p;"
|
||||
" PreviousErrorThresholdCount: ~p; ErrorThresholdCount: ~p",
|
||||
[PreviousFailCount, FailCount,
|
||||
PreviousErrorThresholdCount, ErrorThresholdCount]
|
||||
),
|
||||
PreviousFailCount + ?NVAL == FailCount
|
||||
andalso PreviousErrorThresholdCount == ErrorThresholdCount.
|
||||
|
||||
|
||||
get_error_stats(Cluster) ->
|
||||
AllStats = [rpc:call(Node, yz_stat, get_stats, []) || Node <- Cluster],
|
||||
{
|
||||
lists:sum([get_count([index, bad_entry], count, Stats) || Stats <- AllStats]),
|
||||
lists:sum([get_count([search_index_error_threshold_failure_count], value, Stats) || Stats <- AllStats])
|
||||
}.
|
||||
|
||||
get_count(StatName, Type, Stats) ->
|
||||
proplists:get_value(
|
||||
Type,
|
||||
proplists:get_value(
|
||||
yz_stat:stat_name(StatName),
|
||||
Stats
|
||||
)
|
||||
).
|
@ -1,206 +0,0 @@
|
||||
%% -------------------------------------------------------------------
|
||||
%%
|
||||
%% Copyright (c) 2014 Basho Technologies, Inc.
|
||||
%%
|
||||
%% This file is provided to you under the Apache License,
|
||||
%% Version 2.0 (the "License"); you may not use this file
|
||||
%% except in compliance with the License. You may obtain
|
||||
%% a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing,
|
||||
%% software distributed under the License is distributed on an
|
||||
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
%% KIND, either express or implied. See the License for the
|
||||
%% specific language governing permissions and limitations
|
||||
%% under the License.
|
||||
%%
|
||||
%%-------------------------------------------------------------------
|
||||
-module(yz_handoff).
|
||||
-compile(export_all).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("riakc/include/riakc.hrl").
|
||||
|
||||
-define(GET(K,L), proplists:get_value(K, L)).
|
||||
-define(FMT(S, Args), lists:flatten(io_lib:format(S, Args))).
|
||||
-define(INDEX, <<"test_idx">>).
|
||||
-define(BUCKET, <<"test_bkt">>).
|
||||
-define(NUMRUNSTATES, 1).
|
||||
-define(SEQMAX, 1000).
|
||||
-define(TESTCYCLE, 20).
|
||||
-define(N, 3).
|
||||
-define(CFG,
|
||||
[
|
||||
{riak_core,
|
||||
[
|
||||
{ring_creation_size, 16},
|
||||
{n_val, ?N},
|
||||
{handoff_concurrency, 10},
|
||||
{vnode_management_timer, 1000}
|
||||
]},
|
||||
{riak_kv,
|
||||
[
|
||||
%% allow AAE to build trees and exchange rapidly
|
||||
{anti_entropy_build_limit, {100, 1000}},
|
||||
{anti_entropy_concurrency, 8},
|
||||
{handoff_rejected_max, infinity}
|
||||
]},
|
||||
{yokozuna,
|
||||
[
|
||||
{anti_entropy_tick, 1000},
|
||||
{enabled, true}
|
||||
]}
|
||||
]).
|
||||
|
||||
-record(trial_state, {
|
||||
solr_url_before,
|
||||
solr_url_after,
|
||||
leave_node,
|
||||
join_node,
|
||||
admin_node}).
|
||||
|
||||
confirm() ->
|
||||
%% Setup cluster initially
|
||||
[Node1, Node2, _Node3, _Node4, _Node5] = Nodes = rt:build_cluster(5, ?CFG),
|
||||
|
||||
rt:wait_for_cluster_service(Nodes, yokozuna),
|
||||
|
||||
ConnInfo = ?GET(Node2, rt:connection_info([Node2])),
|
||||
{Host, Port} = ?GET(http, ConnInfo),
|
||||
Shards = [{N, node_solr_port(N)} || N <- Nodes],
|
||||
|
||||
%% Generate keys, YZ only supports UTF-8 compatible keys
|
||||
Keys = [<<N:64/integer>> || N <- lists:seq(1, ?SEQMAX),
|
||||
not lists:any(fun(E) -> E > 127 end,
|
||||
binary_to_list(<<N:64/integer>>))],
|
||||
KeyCount = length(Keys),
|
||||
|
||||
Pid = rt:pbc(Node2),
|
||||
yokozuna_rt:write_data(Nodes, Pid, ?INDEX, ?BUCKET, Keys),
|
||||
|
||||
%% Separate out shards for multiple runs
|
||||
[Shard1|Shards2Rest] = Shards,
|
||||
{_, SolrPort1} = Shard1,
|
||||
[{_, SolrPort2}|_] = Shards2Rest,
|
||||
SolrURL = internal_solr_url(Host, SolrPort1, ?INDEX, Shards),
|
||||
BucketURL = bucket_keys_url(Host, Port, ?BUCKET),
|
||||
SearchURL = search_url(Host, Port, ?INDEX),
|
||||
|
||||
lager:info("Verify Replicas Count = (3 * docs/keys) count"),
|
||||
verify_count(SolrURL, (KeyCount * ?N)),
|
||||
|
||||
States = [#trial_state{solr_url_before = SolrURL,
|
||||
solr_url_after = internal_solr_url(Host, SolrPort2, ?INDEX, Shards2Rest),
|
||||
leave_node = Node1},
|
||||
#trial_state{solr_url_before = internal_solr_url(Host, SolrPort2, ?INDEX, Shards2Rest),
|
||||
solr_url_after = SolrURL,
|
||||
join_node = Node1,
|
||||
admin_node = Node2}],
|
||||
|
||||
%% Run set of leave/join trials and count/test #'s from the cluster
|
||||
[[begin
|
||||
check_data(Nodes, KeyCount, BucketURL, SearchURL, State),
|
||||
check_counts(Pid, KeyCount, BucketURL)
|
||||
end || State <- States]
|
||||
|| _ <- lists:seq(1,?NUMRUNSTATES)],
|
||||
|
||||
pass.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Private
|
||||
%%%===================================================================
|
||||
|
||||
node_solr_port(Node) ->
|
||||
{ok, P} = riak_core_util:safe_rpc(Node, application, get_env,
|
||||
[yokozuna, solr_port]),
|
||||
P.
|
||||
|
||||
internal_solr_url(Host, Port, Index) ->
|
||||
?FMT("http://~s:~B/internal_solr/~s", [Host, Port, Index]).
|
||||
internal_solr_url(Host, Port, Index, Shards) ->
|
||||
Ss = [internal_solr_url(Host, ShardPort, Index)
|
||||
|| {_, ShardPort} <- Shards],
|
||||
?FMT("http://~s:~B/internal_solr/~s/select?wt=json&q=*:*&shards=~s",
|
||||
[Host, Port, Index, string:join(Ss, ",")]).
|
||||
|
||||
%% @private
|
||||
bucket_keys_url(Host, Port, BName) ->
|
||||
?FMT("http://~s:~B/buckets/~s/keys?keys=true", [Host, Port, BName]).
|
||||
|
||||
%% @private
|
||||
search_url(Host, Port, Index) ->
|
||||
?FMT("http://~s:~B/solr/~s/select?wt=json&q=*:*", [Host, Port, Index]).
|
||||
|
||||
verify_count(Url, ExpectedCount) ->
|
||||
AreUp =
|
||||
fun() ->
|
||||
{ok, "200", _, DBody} = yokozuna_rt:http(get, Url, [], [], "", 60000),
|
||||
FoundCount = get_count(DBody),
|
||||
lager:info("FoundCount: ~b, ExpectedCount: ~b",
|
||||
[FoundCount, ExpectedCount]),
|
||||
ExpectedCount =:= FoundCount
|
||||
end,
|
||||
?assertEqual(ok, rt:wait_until(AreUp)),
|
||||
ok.
|
||||
|
||||
get_count(Resp) ->
|
||||
Struct = mochijson2:decode(Resp),
|
||||
kvc:path([<<"response">>, <<"numFound">>], Struct).
|
||||
|
||||
get_keys_count(BucketURL) ->
|
||||
{ok, "200", _, RBody} = yokozuna_rt:http(get, BucketURL, [], []),
|
||||
Struct = mochijson2:decode(RBody),
|
||||
length(kvc:path([<<"keys">>], Struct)).
|
||||
|
||||
check_counts(Pid, InitKeyCount, BucketURL) ->
|
||||
PBCounts = [begin {ok, Resp} = riakc_pb_socket:search(
|
||||
Pid, ?INDEX, <<"*:*">>),
|
||||
Resp#search_results.num_found
|
||||
end || _ <- lists:seq(1,?TESTCYCLE)],
|
||||
HTTPCounts = [begin {ok, "200", _, RBody} = yokozuna_rt:http(
|
||||
get, BucketURL, [], []),
|
||||
Struct = mochijson2:decode(RBody),
|
||||
length(kvc:path([<<"keys">>], Struct))
|
||||
end || _ <- lists:seq(1,?TESTCYCLE)],
|
||||
MinPBCount = lists:min(PBCounts),
|
||||
MinHTTPCount = lists:min(HTTPCounts),
|
||||
lager:info("Before-Node-Leave PB: ~b, After-Node-Leave PB: ~b",
|
||||
[InitKeyCount, MinPBCount]),
|
||||
?assertEqual(InitKeyCount, MinPBCount),
|
||||
lager:info("Before-Node-Leave PB: ~b, After-Node-Leave HTTP: ~b",
|
||||
[InitKeyCount, MinHTTPCount]),
|
||||
?assertEqual(InitKeyCount, MinHTTPCount).
|
||||
|
||||
check_data(Cluster, KeyCount, BucketURL, SearchURL, S) ->
|
||||
CheckCount = KeyCount * ?N,
|
||||
KeysBefore = get_keys_count(BucketURL),
|
||||
|
||||
UpdatedCluster = leave_or_join(Cluster, S),
|
||||
|
||||
yokozuna_rt:wait_for_aae(UpdatedCluster),
|
||||
|
||||
KeysAfter = get_keys_count(BucketURL),
|
||||
lager:info("KeysBefore: ~b, KeysAfter: ~b", [KeysBefore, KeysAfter]),
|
||||
?assertEqual(KeysBefore, KeysAfter),
|
||||
|
||||
lager:info("Verify Search Docs Count =:= key count"),
|
||||
lager:info("Run Search URL: ~s", [SearchURL]),
|
||||
verify_count(SearchURL, KeysAfter),
|
||||
lager:info("Verify Replicas Count = (3 * docs/keys) count"),
|
||||
lager:info("Run Search URL: ~s", [S#trial_state.solr_url_after]),
|
||||
verify_count(S#trial_state.solr_url_after, CheckCount).
|
||||
|
||||
leave_or_join(Cluster, S=#trial_state{join_node=undefined}) ->
|
||||
Node = S#trial_state.leave_node,
|
||||
rt:leave(Node),
|
||||
?assertEqual(ok, rt:wait_until_unpingable(Node)),
|
||||
Cluster -- [Node];
|
||||
leave_or_join(Cluster, S=#trial_state{leave_node=undefined}) ->
|
||||
Node = S#trial_state.join_node,
|
||||
NodeAdmin = S#trial_state.admin_node,
|
||||
ok = rt:start_and_wait(Node),
|
||||
ok = rt:join(Node, NodeAdmin),
|
||||
?assertEqual(ok, rt:wait_until_nodes_ready(Cluster)),
|
||||
?assertEqual(ok, rt:wait_until_no_pending_changes(Cluster)),
|
||||
Cluster ++ [Node].
|
@ -1,305 +0,0 @@
|
||||
%% -------------------------------------------------------------------
|
||||
%%
|
||||
%% Copyright (c) 2015 Basho Technologies, Inc.
|
||||
%%
|
||||
%% This file is provided to you under the Apache License,
|
||||
%% Version 2.0 (the "License"); you may not use this file
|
||||
%% except in compliance with the License. You may obtain
|
||||
%% a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing,
|
||||
%% software distributed under the License is distributed on an
|
||||
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
%% KIND, either express or implied. See the License for the
|
||||
%% specific language governing permissions and limitations
|
||||
%% under the License.
|
||||
%%
|
||||
%%--------------------------------------------------------------------
|
||||
-module(yz_schema_change_reset).
|
||||
-compile(export_all).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("riakc/include/riakc.hrl").
|
||||
|
||||
-define(GET(K,L), proplists:get_value(K, L)).
|
||||
-define(INDEX, <<"test_schema_change_reset">>).
|
||||
-define(TYPE, <<"test_schema_change">>).
|
||||
-define(BUCKET1, <<"test_schema_change_reset">>).
|
||||
-define(BUCKET2, {?TYPE, <<"test_schema_change_reset_2">>}).
|
||||
-define(SCHEMANAME, <<"test">>).
|
||||
|
||||
-define(TEST_SCHEMA,
|
||||
<<"<schema name=\"test\" version=\"1.5\">
|
||||
<fields>
|
||||
<dynamicField name=\"*_foo_register\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<dynamicField name=\"*\" type=\"ignored\"/>
|
||||
|
||||
<field name=\"_yz_id\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" required=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_ed\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_pn\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_fpn\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_vtag\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_rt\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_rk\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_rb\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_err\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"text\" type=\"text_general\" indexed=\"true\" stored=\"false\" multiValued=\"true\"/>
|
||||
<field name=\"age\" type=\"int\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
</fields>
|
||||
<uniqueKey>_yz_id</uniqueKey>
|
||||
<types>
|
||||
<fieldType name=\"ignored\" indexed=\"false\" stored=\"false\" multiValued=\"false\" class=\"solr.StrField\" />
|
||||
|
||||
<fieldType name=\"_yz_str\" class=\"solr.StrField\" sortMissingLast=\"true\" />
|
||||
<fieldType name=\"string\" class=\"solr.StrField\" sortMissingLast=\"true\" />
|
||||
<fieldType name=\"int\" class=\"solr.TrieIntField\" precisionStep=\"0\" positionIncrementGap=\"0\" />
|
||||
<fieldType name=\"text_general\" class=\"solr.TextField\" positionIncrementGap=\"100\">
|
||||
<analyzer type=\"index\">
|
||||
<tokenizer class=\"solr.StandardTokenizerFactory\"/>
|
||||
<filter class=\"solr.StopFilterFactory\" ignoreCase=\"true\" words=\"stopwords.txt\" enablePositionIncrements=\"true\" />
|
||||
<filter class=\"solr.LowerCaseFilterFactory\"/>
|
||||
</analyzer>
|
||||
<analyzer type=\"query\">
|
||||
<tokenizer class=\"solr.StandardTokenizerFactory\"/>
|
||||
<filter class=\"solr.StopFilterFactory\" ignoreCase=\"true\" words=\"stopwords.txt\" enablePositionIncrements=\"true\" />
|
||||
<filter class=\"solr.SynonymFilterFactory\" synonyms=\"synonyms.txt\" ignoreCase=\"true\" expand=\"true\"/>
|
||||
<filter class=\"solr.LowerCaseFilterFactory\"/>
|
||||
</analyzer>
|
||||
</fieldType>
|
||||
</types>
|
||||
</schema>">>).
|
||||
-define(TEST_SCHEMA_UPDATE,
|
||||
<<"<schema name=\"test\" version=\"1.5\">
|
||||
<fields>
|
||||
<dynamicField name=\"*_foo_register\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<dynamicField name=\"*_baz_counter\" type=\"int\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<dynamicField name=\"paths.*\" type=\"int\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<dynamicField name=\"*\" type=\"ignored\"/>
|
||||
|
||||
<field name=\"_yz_id\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" required=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_ed\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_pn\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_fpn\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_vtag\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_rt\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_rk\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_rb\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"_yz_err\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"text\" type=\"text_general\" indexed=\"true\" stored=\"false\" multiValued=\"true\"/>
|
||||
<field name=\"age\" type=\"string\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
<field name=\"hello_i\" type=\"int\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
|
||||
</fields>
|
||||
<uniqueKey>_yz_id</uniqueKey>
|
||||
<types>
|
||||
<fieldType name=\"ignored\" indexed=\"false\" stored=\"false\" multiValued=\"false\" class=\"solr.StrField\" />
|
||||
<fieldType name=\"_yz_str\" class=\"solr.StrField\" sortMissingLast=\"true\" />
|
||||
<fieldType name=\"string\" class=\"solr.StrField\" sortMissingLast=\"true\" />
|
||||
<fieldType name=\"int\" class=\"solr.TrieIntField\" precisionStep=\"0\" positionIncrementGap=\"0\" />
|
||||
<fieldType name=\"text_general\" class=\"solr.TextField\" positionIncrementGap=\"100\">
|
||||
<analyzer type=\"index\">
|
||||
<tokenizer class=\"solr.StandardTokenizerFactory\"/>
|
||||
<filter class=\"solr.StopFilterFactory\" ignoreCase=\"true\" words=\"stopwords.txt\" enablePositionIncrements=\"true\" />
|
||||
<filter class=\"solr.LowerCaseFilterFactory\"/>
|
||||
</analyzer>
|
||||
<analyzer type=\"query\">
|
||||
<tokenizer class=\"solr.StandardTokenizerFactory\"/>
|
||||
<filter class=\"solr.StopFilterFactory\" ignoreCase=\"true\" words=\"stopwords.txt\" enablePositionIncrements=\"true\" />
|
||||
<filter class=\"solr.SynonymFilterFactory\" synonyms=\"synonyms.txt\" ignoreCase=\"true\" expand=\"true\"/>
|
||||
<filter class=\"solr.LowerCaseFilterFactory\"/>
|
||||
</analyzer>
|
||||
</fieldType>
|
||||
</types>
|
||||
</schema>">>).
|
||||
|
||||
-define(SEQMAX, 20).
|
||||
-define(CFG,
|
||||
[{riak_core,
|
||||
[
|
||||
{ring_creation_size, 16}
|
||||
]},
|
||||
{riak_kv, [
|
||||
{anti_entropy_concurrency, 8},
|
||||
{anti_entropy_build_limit, {100, 1000}}
|
||||
]},
|
||||
{yokozuna,
|
||||
[
|
||||
{anti_entropy_tick, 1000},
|
||||
{enabled, true}
|
||||
]}
|
||||
]).
|
||||
|
||||
confirm() ->
|
||||
[Node1|_RestNodes] = Cluster = rt:build_cluster(4, ?CFG),
|
||||
rt:wait_for_cluster_service(Cluster, yokozuna),
|
||||
|
||||
GenKeys = yokozuna_rt:gen_keys(?SEQMAX),
|
||||
KeyCount = length(GenKeys),
|
||||
lager:info("KeyCount ~p", [KeyCount]),
|
||||
|
||||
Pid = rt:pbc(rt:select_random(Cluster)),
|
||||
|
||||
lager:info("Write initial data to index ~p with schema ~p",
|
||||
[?INDEX, ?SCHEMANAME]),
|
||||
|
||||
yokozuna_rt:write_data(Cluster, Pid, ?INDEX,
|
||||
{?SCHEMANAME, ?TEST_SCHEMA},
|
||||
?BUCKET1, GenKeys),
|
||||
lager:info("Create and activate map-based bucket type ~s and tie it to search_index ~s",
|
||||
[?TYPE, ?INDEX]),
|
||||
rt:create_and_activate_bucket_type(Node1, ?TYPE, [{datatype, map},
|
||||
{search_index, ?INDEX}]),
|
||||
|
||||
lager:info("Write and check age at integer per original schema"),
|
||||
|
||||
NewObj1A = riakc_obj:new(?BUCKET1, <<"keyA">>,
|
||||
<<"{\"age\":26}">>,
|
||||
"application/json"),
|
||||
|
||||
NewObj1B = riakc_obj:new(?BUCKET1, <<"keyB">>,
|
||||
<<"{\"age\":99}">>,
|
||||
"application/json"),
|
||||
|
||||
{ok, _ObjA} = riakc_pb_socket:put(Pid, NewObj1A, [return_head]),
|
||||
yokozuna_rt:commit(Cluster, ?INDEX),
|
||||
{ok, _ObjB} = riakc_pb_socket:put(Pid, NewObj1B, [return_head]),
|
||||
yokozuna_rt:commit(Cluster, ?INDEX),
|
||||
|
||||
yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount + 2),
|
||||
|
||||
yokozuna_rt:assert_search(Pid, Cluster, ?INDEX,
|
||||
<<"age:26">>, {<<"age">>, <<"26">>}, []),
|
||||
yokozuna_rt:assert_search(Pid, Cluster, ?INDEX,
|
||||
<<"age:99">>, {<<"age">>, <<"99">>}, []),
|
||||
|
||||
Map1 = riakc_map:update(
|
||||
{<<"0_foo">>, register},
|
||||
fun(R) ->
|
||||
riakc_register:set(<<"44ab">>, R)
|
||||
end, riakc_map:new()),
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid,
|
||||
?BUCKET2,
|
||||
<<"keyMap1">>,
|
||||
riakc_map:to_op(Map1)),
|
||||
|
||||
{ok, Map2} = riakc_pb_socket:fetch_type(Pid, ?BUCKET2, <<"keyMap1">>),
|
||||
Map3 = riakc_map:update(
|
||||
{<<"1_baz">>, counter},
|
||||
fun(R) ->
|
||||
riakc_counter:increment(10, R)
|
||||
end, Map2),
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid,
|
||||
?BUCKET2,
|
||||
<<"keyMap1">>,
|
||||
riakc_map:to_op(Map3)),
|
||||
|
||||
yokozuna_rt:commit(Cluster, ?INDEX),
|
||||
yokozuna_rt:assert_search(Pid, Cluster, ?INDEX,
|
||||
<<"0_foo_register:44ab">>,
|
||||
{<<"0_foo_register">>, <<"44ab">>},
|
||||
[]),
|
||||
|
||||
lager:info("Expire and re-check count before updating schema"),
|
||||
|
||||
yokozuna_rt:expire_trees(Cluster),
|
||||
yokozuna_rt:wait_for_aae(Cluster),
|
||||
|
||||
yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount + 3),
|
||||
|
||||
yokozuna_rt:override_schema(Pid, Cluster, ?INDEX, ?SCHEMANAME, ?TEST_SCHEMA_UPDATE),
|
||||
|
||||
lager:info("Write and check hello_i at integer per schema update"),
|
||||
|
||||
NewObj2 = riakc_obj:new(?BUCKET1, <<"key2">>,
|
||||
<<"{\"hello_i\":36}">>,
|
||||
"application/json"),
|
||||
|
||||
{ok, _Obj2} = riakc_pb_socket:put(Pid, NewObj2, [return_head]),
|
||||
yokozuna_rt:commit(Cluster, ?INDEX),
|
||||
|
||||
yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount + 4),
|
||||
yokozuna_rt:assert_search(Pid, Cluster, ?INDEX,
|
||||
<<"hello_i:36">>, {<<"hello_i">>, <<"36">>}, []),
|
||||
|
||||
lager:info("Write and check age at string per schema update"),
|
||||
|
||||
NewObj3 = riakc_obj:new(?BUCKET1, <<"key3">>,
|
||||
<<"{\"age\":\"3jlkjkl\"}">>,
|
||||
"application/json"),
|
||||
|
||||
{ok, _Obj3} = riakc_pb_socket:put(Pid, NewObj3, [return_head]),
|
||||
yokozuna_rt:commit(Cluster, ?INDEX),
|
||||
|
||||
yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount + 5),
|
||||
yokozuna_rt:assert_search(Pid, Cluster, ?INDEX,
|
||||
<<"age:3jlkjkl">>, {<<"age">>, <<"3jlkjkl">>},
|
||||
[]),
|
||||
|
||||
lager:info("Expire and re-check count to make sure we're correctly indexed
|
||||
by the new schema"),
|
||||
|
||||
yokozuna_rt:expire_trees(Cluster),
|
||||
yokozuna_rt:wait_for_aae(Cluster),
|
||||
|
||||
yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount + 5),
|
||||
|
||||
ANode = rt:select_random(Cluster),
|
||||
yokozuna_rt:search_expect(ANode, ?INDEX, <<"age">>, <<"*">>, 3),
|
||||
|
||||
lager:info("Re-Put because AAE won't find a diff even though the types
|
||||
have changed, as it only compares based on bkey currently.
|
||||
Also, this re-put will work as we have a default bucket (type)
|
||||
with allow_mult=false... no siblings"),
|
||||
|
||||
{ok, _Obj4} = riakc_pb_socket:put(Pid, NewObj1A, [return_head]),
|
||||
yokozuna_rt:commit(Cluster, ?INDEX),
|
||||
|
||||
yokozuna_rt:assert_search(Pid, Cluster, ?INDEX,
|
||||
<<"age:26">>, {<<"age">>, <<"26">>}, []),
|
||||
|
||||
lager:info("Re-Put Map data by dec/inc counter to account for *change* and
|
||||
allow previously unindexed counter to be searchable"),
|
||||
|
||||
{ok, Map4} = riakc_pb_socket:fetch_type(Pid, ?BUCKET2, <<"keyMap1">>),
|
||||
Map5 = riakc_map:update(
|
||||
{<<"1_baz">>, counter},
|
||||
fun(R) ->
|
||||
riakc_counter:decrement(0, R),
|
||||
riakc_counter:increment(0, R)
|
||||
end, Map4),
|
||||
ok = riakc_pb_socket:update_type(
|
||||
Pid,
|
||||
?BUCKET2,
|
||||
<<"keyMap1">>,
|
||||
riakc_map:to_op(Map5)),
|
||||
|
||||
yokozuna_rt:commit(Cluster, ?INDEX),
|
||||
yokozuna_rt:assert_search(Pid, Cluster, ?INDEX,
|
||||
<<"0_foo_register:44ab">>,
|
||||
{<<"0_foo_register">>, <<"44ab">>},
|
||||
[]),
|
||||
yokozuna_rt:assert_search(Pid, Cluster, ?INDEX,
|
||||
<<"1_baz_counter:10">>,
|
||||
{<<"1_baz_counter">>, <<"10">>},
|
||||
[]),
|
||||
|
||||
lager:info("Test nested json searches w/ unsearched fields ignored"),
|
||||
|
||||
NewObj5 = riakc_obj:new(?BUCKET1, <<"key4">>,
|
||||
<<"{\"quip\":\"blashj3\",
|
||||
\"paths\":{\"quip\":\"88\"}}">>,
|
||||
"application/json"),
|
||||
{ok, _Obj5} = riakc_pb_socket:put(Pid, NewObj5, [return_head]),
|
||||
|
||||
yokozuna_rt:commit(Cluster, ?INDEX),
|
||||
yokozuna_rt:assert_search(Pid, Cluster, ?INDEX,
|
||||
<<"paths.quip:88">>,
|
||||
{<<"paths.quip">>, <<"88">>},
|
||||
[]),
|
||||
|
||||
riakc_pb_socket:stop(Pid),
|
||||
|
||||
pass.
|
||||
|
Loading…
Reference in New Issue
Block a user