From 5f2ef6f576248b805642556a3d86871612858661 Mon Sep 17 00:00:00 2001 From: Brett Hazen Date: Tue, 21 Feb 2017 21:01:13 -0700 Subject: [PATCH] Move YZ tests to yokozuna repo --- intercepts/yz_noop_extractor_intercepts.erl | 44 -- intercepts/yz_solrq_helper_intercepts.erl | 48 -- src/yokozuna_rt.erl | 563 ------------------- tests/yz_core_properties_create_unload.erl | 170 ------ tests/yz_crdt.erl | 564 -------------------- tests/yz_default_bucket_type_upgrade.erl | 98 ---- tests/yz_ensemble.erl | 117 ---- tests/yz_extractors.erl | 441 --------------- tests/yz_handoff.erl | 206 ------- tests/yz_schema_change_reset.erl | 305 ----------- 10 files changed, 2556 deletions(-) delete mode 100644 intercepts/yz_noop_extractor_intercepts.erl delete mode 100644 intercepts/yz_solrq_helper_intercepts.erl delete mode 100644 src/yokozuna_rt.erl delete mode 100644 tests/yz_core_properties_create_unload.erl delete mode 100644 tests/yz_crdt.erl delete mode 100644 tests/yz_default_bucket_type_upgrade.erl delete mode 100644 tests/yz_ensemble.erl delete mode 100644 tests/yz_extractors.erl delete mode 100644 tests/yz_handoff.erl delete mode 100644 tests/yz_schema_change_reset.erl diff --git a/intercepts/yz_noop_extractor_intercepts.erl b/intercepts/yz_noop_extractor_intercepts.erl deleted file mode 100644 index ac714628..00000000 --- a/intercepts/yz_noop_extractor_intercepts.erl +++ /dev/null @@ -1,44 +0,0 @@ -%% ------------------------------------------------------------------- -%% -%% Copyright (c) 2015 Basho Technologies, Inc. -%% -%% This file is provided to you under the Apache License, -%% Version 2.0 (the "License"); you may not use this file -%% except in compliance with the License. You may obtain -%% a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, -%% software distributed under the License is distributed on an -%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -%% KIND, either express or implied. See the License for the -%% specific language governing permissions and limitations -%% under the License. -%% -%%------------------------------------------------------------------- - -%% Example from: -%% http://docs.basho.com/riak/latest/dev/search/custom-extractors/#An-Example-Custom-Extractor - --module(yz_noop_extractor_intercepts). --compile(export_all). --include("intercept.hrl"). - -extract_httpheader(Value) -> - extract_httpheader(Value, []). - -extract_httpheader(Value, _Opts) -> - {ok, - {http_request, - Method, - {absoluteURI, http, Host, undefined, Uri}, - _Version}, - _Rest} = erlang:decode_packet(http, Value, []), - [{method, Method}, {host, list_to_binary(Host)}, {uri, list_to_binary(Uri)}]. - -extract_non_unicode_data(Value) -> - extract_non_unicode_data(Value, []). - -extract_non_unicode_data(_Value, _Opts) -> - [{blob, <<9147374713>>}]. diff --git a/intercepts/yz_solrq_helper_intercepts.erl b/intercepts/yz_solrq_helper_intercepts.erl deleted file mode 100644 index 926929d0..00000000 --- a/intercepts/yz_solrq_helper_intercepts.erl +++ /dev/null @@ -1,48 +0,0 @@ -%% ------------------------------------------------------------------- -%% -%% Copyright (c) 2015 Basho Technologies, Inc. -%% -%% This file is provided to you under the Apache License, -%% Version 2.0 (the "License"); you may not use this file -%% except in compliance with the License. You may obtain -%% a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, -%% software distributed under the License is distributed on an -%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -%% KIND, either express or implied. See the License for the -%% specific language governing permissions and limitations -%% under the License. -%% -%%------------------------------------------------------------------- --module(yz_solrq_helper_intercepts). --compile(export_all). - --include("intercept.hrl"). - --define(M, yz_solrq_helper_orig). - -handle_get_ops_for_no_sibling_deletes(LI, P, Obj) -> - Lookup = ets:lookup(intercepts_tab, del_put), - case Lookup of - [] -> original_get_ops_for_no_sibling_deletes(LI, P, Obj); - _ -> - case proplists:get_value(del_put, Lookup) of - 0 -> - error_logger:info_msg( - "Delete operation intercepted for BKey ~p", - [{riak_object:bucket(Obj), riak_object:key(Obj)}]), - ets:update_counter(intercepts_tab, del_put, 1), - []; - _ -> - original_get_ops_for_no_sibling_deletes(LI, P, Obj) - end - end. - -original_get_ops_for_no_sibling_deletes(LI, P, Obj) -> - error_logger:info_msg( - "Delete operation original for BKey ~p", - [{riak_object:bucket(Obj), riak_object:key(Obj)}]), - ?M:get_ops_for_no_sibling_deletes_orig(LI, P, Obj). diff --git a/src/yokozuna_rt.erl b/src/yokozuna_rt.erl deleted file mode 100644 index 21a49231..00000000 --- a/src/yokozuna_rt.erl +++ /dev/null @@ -1,563 +0,0 @@ -%% ------------------------------------------------------------------- -%% -%% Copyright (c) 2015 Basho Technologies, Inc. -%% -%% This file is provided to you under the Apache License, -%% Version 2.0 (the "License"); you may not use this file -%% except in compliance with the License. You may obtain -%% a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, -%% software distributed under the License is distributed on an -%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -%% KIND, either express or implied. See the License for the -%% specific language governing permissions and limitations -%% under the License. -%% -%%------------------------------------------------------------------- --module(yokozuna_rt). - --include_lib("eunit/include/eunit.hrl"). --include("yokozuna_rt.hrl"). - --export([brutal_kill_remove_index_dirs/3, - check_exists/2, - clear_trees/1, - commit/2, - drain_solrqs/1, - expire_trees/1, - gen_keys/1, - host_entries/1, - create_indexed_bucket_type/3, - create_indexed_bucket_type/4, - override_schema/5, - remove_index_dirs/3, - rolling_upgrade/2, - search/4, - search/5, - search_expect/5, - search_expect/6, - search_expect/7, - assert_search/6, - verify_num_found_query/3, - wait_for_aae/1, - wait_for_full_exchange_round/2, - wait_for_index/2, - wait_for_schema/2, - wait_for_schema/3, - wait_until/2, - write_data/5, - write_data/6, - http/4, - http/5, - http/6]). - --type host() :: string(). --type portnum() :: integer(). --type count() :: non_neg_integer(). --type json_string() :: atom | string() | binary(). --type search_type() :: solr | yokozuna. --type method() :: get | post | head | options | put | delete | trace | - mkcol | propfind | proppatch | lock | unlock | move | copy. --type response() :: {ok, string(), [{string(), string()}], string()|binary()} | - {error, term()}. --type cluster() :: [node()]. - - --define(FMT(S, Args), lists:flatten(io_lib:format(S, Args))). --define(SOFTCOMMIT, 1000). - --spec host_entries(rt:conn_info()) -> [{host(), portnum()}]. -host_entries(ClusterConnInfo) -> - [riak_http(I) || {_,I} <- ClusterConnInfo]. - -%% @doc Generate `SeqMax' keys. Yokozuna supports only UTF-8 compatible keys. --spec gen_keys(pos_integer()) -> list(). -gen_keys(SeqMax) -> - [<> || N <- lists:seq(1, SeqMax), - not lists:any( - fun(E) -> E > 127 end, - binary_to_list(<>))]. - -%% @doc Write `Keys' via the PB inteface to a `Bucket' and have them -%% searchable in an `Index'. --spec write_data([node()], pid(), index_name(), bucket(), [binary()]) -> ok. -write_data(Cluster, Pid, Index, Bucket, Keys) -> - riakc_pb_socket:set_options(Pid, [queue_if_disconnected]), - - create_and_set_index(Cluster, Pid, Bucket, Index), - yokozuna_rt:commit(Cluster, Index), - - %% Write keys - lager:info("Writing ~p keys", [length(Keys)]), - [ok = rt:pbc_write(Pid, Bucket, Key, Key, "text/plain") || Key <- Keys], - ok. - --spec write_data([node()], pid(), index_name(), {schema_name(), raw_schema()}, - bucket(), [binary()]) -> ok. -write_data(Cluster, Pid, Index, {SchemaName, SchemaData}, - Bucket, Keys) -> - riakc_pb_socket:set_options(Pid, [queue_if_disconnected]), - - riakc_pb_socket:create_search_schema(Pid, SchemaName, SchemaData), - - create_and_set_index(Cluster, Pid, Bucket, Index, SchemaName), - yokozuna_rt:commit(Cluster, Index), - - %% Write keys - lager:info("Writing ~p keys", [length(Keys)]), - [ok = rt:pbc_write(Pid, Bucket, Key, Key, "text/plain") || Key <- Keys], - ok. - -%% @doc Peform a rolling upgrade of the `Cluster' to a different `Version' based -%% on current | previous | legacy. --spec rolling_upgrade([node()], current | previous | legacy) -> ok. -rolling_upgrade(Cluster, Version) -> - rolling_upgrade(Cluster, Version, same, [riak_kv, yokozuna]). --spec rolling_upgrade([node()], current | previous | legacy, [term()] | same, [atom()]) -> ok. -rolling_upgrade(Cluster, Version, UpgradeConfig, WaitForServices) when is_list(Cluster) -> - lager:info("Perform rolling upgrade on cluster ~p", [Cluster]), - [rolling_upgrade(Node, Version, UpgradeConfig, WaitForServices) || Node <- Cluster], - ok; -rolling_upgrade(Node, Version, UpgradeConfig, WaitForServices) -> - rt:upgrade(Node, Version, UpgradeConfig), - [rt:wait_for_service(Node, Service) || Service <- WaitForServices], - ok. - -%% @doc Use AAE status to verify that exchange has occurred for all -%% partitions since the time this function was invoked. --spec wait_for_aae([node()]) -> ok. -wait_for_aae(Cluster) -> - lager:info("Wait for AAE to migrate/repair indexes"), - wait_for_all_trees(Cluster), - wait_for_full_exchange_round(Cluster, erlang:now()), - ok. - -%% @doc Wait for all AAE trees to be built. --spec wait_for_all_trees([node()]) -> ok. -wait_for_all_trees(Cluster) -> - F = fun(Node) -> - lager:info("Check if all trees built for node ~p", [Node]), - Info = rpc:call(Node, yz_kv, compute_tree_info, []), - NotBuilt = [X || {_,undefined}=X <- Info], - NotBuilt == [] - end, - wait_until(Cluster, F), - ok. - -%% @doc Wait for a full exchange round since `Timestamp'. This means -%% that all `{Idx,N}' for all partitions must have exchanged after -%% `Timestamp'. --spec wait_for_full_exchange_round([node()], os:now()) -> ok. -wait_for_full_exchange_round(Cluster, Timestamp) -> - lager:info("wait for full AAE exchange round on cluster ~p", [Cluster]), - MoreRecent = - fun({_Idx, _, undefined, _RepairStats}) -> - false; - ({_Idx, _, AllExchangedTime, _RepairStats}) -> - AllExchangedTime > Timestamp - end, - AllExchanged = - fun(Node) -> - Exchanges = rpc:call(Node, yz_kv, compute_exchange_info, []), - {_Recent, WaitingFor1} = lists:partition(MoreRecent, Exchanges), - WaitingFor2 = [element(1,X) || X <- WaitingFor1], - lager:info("Still waiting for AAE of ~p ~p", [Node, WaitingFor2]), - [] == WaitingFor2 - end, - wait_until(Cluster, AllExchanged), - ok. - -%% @doc Wait for index creation. This is to handle *legacy* versions of yokozuna -%% in upgrade tests --spec wait_for_index(list(), index_name()) -> ok. -wait_for_index(Cluster, Index) -> - IsIndexUp = - fun(Node) -> - lager:info("Waiting for index ~s to be avaiable on node ~p", - [Index, Node]), - rpc:call(Node, yz_index, exists, [Index]) - end, - wait_until(Cluster, IsIndexUp), - ok. - -%% @see wait_for_schema/3 -wait_for_schema(Cluster, Name) -> - wait_for_schema(Cluster, Name, ignore). - -%% @doc Wait for the schema `Name' to be read by all nodes in -%% `Cluster' before returning. If `Content' is binary data when -%% verify the schema bytes exactly match `Content'. --spec wait_for_schema([node()], schema_name(), ignore | raw_schema()) -> ok. -wait_for_schema(Cluster, Name, Content) -> - F = fun(Node) -> - lager:info("Attempt to read schema ~s from node ~p", - [Name, Node]), - {Host, Port} = riak_pb(hd(rt:connection_info([Node]))), - {ok, PBConn} = riakc_pb_socket:start_link(Host, Port), - R = riakc_pb_socket:get_search_schema(PBConn, Name), - riakc_pb_socket:stop(PBConn), - case R of - {ok, PL} -> - case Content of - ignore -> - Name == proplists:get_value(name, PL); - _ -> - (Name == proplists:get_value(name, PL)) and - (Content == proplists:get_value(content, PL)) - end; - _ -> - false - end - end, - wait_until(Cluster, F), - ok. - -%% @doc Expire YZ trees --spec expire_trees([node()]) -> ok. -expire_trees(Cluster) -> - lager:info("Expire all trees"), - _ = [ok = rpc:call(Node, yz_entropy_mgr, expire_trees, []) - || Node <- Cluster], - - %% The expire is async so just give it a moment - timer:sleep(100), - ok. - -%% @doc Expire YZ trees --spec clear_trees([node()]) -> ok. -clear_trees(Cluster) -> - lager:info("Expire all trees"), - _ = [ok = rpc:call(Node, yz_entropy_mgr, clear_trees, []) - || Node <- Cluster], - ok. - -brutal_kill_remove_index_dirs(Nodes, IndexName, Services) -> - IndexDirs = get_index_dirs(IndexName, Nodes), - rt:brutal_kill(hd(Nodes)), - [rt:stop(ANode) || ANode <- tl(Nodes)], - remove_index_dirs2(Nodes, IndexDirs, Services), - ok. - -%% @doc Remove index directories, removing the index. --spec remove_index_dirs([node()], index_name(), [atom()]) -> ok. -remove_index_dirs(Nodes, IndexName, Services) -> - IndexDirs = get_index_dirs(IndexName, Nodes), - [rt:stop(ANode) || ANode <- Nodes], - remove_index_dirs2(Nodes, IndexDirs, Services), - ok. - -remove_index_dirs2(Nodes, IndexDirs, Services) -> - lager:info("Remove index dirs: ~p, on nodes: ~p~n", - [IndexDirs, Nodes]), - [rt:del_dir(binary_to_list(IndexDir)) || IndexDir <- IndexDirs], - [start_and_wait(ANode, Services) || ANode <- Nodes]. - -get_index_dirs(IndexName, Nodes) -> - IndexDirs = [rpc:call(Node, yz_index, index_dir, [IndexName]) || - Node <- Nodes], - IndexDirs. - -start_and_wait(Node, WaitForServices) -> - rt:start(Node), - [rt:wait_for_service(Node, Service) || Service <- WaitForServices]. - -%% @doc Check if index/core exists in metadata, disk via yz_index:exists. --spec check_exists([node()], index_name()) -> ok. -check_exists(Nodes, IndexName) -> - wait_until(Nodes, - fun(N) -> - rpc:call(N, yz_index, exists, [IndexName]) - end). - --spec verify_num_found_query([node()], index_name(), count()) -> ok. -verify_num_found_query(Cluster, Index, ExpectedCount) -> - F = fun(Node) -> - Pid = rt:pbc(Node), - {ok, {_, _, _, NumFound}} = riakc_pb_socket:search(Pid, Index, <<"*:*">>), - lager:info("Check Count, Expected: ~p | Actual: ~p~n", - [ExpectedCount, NumFound]), - ExpectedCount =:= NumFound - end, - wait_until(Cluster, F), - ok. - -%% @doc Brought over from yz_rt in the yokozuna repo - FORNOW. --spec search_expect(node()|[node()], index_name(), string(), string(), - non_neg_integer()) -> ok. -search_expect(NodeOrNodes, Index, Name, Term, Expect) -> - search_expect(NodeOrNodes, yokozuna, Index, Name, Term, Expect). - --spec search_expect(node()|[node()], search_type(), index_name(), - string(), string(), [string()], non_neg_integer()) -> ok. -search_expect(Nodes, solr, Index, Name0, Term0, Shards, Expect) - when is_list(Shards), length(Shards) > 0, is_list(Nodes) -> - Name = quote_unicode(Name0), - Term = quote_unicode(Term0), - Node = rt:select_random(Nodes), - {Host, Port} = solr_hp(Node, Nodes), - URL = internal_solr_url(Host, Port, Index, Name, Term, Shards), - lager:info("Run solr search ~s", [URL]), - Opts = [{response_format, binary}], - F = fun(_) -> - {ok, "200", _, R} = ibrowse:send_req(URL, [], get, [], Opts, - ?IBROWSE_TIMEOUT), - verify_count_http(Expect, R) - end, - wait_until(Nodes, F); -search_expect(Node, solr=Type, Index, Name, Term, Shards, Expect) - when is_list(Shards), length(Shards) > 0 -> - search_expect([Node], Type, Index, Name, Term, Shards, Expect). - --spec search_expect(node()|[node()], search_type(), index_name(), - string(), string(), non_neg_integer()) -> ok. -search_expect(Nodes, solr=Type, Index, Name, Term, Expect) when is_list(Nodes) -> - Node = rt:select_random(Nodes), - HP = solr_hp(Node, Nodes), - - %% F could actually be returned in a shared fun, but w/ so much arity, - %% just using it twice makes sense. - F = fun(_) -> - {ok, "200", _, R} = search(Type, HP, Index, Name, Term), - verify_count_http(Expect, R) - end, - - wait_until(Nodes, F); -search_expect(Nodes, yokozuna=Type, Index, Name, Term, Expect) - when is_list(Nodes) -> - HP = hd(host_entries(rt:connection_info(Nodes))), - - F = fun(_) -> - {ok, "200", _, R} = search(Type, HP, Index, Name, Term), - verify_count_http(Expect, R) - end, - - wait_until(Nodes, F); -search_expect(Node, Type, Index, Name, Term, Expect) -> - search_expect([Node], Type, Index, Name, Term, Expect). - -assert_search(Pid, Cluster, Index, Search, SearchExpect, Params) -> - F = fun(_) -> - lager:info("Searching ~p and asserting it exists", - [SearchExpect]), - case riakc_pb_socket:search(Pid, Index, Search, Params) of - {ok,{search_results,[{_Index,Fields}], _Score, Found}} -> - ?assert(lists:member(SearchExpect, Fields)), - case Found of - 1 -> true; - 0 -> false - end; - {ok, {search_results, [], _Score, 0}} -> - lager:info("Search has not yet yielded data"), - false - end - end, - wait_until(Cluster, F). - -search(HP, Index, Name, Term) -> - search(yokozuna, HP, Index, Name, Term). - -search(Type, {Host, Port}, Index, Name, Term) when is_integer(Port) -> - search(Type, {Host, integer_to_list(Port)}, Index, Name, Term); - -search(Type, {Host, Port}, Index, Name0, Term0) -> - Name = quote_unicode(Name0), - Term = quote_unicode(Term0), - FmtStr = case Type of - solr -> - "http://~s:~s/internal_solr/~s/select?q=~s:~s&wt=json"; - yokozuna -> - "http://~s:~s/search/query/~s?q=~s:~s&wt=json" - end, - URL = ?FMT(FmtStr, [Host, Port, Index, Name, Term]), - lager:info("Run search ~s", [URL]), - Opts = [{response_format, binary}], - ibrowse:send_req(URL, [], get, [], Opts, ?IBROWSE_TIMEOUT). - -%%%=================================================================== -%%% Private -%%%=================================================================== - --spec verify_count_http(count(), json_string()) -> boolean(). -verify_count_http(Expected, Resp) -> - Count = get_count_http(Resp), - lager:info("Expected: ~p, Actual: ~p", [Expected, Count]), - Expected =:= Count. - --spec get_count_http(json_string()) -> count(). -get_count_http(Resp) -> - Struct = mochijson2:decode(Resp), - kvc:path([<<"response">>, <<"numFound">>], Struct). - --spec riak_http({node(), rt:interfaces()} | rt:interfaces()) -> - {host(), portnum()}. -riak_http({_Node, ConnInfo}) -> - riak_http(ConnInfo); -riak_http(ConnInfo) -> - proplists:get_value(http, ConnInfo). - --spec riak_pb({node(), rt:interfaces()} | rt:interfaces()) -> - {host(), portnum()}. -riak_pb({_Node, ConnInfo}) -> - riak_pb(ConnInfo); -riak_pb(ConnInfo) -> - proplists:get_value(pb, ConnInfo). - --spec create_and_set_index([node()], pid(), bucket(), index_name()) -> ok. -create_and_set_index(Cluster, Pid, Bucket, Index) -> - %% Create a search index and associate with a bucket - lager:info("Create a search index ~s and associate it with bucket ~s", - [Index, Bucket]), - _ = riakc_pb_socket:create_search_index(Pid, Index), - %% For possible legacy upgrade reasons or general check around the cluster, - %% wrap create index in a wait - wait_for_index(Cluster, Index), - set_index(Pid, hd(Cluster), Bucket, Index). --spec create_and_set_index([node()], pid(), bucket(), index_name(), - schema_name()) -> ok. -create_and_set_index(Cluster, Pid, Bucket, Index, Schema) -> - %% Create a search index and associate with a bucket - lager:info("Create a search index ~s with a custom schema named ~s and " ++ - "associate it with bucket ~p", [Index, Schema, Bucket]), - _ = riakc_pb_socket:create_search_index(Pid, Index, Schema, []), - %% For possible legacy upgrade reasons or general check around the cluster, - %% wrap create index in a wait - wait_for_index(Cluster, Index), - set_index(Pid, hd(Cluster), Bucket, Index). - --spec set_index(pid(), node(), bucket(), index_name()) -> ok. -set_index(_Pid, Node, {BucketType, _Bucket}, Index) -> - lager:info("Create and activate map-based bucket type ~s and tie it to search_index ~s", - [BucketType, Index]), - rt:create_and_activate_bucket_type(Node, BucketType, [{search_index, Index}]); -set_index(Pid, _Node, Bucket, Index) -> - ok = riakc_pb_socket:set_search_index(Pid, Bucket, Index). - -internal_solr_url(Host, Port, Index) -> - ?FMT("http://~s:~B/internal_solr/~s", [Host, Port, Index]). -internal_solr_url(Host, Port, Index, Name, Term, Shards) -> - Ss = [internal_solr_url(Host, ShardPort, Index) - || {_, ShardPort} <- Shards], - ?FMT("http://~s:~B/internal_solr/~s/select?wt=json&q=~s:~s&shards=~s", - [Host, Port, Index, Name, Term, string:join(Ss, ",")]). - -quote_unicode(Value) -> - mochiweb_util:quote_plus(binary_to_list( - unicode:characters_to_binary(Value))). - --spec commit([node()], index_name()) -> ok. -commit(Nodes, Index) -> - %% Wait for yokozuna index to trigger, then force a commit - timer:sleep(?SOFTCOMMIT), - lager:info("Commit search writes to ~s at softcommit (default) ~p", - [Index, ?SOFTCOMMIT]), - rpc:multicall(Nodes, yz_solr, commit, [Index]), - ok. - --spec drain_solrqs(node() | cluster()) -> ok. -drain_solrqs(Cluster) when is_list(Cluster) -> - [drain_solrqs(Node) || Node <- Cluster]; -drain_solrqs(Node) -> - rpc:call(Node, yz_solrq_drain_mgr, drain, []), - ok. - --spec override_schema(pid(), [node()], index_name(), schema_name(), string()) -> - {ok, [node()]}. -override_schema(Pid, Cluster, Index, Schema, RawUpdate) -> - lager:info("Overwrite schema with updated schema"), - ok = riakc_pb_socket:create_search_schema(Pid, Schema, RawUpdate), - yokozuna_rt:wait_for_schema(Cluster, Schema, RawUpdate), - [Node|_] = Cluster, - {ok, _} = rpc:call(Node, yz_index, reload, [Index]). - -%% @doc Wrapper around `rt:wait_until' to verify `F' against multiple -%% nodes. The function `F' is passed one of the `Nodes' as -%% argument and must return a `boolean()' delcaring whether the -%% success condition has been met or not. --spec wait_until([node()], fun((node()) -> boolean())) -> ok. -wait_until(Nodes, F) -> - [?assertEqual(ok, rt:wait_until(Node, F)) || Node <- Nodes], - ok. - --spec solr_hp(node(), [node()]) -> {host(), portnum()}. -solr_hp(Node, Cluster) -> - CI = connection_info(Cluster), - solr_http(proplists:get_value(Node, CI)). - --spec connection_info(list()) -> orddict:orddict(). -connection_info(Cluster) -> - CI = orddict:from_list(rt:connection_info(Cluster)), - SolrInfo = orddict:from_list([{Node, [{solr_http, get_yz_conn_info(Node)}]} - || Node <- Cluster]), - orddict:merge(fun(_,V1,V2) -> V1 ++ V2 end, CI, SolrInfo). - --spec solr_http({node(), orddict:orddict()}) -> {host(), portnum()}. -solr_http({_Node, ConnInfo}) -> - solr_http(ConnInfo); -solr_http(ConnInfo) -> - proplists:get_value(solr_http, ConnInfo). - --spec get_yz_conn_info(node()) -> {string(), string()}. -get_yz_conn_info(Node) -> - {ok, SolrPort} = rpc:call(Node, application, get_env, [yokozuna, solr_port]), - %% Currently Yokozuna hardcodes listener to all interfaces - {"127.0.0.1", SolrPort}. - --spec http(method(), string(), list(), binary()|[]) -> response(). -http(Method, URL, Headers, Body) -> - Opts = [], - ibrowse:send_req(URL, Headers, Method, Body, Opts, ?IBROWSE_TIMEOUT). - --spec http(method(), string(), list(), binary()|[], list()|timeout()) - -> response(). -http(Method, URL, Headers, Body, Opts) when is_list(Opts) -> - ibrowse:send_req(URL, Headers, Method, Body, Opts, ?IBROWSE_TIMEOUT); -http(Method, URL, Headers, Body, Timeout) when is_integer(Timeout) -> - Opts = [], - ibrowse:send_req(URL, Headers, Method, Body, Opts, Timeout). - --spec http(method(), string(), list(), binary()|[], list(), timeout()) - -> response(). -http(Method, URL, Headers, Body, Opts, Timeout) when - is_list(Opts) andalso is_integer(Timeout) -> - ibrowse:send_req(URL, Headers, Method, Body, Opts, Timeout). - --spec create_indexed_bucket_type(cluster(), binary(), index_name()) -> ok. -create_indexed_bucket_type(Cluster, BucketType, IndexName) -> - ok = create_index(Cluster, IndexName), - ok = create_bucket_type(Cluster, BucketType, [{search_index, IndexName}]). - --spec create_indexed_bucket_type(cluster(), binary(), index_name(), - schema_name()) -> ok. -create_indexed_bucket_type(Cluster, BucketType, IndexName, SchemaName) -> - ok = create_index(Cluster, IndexName, SchemaName), - ok = create_bucket_type(Cluster, BucketType, [{search_index, IndexName}]). - --spec create_index(cluster(), index_name()) -> ok. -create_index(Cluster, Index) -> - Node = select_random(Cluster), - lager:info("Creating index ~s [~p]", [Index, Node]), - rpc:call(Node, yz_index, create, [Index]), - ok = wait_for_index(Cluster, Index). - --spec create_index(cluster(), index_name(), schema_name()) -> ok. -create_index(Cluster, Index, SchemaName) -> - Node = select_random(Cluster), - lager:info("Creating index ~s with schema ~s [~p]", - [Index, SchemaName, Node]), - rpc:call(Node, yz_index, create, [Index, SchemaName]), - ok = wait_for_index(Cluster, Index). - -select_random(List) -> - Length = length(List), - Idx = random:uniform(Length), - lists:nth(Idx, List). - --spec create_bucket_type(cluster(), binary(), [term()]) -> ok. -create_bucket_type(Cluster, BucketType, Props) -> - Node = select_random(Cluster), - rt:create_and_activate_bucket_type(Node, BucketType, Props), - rt:wait_until_bucket_type_status(BucketType, active, Node), - rt:wait_until_bucket_type_visible(Cluster, BucketType). diff --git a/tests/yz_core_properties_create_unload.erl b/tests/yz_core_properties_create_unload.erl deleted file mode 100644 index 186227e6..00000000 --- a/tests/yz_core_properties_create_unload.erl +++ /dev/null @@ -1,170 +0,0 @@ -%% ------------------------------------------------------------------- -%% -%% Copyright (c) 2014 Basho Technologies, Inc. -%% -%% This file is provided to you under the Apache License, -%% Version 2.0 (the "License"); you may not use this file -%% except in compliance with the License. You may obtain -%% a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, -%% software distributed under the License is distributed on an -%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -%% KIND, either express or implied. See the License for the -%% specific language governing permissions and limitations -%% under the License. -%% -%%------------------------------------------------------------------- --module(yz_core_properties_create_unload). --compile(export_all). --include_lib("eunit/include/eunit.hrl"). - --define(CFG, [{riak_kv, - [ - %% allow AAE to build trees and exchange rapidly - {anti_entropy_build_limit, {1000, 1000}}, - {anti_entropy_concurrency, 64}, - {anti_entropy_tick, 1000} - ]}, - {yokozuna, - [ - {enabled, true}, - {anti_entropy_tick, 1000} - ]}]). --define(INDEX, <<"test_idx_core">>). --define(TYPE, <<"data">>). --define(BUCKET, {?TYPE, <<"test_bkt_core">>}). --define(SEQMAX, 100). - -confirm() -> - Cluster = rt:build_cluster(4, ?CFG), - rt:wait_for_cluster_service(Cluster, yokozuna), - - %% Generate keys, YZ only supports UTF-8 compatible keys - Keys = [<> || N <- lists:seq(1, ?SEQMAX), - not lists:any(fun(E) -> E > 127 end, - binary_to_list(<>))], - KeyCount = length(Keys), - - %% Randomly select a subset of the test nodes to remove - %% core.properties from - RandNodes = rt:random_sublist(Cluster, 3), - - %% Select one of the modified nodes as a client endpoint - Node = rt:select_random(RandNodes), - Pid = rt:pbc(Node), - riakc_pb_socket:set_options(Pid, [queue_if_disconnected]), - - %% Create a search index and associate with a bucket - lager:info("Create and set Index ~p for Bucket ~p~n", [?INDEX, ?BUCKET]), - _ = riakc_pb_socket:create_search_index(Pid, ?INDEX), - yokozuna_rt:wait_for_index(Cluster, ?INDEX), - - ok = rt:create_and_activate_bucket_type(Node, - ?TYPE, - [{search_index, ?INDEX}]), - - rt:wait_until_bucket_type_visible(Cluster, ?TYPE), - - %% Write keys and wait for soft commit - lager:info("Writing ~p keys", [KeyCount]), - [ok = rt:pbc_write(Pid, ?BUCKET, Key, Key, "text/plain") || Key <- Keys], - yokozuna_rt:commit(Cluster, ?INDEX), - - yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount), - - - test_core_props_removal(Cluster, RandNodes, KeyCount, Pid), - test_remove_index_dirs(Cluster, RandNodes, KeyCount, Pid), - test_remove_segment_infos_and_rebuild(Cluster, RandNodes, KeyCount, Pid), - test_brutal_kill_and_delete_index_dirs(Cluster, RandNodes, KeyCount, Pid), - - riakc_pb_socket:stop(Pid), - - pass. - -test_core_props_removal(Cluster, RandNodes, KeyCount, Pid) -> - lager:info("Remove core.properties file in each index data dir"), - remove_core_props(RandNodes, ?INDEX), - - yokozuna_rt:check_exists(Cluster, ?INDEX), - - lager:info("Write one more piece of data"), - ok = rt:pbc_write(Pid, ?BUCKET, <<"foo">>, <<"foo">>, "text/plain"), - yokozuna_rt:commit(Cluster, ?INDEX), - - yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount+1). - -test_remove_index_dirs(Cluster, RandNodes, KeyCount, Pid) -> - lager:info("Remove index directories on each node and let them recreate/reindex"), - yokozuna_rt:remove_index_dirs(RandNodes, ?INDEX, [riak_kv, yokozuna]), - - yokozuna_rt:check_exists(Cluster, ?INDEX), - - yokozuna_rt:expire_trees(Cluster), - yokozuna_rt:wait_for_aae(Cluster), - - lager:info("Write second piece of data"), - ok = rt:pbc_write(Pid, ?BUCKET, <<"food">>, <<"foody">>, "text/plain"), - yokozuna_rt:commit(Cluster, ?INDEX), - - yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount+2). - -test_remove_segment_infos_and_rebuild(Cluster, RandNodes, KeyCount, Pid) -> - lager:info("Remove segment info files in each index data dir"), - remove_segment_infos(RandNodes, ?INDEX), - - lager:info("To fix, we remove index directories on each node and let them recreate/reindex"), - - yokozuna_rt:remove_index_dirs(RandNodes, ?INDEX, [riak_kv, yokozuna]), - - yokozuna_rt:check_exists(Cluster, ?INDEX), - - yokozuna_rt:expire_trees(Cluster), - yokozuna_rt:wait_for_aae(Cluster), - - lager:info("Write third piece of data"), - ok = rt:pbc_write(Pid, ?BUCKET, <<"baz">>, <<"bar">>, "text/plain"), - yokozuna_rt:commit(Cluster, ?INDEX), - - yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount+3). - -test_brutal_kill_and_delete_index_dirs(Cluster, RandNodes, KeyCount, Pid) -> - lager:info("Remove index directories on each node and let them recreate/reindex"), - yokozuna_rt:brutal_kill_remove_index_dirs(RandNodes, ?INDEX, [riak_kv, yokozuna]), - - yokozuna_rt:check_exists(Cluster, ?INDEX), - - yokozuna_rt:expire_trees(Cluster), - yokozuna_rt:wait_for_aae(Cluster), - - lager:info("Write fourth piece of data"), - ok = rt:pbc_write(Pid, ?BUCKET, <<"food">>, <<"foody">>, "text/plain"), - yokozuna_rt:commit(Cluster, ?INDEX), - - yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount + 4). - -%% @doc Remove core properties file on nodes. -remove_core_props(Nodes, IndexName) -> - IndexDirs = [rpc:call(Node, yz_index, index_dir, [IndexName]) || - Node <- Nodes], - PropsFiles = [filename:join([IndexDir, "core.properties"]) || - IndexDir <- IndexDirs], - lager:info("Remove core.properties files: ~p, on nodes: ~p~n", - [PropsFiles, Nodes]), - [file:delete(PropsFile) || PropsFile <- PropsFiles], - ok. - -%% @doc Remove lucence segment info files to check if reindexing will occur -%% on re-creation/re-indexing. -remove_segment_infos(Nodes, IndexName) -> - IndexDirs = [rpc:call(Node, yz_index, index_dir, [IndexName]) || - Node <- Nodes], - SiPaths = [binary_to_list(filename:join([IndexDir, "data/index/*.si"])) || - IndexDir <- IndexDirs], - SiFiles = lists:append([filelib:wildcard(Path) || Path <- SiPaths]), - lager:info("Remove segment info files: ~p, on in dirs: ~p~n", - [SiFiles, IndexDirs]), - [file:delete(SiFile) || SiFile <- SiFiles]. diff --git a/tests/yz_crdt.erl b/tests/yz_crdt.erl deleted file mode 100644 index f2c114cb..00000000 --- a/tests/yz_crdt.erl +++ /dev/null @@ -1,564 +0,0 @@ --module(yz_crdt). - --compile(export_all). --compile({parse_transform, rt_intercept_pt}). - --include_lib("eunit/include/eunit.hrl"). - --define(HARNESS, (rt_config:get(rt_harness))). --define(INDEX, <<"maps">>). --define(TYPE, <<"maps">>). --define(KEY, <<"Chris Meiklejohn">>). --define(BUCKET, {?TYPE, <<"testbucket">>}). --define(GET(K,L), proplists:get_value(K, L)). --define(N, 3). - --define(CONF, - [ - {riak_core, - [{ring_creation_size, 8}] - }, - {riak_kv, - [{delete_mode, keep}, - {anti_entropy_build_limit, {100, 1000}}, - {anti_entropy_concurrency, 8}, - {anti_entropy_tick, 1000}]}, - {yokozuna, - [{enabled, true}] - }]). - -confirm() -> - rt:set_advanced_conf(all, ?CONF), - - %% Configure cluster. - Nodes = rt:build_cluster(5, ?CONF), - - Node = rt:select_random(Nodes), - - %% Create PB connection. - Pid = rt:pbc(Node), - riakc_pb_socket:set_options(Pid, [queue_if_disconnected]), - - %% Create index. - riakc_pb_socket:create_search_index(Pid, ?INDEX, <<"_yz_default">>, []), - - %% Create bucket type for maps. - rt:create_and_activate_bucket_type(Node, - ?TYPE, - [{datatype, map}, - {n_val, ?N}, - {search_index, ?INDEX}]), - - lager:info("Write some sample data"), - test_sample_data(Pid, Nodes, ?BUCKET, ?KEY, ?INDEX), - lager:info("Search and Validate our CRDT writes/updates."), - ok = rt:wait_until(fun() -> validate_sample_data(Pid, ?KEY, ?INDEX) - end), - - lager:info("Test setting the register of a map twice to different values." - " (The # of results should still be 1)"), - test_repeat_sets(Pid, Nodes, ?BUCKET, ?INDEX, ?KEY), - ok = rt:wait_until(fun() -> validate_test_repeat_set(Pid, ?INDEX) - end), - - lager:info("FYI: delete_mode is on keep here to make sure YZ handles" - " deletes correctly throughout."), - lager:info("Test varying deletes operations"), - test_and_validate_delete(Pid, Nodes, ?BUCKET, ?INDEX, ?KEY), - - lager:info("Test to make sure yz AAE handles deletes/removes correctly"), - test_and_validate_delete_aae(Pid, Nodes, ?BUCKET, ?INDEX), - - lager:info("Test to make sure siblings don't exist after partition"), - test_siblings(Nodes, ?BUCKET, ?INDEX), - lager:info("Verify counts and operations after heal + transfers + commits"), - ok = rt:wait_until(fun() -> validate_test_siblings(Pid, ?BUCKET, ?INDEX) - end), - - %% Stop PB connection - riakc_pb_socket:stop(Pid), - - pass. - -test_sample_data(Pid, Cluster, Bucket, Key, Index) -> - Map1 = riakc_map:update( - {<<"name">>, register}, - fun(R) -> - riakc_register:set(Key, R) - end, riakc_map:new()), - Map2 = riakc_map:update( - {<<"interests">>, set}, - fun(S) -> - riakc_set:add_element(<<"thing">>, S) end, - Map1), - - ok = riakc_pb_socket:update_type( - Pid, - Bucket, - Key, - riakc_map:to_op(Map2)), - - drain_and_commit(Cluster, Index). - -%% @doc Test setting the register of a map twice to different values. -%% The # of results should still be 1. -test_repeat_sets(Pid, Cluster, Bucket, Index, Key) -> - {ok, M1} = riakc_pb_socket:fetch_type(Pid, Bucket, Key), - M2 = riakc_map:update( - {<<"update">>, register}, - fun(R) -> - riakc_register:set(<<"foo">>, R) - end, M1), - ok = riakc_pb_socket:update_type( - Pid, - Bucket, - Key, - riakc_map:to_op(M2)), - M3 = riakc_map:update( - {<<"update">>, register}, - fun(R) -> - riakc_register:set(<<"bar">>, R) - end, M1), - ok = riakc_pb_socket:update_type( - Pid, - Bucket, - Key, - riakc_map:to_op(M3)), - - drain_and_commit(Cluster, Index). - -%% @doc Tests varying deletes of within a CRDT map and checks for correct counts -%% - Remove registers, remove and add elements within a set -%% - Delete the map (associated w/ a key) -%% - Recreate objects in the map and delete the map again -test_and_validate_delete(Pid, Cluster, Bucket, Index, Key) -> - {ok, M1} = riakc_pb_socket:fetch_type(Pid, Bucket, Key), - - lager:info("Remove register from map"), - M2 = riakc_map:erase({<<"name">>, register}, M1), - - lager:info("Delete element from set (in map) & Add element to set"), - M3 = riakc_map:update( - {<<"interests">>, set}, - fun(S) -> - riakc_set:del_element(<<"thing">>, - riakc_set:add_element(<<"roses">>, S)) - end, M2), - - ok = riakc_pb_socket:update_type( - Pid, - Bucket, - Key, - riakc_map:to_op(M3)), - - drain_and_commit(Cluster, Index), - - lager:info("Search deleted/erased name_register:*"), - search_and_validate_found(Pid, Index, <<"name_register:*">>, 0), - - lager:info("Add another element to set (in map)"), - M4 = riakc_map:update( - {<<"interests">>, set}, - fun(S) -> - riakc_set:add_element(<<"pans">>, S) - end, M3), - - ok = riakc_pb_socket:update_type( - Pid, - Bucket, - Key, - riakc_map:to_op(M4)), - - drain_and_commit(Cluster, Index), - - lager:info("Search deleted interests_set:thing*"), - search_and_validate_found(Pid, Index, <<"interests_set:thing*">>, 0), - - lager:info("Delete key for map"), - ?assertEqual(ok, riakc_pb_socket:delete(Pid, Bucket, Key)), - - drain_and_commit(Cluster, Index), - - ?assertEqual({error, {notfound, map}}, - riakc_pb_socket:fetch_type(Pid, Bucket, Key)), - - lager:info("Search deleted map: *:*"), - search_and_validate_found(Pid, Index, <<"*:*">>, 0), - - lager:info("Recreate object and check counts..."), - - lager:info("Set a new register for map"), - M5 = riakc_map:update( - {<<"name">>, register}, - fun(R) -> - riakc_register:set(<<"hello">>, R) - end, riakc_map:new()), - - ok = riakc_pb_socket:update_type( - Pid, - Bucket, - Key, - riakc_map:to_op(M5)), - - drain_and_commit(Cluster, Index), - - {ok, M6} = riakc_pb_socket:fetch_type(Pid, Bucket, Key), - Keys = riakc_map:fetch_keys(M6), - ?assertEqual(1, length(Keys)), - ?assert(riakc_map:is_key({<<"name">>, register}, M6)), - - lager:info("Search recreated map: *:*"), - search_and_validate_found(Pid, Index, <<"*:*">>, 1), - - lager:info("Delete key for map again"), - ?assertEqual(ok, riakc_pb_socket:delete(Pid, Bucket, Key)), - - drain_and_commit(Cluster, Index), - - ?assertEqual({error, {notfound, map}}, - riakc_pb_socket:fetch_type(Pid, Bucket, Key)), - - lager:info("Search ~p deleted map: *:*", [Key]), - search_and_validate_found(Pid, Index, <<"*:*">>, 0). - -%% @doc Tests key/map delete and AAE -%% - Use intercept to trap yz_kv:delete_operation to skip over -%% - Makes sure that yz AAE handles tombstone on expire/exchange -%% - Recreate objects and check -test_and_validate_delete_aae(Pid, Cluster, Bucket, Index) -> - Key1 = <<"ohyokozuna">>, - M1 = riakc_map:update( - {<<"name">>, register}, - fun(R) -> - riakc_register:set(<<"jokes are">>, R) - end, riakc_map:new()), - ok = riakc_pb_socket:update_type( - Pid, - Bucket, - Key1, - riakc_map:to_op(M1)), - - Key2 = <<"ohriaksearch">>, - M2 = riakc_map:update( - {<<"name">>, register}, - fun(R) -> - riakc_register:set(<<"better explained">>, R) - end, riakc_map:new()), - ok = riakc_pb_socket:update_type( - Pid, - Bucket, - Key2, - riakc_map:to_op(M2)), - - drain_and_commit(Cluster, Index), - - lager:info("Add and load handle_delete_operation intercept"), - - [make_intercepts_tab(ANode) || ANode <- Cluster], - - [rt_intercept:add(ANode, {yz_solrq_helper, [{{get_ops_for_no_sibling_deletes, 3}, - handle_get_ops_for_no_sibling_deletes}]}) - || ANode <- Cluster], - [true = rpc:call(ANode, ets, insert, [intercepts_tab, {del_put, 0}]) || - ANode <- Cluster], - [rt_intercept:wait_until_loaded(ANode) || ANode <- Cluster], - - lager:info("Delete key ~p for map", [Key2]), - ?assertEqual(ok, riakc_pb_socket:delete(Pid, Bucket, Key2)), - ?assertEqual({error, {notfound, map}}, - riakc_pb_socket:fetch_type(Pid, Bucket, Key2)), - - drain_and_commit(Cluster, Index), - - lager:info("Search all results, expect extra b/c tombstone" - " and we've modified the delete op : *:*"), - search_and_validate_found(Pid, Index, <<"*:*">>, 2), - - lager:info("Expire and re-check"), - yokozuna_rt:expire_trees(Cluster), - yokozuna_rt:wait_for_full_exchange_round(Cluster, erlang:now()), - drain_and_commit(Cluster, Index), - - lager:info("Search all results, expect removed tombstone b/c AAE" - " should clean it up: *:*"), - search_and_validate_found(Pid, Index, <<"*:*">>, 1), - - lager:info("Recreate object and check counts"), - - M3 = riakc_map:update( - {<<"name">>, register}, - fun(R) -> - riakc_register:set(<<"hello again, is it me you're" - "looking for">>, R) - end, riakc_map:new()), - - ok = riakc_pb_socket:update_type( - Pid, - Bucket, - Key2, - riakc_map:to_op(M3)), - - drain_and_commit(Cluster, Index), - - {ok, M4} = riakc_pb_socket:fetch_type(Pid, Bucket, Key2), - Keys = riakc_map:fetch_keys(M4), - ?assertEqual(1, length(Keys)), - ?assert(riakc_map:is_key({<<"name">>, register}, M4)), - - lager:info("Search recreated map: *:*"), - search_and_validate_found(Pid, Index, <<"*:*">>, 2). - -%% @doc Tests sibling handling/merge when there's a partition -%% - Write/remove from separate partitions -%% - Verify counts and that CRDTs have no siblings, vtags, -%% after healing partitions. The CRDT map merges so the search -%% results be consistent. -test_siblings(Cluster, Bucket, Index) -> - Key1 = <<"Movies">>, - Key2 = <<"Games">>, - Set1 = <<"directors">>, - Set2 = <<"characters">>, - - %% make siblings - {P1, P2} = lists:split(1, Cluster), - - %% Create an object in Partition 1 and siblings in Partition 2 - lager:info("Create partition: ~p | ~p", [P1, P2]), - Partition = rt:partition(P1, P2), - - %% PB connections for accessing each side - Pid1 = rt:pbc(hd(P1)), - Pid2 = rt:pbc(hd(P2)), - - riakc_pb_socket:set_options(Pid1, [queue_if_disconnected, auto_reconnect]), - riakc_pb_socket:set_options(Pid2, [queue_if_disconnected, auto_reconnect]), - - %% P1 writes - lager:info("Writing to Partition 1 Set 1: Key ~p | Director ~p", - [Key1, <<"Kubrick">>]), - M1 = riakc_map:update( - {Set1, set}, - fun(S) -> - riakc_set:add_element(<<"Kubrick">>, S) - end, riakc_map:new()), - ok = riakc_pb_socket:update_type( - Pid1, - Bucket, - Key1, - riakc_map:to_op(M1)), - - lager:info("Writing to Partition 1 Set 1: Key ~p | Director ~p", - [Key1, <<"Demme">>]), - M2 = riakc_map:update( - {Set1, set}, - fun(S) -> - riakc_set:add_element(<<"Demme">>, S) - end, riakc_map:new()), - ok = riakc_pb_socket:update_type( - Pid1, - Bucket, - Key1, - riakc_map:to_op(M2)), - - %% P2 Siblings - lager:info("Writing to Partition 2 Set 2: Key ~p | Char ~p", - [Key2, <<"Sonic">>]), - M3 = riakc_map:update( - {Set2, set}, - fun(S) -> - riakc_set:add_element(<<"Sonic">>, S) - end, riakc_map:new()), - ok = riakc_pb_socket:update_type( - Pid2, - Bucket, - Key2, - riakc_map:to_op(M3)), - - lager:info("Delete key from Partition 2: Key ~p", [Key2]), - ok = riakc_pb_socket:delete(Pid2, Bucket, Key2), - - lager:info("Writing to Partition 2 Set 2: after delete: Key ~p | Char" - " ~p", [Key2, <<"Crash">>]), - M4 = riakc_map:update( - {Set2, set}, - fun(S) -> - riakc_set:add_element(<<"Crash">>, S) - end, riakc_map:new()), - ok = riakc_pb_socket:update_type( - Pid2, - Bucket, - Key2, - riakc_map:to_op(M4)), - - lager:info("Writing to Partition 2 Set 2: Key ~p | Char ~p", - [Key2, <<"Mario">>]), - M5 = riakc_map:update( - {Set2, set}, - fun(S) -> - riakc_set:add_element(<<"Mario">>, S) - end, riakc_map:new()), - ok = riakc_pb_socket:update_type( - Pid2, - Bucket, - Key2, - riakc_map:to_op(M5)), - - lager:info("Writing to Partition 2 Set 1: Key ~p | Director ~p", - [Key1, <<"Klimov">>]), - M6 = riakc_map:update( - {Set1, set}, - fun(S) -> - riakc_set:add_element(<<"Klimov">>, S) - end, riakc_map:new()), - ok = riakc_pb_socket:update_type( - Pid2, - Bucket, - Key1, - riakc_map:to_op(M6)), - - rt:heal(Partition), - rt:wait_until_transfers_complete(Cluster), - - drain_and_commit(Cluster, Index). - -validate_sample_data(Pid, Key, Index) -> - try - Thing = <<"thing">>, - - {ok, {search_results, Results1a, _, Found1}} = riakc_pb_socket:search( - Pid, Index, <<"name_register:Chris*">>), - ?assertEqual(1, Found1), - - ?assertEqual(?GET(<<"name_register">>, ?GET(Index, Results1a)), - Key), - ?assertEqual(?GET(<<"interests_set">>, ?GET(Index, Results1a)), - Thing), - - {ok, {search_results, Results2a, _, Found2}} = riakc_pb_socket:search( - Pid, Index, <<"interests_set:thing*">>), - ?assertEqual(1, Found2), - ?assertEqual(?GET(<<"name_register">>, ?GET(Index, Results2a)), - Key), - ?assertEqual(?GET(<<"interests_set">>, ?GET(Index, Results2a)), - Thing), - - {ok, {search_results, Results3a, _, Found3}} = riakc_pb_socket:search( - Pid, Index, <<"_yz_rb:testbucket">>), - ?assertEqual(1, Found3), - ?assertEqual(?GET(<<"name_register">>, ?GET(Index, Results3a)), - Key), - ?assertEqual(?GET(<<"interests_set">>, ?GET(Index, Results3a)), - Thing), - - %% Redo queries and check if results are equal - {ok, {search_results, Results1b, _, _}} = riakc_pb_socket:search( - Pid, Index, <<"name_register:Chris*">>), - ?assertEqual(number_of_fields(Results1a, Index), - number_of_fields(Results1b, Index)), - - {ok, {search_results, Results2b, _, _}} = riakc_pb_socket:search( - Pid, Index, <<"interests_set:thing*">>), - ?assertEqual(number_of_fields(Results2a, Index), - number_of_fields(Results2b, Index)), - - {ok, {search_results, Results3b, _, _}} = riakc_pb_socket:search( - Pid, Index, <<"_yz_rb:testbucket">>), - ?assertEqual(number_of_fields(Results3a, Index), - number_of_fields(Results3b, Index)), - - true - catch Err:Reason -> - lager:info("Waiting for CRDT search results to converge. Error" - " was ~p.", [{Err, Reason}]), - false - end. - -validate_test_repeat_set(Pid, Index) -> - try - {ok, {search_results, _R, _, Found}} = riakc_pb_socket:search( - Pid, Index, - <<"update_register:*">>), - ?assertEqual(1, Found), - - true - catch Err:Reason -> - lager:info("Waiting for CRDT search results to converge. Error" - " was ~p.", [{Err, Reason}]), - false - end. - -validate_test_siblings(Pid, Bucket, Index) -> - try - Key1 = <<"Movies">>, - Key2 = <<"Games">>, - - {ok, MF1} = riakc_pb_socket:fetch_type(Pid, Bucket, Key1), - Keys = riakc_map:fetch_keys(MF1), - ?assertEqual(1, length(Keys)), - ?assert(riakc_map:is_key({<<"directors">>, set}, MF1)), - {ok, {search_results, Results1, _, _}} = riakc_pb_socket:search( - Pid, Index, - <<"directors_set:*">>), - lager:info("Search movies map directors_set:*: ~p~n", [Results1]), - ?assertEqual(3, length(proplists:lookup_all(<<"directors_set">>, - ?GET(?INDEX, Results1)))), - - {ok, MF2} = riakc_pb_socket:fetch_type(Pid, Bucket, Key2), - Keys2 = riakc_map:fetch_keys(MF2), - ?assertEqual(1, length(Keys2)), - ?assert(riakc_map:is_key({<<"characters">>, set}, MF2)), - {ok, {search_results, Results2, _, _}} = riakc_pb_socket:search( - Pid, Index, - <<"characters_set:*">>), - lager:info("Search games map characters_set:*: ~p~n", [Results2]), - ?assertEqual(2, length(proplists:lookup_all(<<"characters_set">>, - ?GET(?INDEX, Results2)))), - - {ok, {search_results, Results3, _, Found}} = riakc_pb_socket:search( - Pid, Index, - <<"_yz_vtag:*">>), - lager:info("Search vtags in search *:*: ~p~n", [Results3]), - ?assertEqual(0, Found), - true - catch Err:Reason -> - lager:info("Waiting for CRDT search results to converge. Error" - " was ~p.", [{Err, Reason}]), - false - end. - -%% @private -drain_and_commit(Cluster, Index) -> - yokozuna_rt:drain_solrqs(Cluster), - yokozuna_rt:commit(Cluster, Index). - -%% @private -number_of_fields(Resp, Index) -> - length(?GET(Index, Resp)). - -%% @private -make_intercepts_tab(Node) -> - SupPid = rpc:call(Node, erlang, whereis, [sasl_safe_sup]), - intercepts_tab = rpc:call(Node, ets, new, [intercepts_tab, [named_table, - public, set, {heir, SupPid, {}}]]). - -%% @private -search_and_validate_found(Pid, Index, Search, ExpectedCount) -> - ok = rt:wait_until( - fun() -> - try - {ok, {search_results, Results2, _, F}} = - riakc_pb_socket:search(Pid, Index, Search), - ?assertEqual(ExpectedCount, F), - true - catch Err:Reason -> - lager:info( - "Waiting for CRDT search results to converge." - " Index: ~p" - " Search: ~p" - " Error: ~p", - [Index, Search, {Err, Reason}] - ), - false - end - end). - diff --git a/tests/yz_default_bucket_type_upgrade.erl b/tests/yz_default_bucket_type_upgrade.erl deleted file mode 100644 index 0152d692..00000000 --- a/tests/yz_default_bucket_type_upgrade.erl +++ /dev/null @@ -1,98 +0,0 @@ -%% ------------------------------------------------------------------- -%% -%% Copyright (c) 2015 Basho Technologies, Inc. -%% -%% This file is provided to you under the Apache License, -%% Version 2.0 (the "License"); you may not use this file -%% except in compliance with the License. You may obtain -%% a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, -%% software distributed under the License is distributed on an -%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -%% KIND, either express or implied. See the License for the -%% specific language governing permissions and limitations -%% under the License. -%% -%%-------------------------------------------------------------------- - -%% @doc Test that checks to make sure that default bucket_types -%% do not lose data when expiring/clearing AAE trees when -%% trees are rebuilt for comparison. -%% @end - - --module(yz_default_bucket_type_upgrade). --compile(export_all). --include_lib("eunit/include/eunit.hrl"). --include_lib("riakc/include/riakc.hrl"). - --define(N, 3). --define(YZ_CAP, {yokozuna, handle_legacy_default_bucket_type_aae}). --define(INDEX, <<"test_upgrade_idx">>). --define(BUCKET, <<"test_upgrade_bucket">>). --define(SEQMAX, 2000). --define(CFG, - [{riak_core, - [ - {ring_creation_size, 16}, - {default_bucket_props, - [ - {n_val, ?N}, - {allow_mult, true}, - {dvv_enabled, true} - ]} - ]}, - {riak_kv, - [ - {anti_entropy_build_limit, {100, 1000}}, - {anti_entropy_concurrency, 8} - ] - }, - {yokozuna, - [ - {anti_entropy_tick, 1000}, - {enabled, true} - ]} - ]). - -confirm() -> - %% This test explicitly requires an upgrade from 2.0.5 to test a - %% new capability - OldVsn = "2.0.5", - - [_, Node|_] = Cluster = rt:build_cluster(lists:duplicate(4, {OldVsn, ?CFG})), - rt:wait_for_cluster_service(Cluster, yokozuna), - - [rt:assert_capability(ANode, ?YZ_CAP, {unknown_capability, ?YZ_CAP}) || ANode <- Cluster], - - GenKeys = yokozuna_rt:gen_keys(?SEQMAX), - KeyCount = length(GenKeys), - lager:info("KeyCount ~p", [KeyCount]), - - OldPid = rt:pbc(Node), - - yokozuna_rt:write_data(Cluster, OldPid, ?INDEX, ?BUCKET, GenKeys), - yokozuna_rt:commit(Cluster, ?INDEX), - - yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount), - - %% Upgrade - yokozuna_rt:rolling_upgrade(Cluster, current), - - [rt:assert_capability(ANode, ?YZ_CAP, v1) || ANode <- Cluster], - [rt:assert_supported(rt:capability(ANode, all), ?YZ_CAP, [v1, v0]) || ANode <- Cluster], - - yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount), - - lager:info("Write one more piece of data"), - Pid = rt:pbc(Node), - ok = rt:pbc_write(Pid, ?BUCKET, <<"foo">>, <<"foo">>, "text/plain"), - yokozuna_rt:commit(Cluster, ?INDEX), - - yokozuna_rt:expire_trees(Cluster), - yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount + 1), - - pass. diff --git a/tests/yz_ensemble.erl b/tests/yz_ensemble.erl deleted file mode 100644 index f4a60459..00000000 --- a/tests/yz_ensemble.erl +++ /dev/null @@ -1,117 +0,0 @@ --module(yz_ensemble). --compile(export_all). --include_lib("eunit/include/eunit.hrl"). - --define(CFG, - [ - {riak_core, - [ - {ring_creation_size, 8} - ]}, - {yokozuna, - [ - {enabled, true} - ]} - ]). - -confirm() -> - NumNodes = 3, - NVal = 3, - ConfigB = ensemble_util:fast_config(NVal), - Config = ConfigB ++ [{yokozuna, [{enabled, true}]}], - lager:info("Building cluster and waiting for ensemble to stablize"), - Nodes = build_cluster_with_yz_support(NumNodes, Config, NVal), - rt:wait_for_cluster_service(Nodes, yokozuna), - vnode_util:load(Nodes), - Node = hd(Nodes), - - lager:info("Creating/activating 'strong' bucket type"), - rt:create_and_activate_bucket_type(Node, <<"strong">>, - [{consistent, true}, {n_val, NVal}]), - - Bucket = {<<"strong">>, <<"test">>}, - Index = <<"testi">>, - create_index(Node, Index), - set_bucket_props(Node, Bucket, Index), - - verify_ensemble_delete_support(Nodes, Bucket, Index), - - pass. - - -%% @private -%% @doc Populates then deletes from SC bucket -verify_ensemble_delete_support(Cluster, Bucket, Index) -> - %% Yz only supports UTF-8 compatible keys - Keys = [<> || N <- lists:seq(1,2000), - not lists:any(fun(E) -> E > 127 end,binary_to_list(<>))], - - PBC = rt:pbc(hd(Cluster)), - - lager:info("Writing ~p keys", [length(Keys)]), - [ok = rt:pbc_write(PBC, Bucket, Key, Key, "text/plain") || Key <- Keys], - yokozuna_rt:commit(Cluster, Index), - - %% soft commit wait, then check that last key is indexed - lager:info("Search for keys to verify they exist"), - LKey = lists:last(Keys), - rt:wait_until(fun() -> - {M, _} = riakc_pb_socket:search(PBC, Index, query_value(LKey)), - ok == M - end), - [{ok, _} = - riakc_pb_socket:search(PBC, Index, query_value(Key)) || Key <- Keys], - - lager:info("Deleting keys"), - [riakc_pb_socket:delete(PBC, Bucket, Key) || Key <- Keys], - yokozuna_rt:commit(Cluster, Index), - rt:wait_until(fun() -> - case riakc_pb_socket:search(PBC, Index, query_value(LKey)) of - {ok,{search_results,Res,_,_}} -> - lager:info("RES: ~p ~p~n", [Res, LKey]), - Res == []; - S -> - lager:info("OTHER: ~p ~p~n", [S, LKey]), - false - end - end), - [ {ok,{search_results,[],_,_}} = - riakc_pb_socket:search(PBC, Index, query_value(Key)) || Key <- Keys], - - ok. - - -%% @private -%% @doc build a cluster from ensemble_util + yz support -%% -%% NOTE: There's a timing issue that causes join_cluster to hang the r_t -%% node when adding yokozuna and ensemble support. Waiting for yokozuna -%% to load on each node allows join_cluster to complete consistently -build_cluster_with_yz_support(Num, Config, NVal) -> - Nodes = rt:deploy_nodes(Num, Config), - [rt:wait_for_cluster_service([N], yokozuna) || N <- Nodes], - Node = hd(Nodes), - rt:join_cluster(Nodes), - ensemble_util:wait_until_cluster(Nodes), - ensemble_util:wait_for_membership(Node), - ensemble_util:wait_until_stable(Node, NVal), - Nodes. - -%% @private -%% @doc Builds a simple riak key query -query_value(Value) -> - V2 = iolist_to_binary(re:replace(Value, "\"", "%22")), - V3 = iolist_to_binary(re:replace(V2, "\\\\", "%5C")), - <<"_yz_rk:\"",V3/binary,"\"">>. - -%% pulled from yz_rt - -%% @private -create_index(Node, Index) -> - lager:info("Creating index ~s [~p]", [Index, Node]), - ok = rpc:call(Node, yz_index, create, [Index]). - -%% @private -set_bucket_props(Node, Bucket, Index) -> - Props = [{search_index, Index}], - rpc:call(Node, riak_core_bucket, set_bucket, [Bucket, Props]). diff --git a/tests/yz_extractors.erl b/tests/yz_extractors.erl deleted file mode 100644 index 29d51c4b..00000000 --- a/tests/yz_extractors.erl +++ /dev/null @@ -1,441 +0,0 @@ -%% ------------------------------------------------------------------- -%% -%% Copyright (c) 2015 Basho Technologies, Inc. -%% -%% This file is provided to you under the Apache License, -%% Version 2.0 (the "License"); you may not use this file -%% except in compliance with the License. You may obtain -%% a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, -%% software distributed under the License is distributed on an -%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -%% KIND, either express or implied. See the License for the -%% specific language governing permissions and limitations -%% under the License. -%% -%%------------------------------------------------------------------- - -%% @doc Test that checks if we're caching the extractor map and that -%% creating custom extractors is doable via protobufs. -%% @end - --module(yz_extractors). --compile(export_all). --include_lib("eunit/include/eunit.hrl"). --include_lib("riakc/include/riakc.hrl"). - --define(FMT(S, Args), lists:flatten(io_lib:format(S, Args))). --define(TYPE1, <<"extractors_in_paradise">>). --define(TYPE2, <<"extractors_in_paradiso">>). --define(INDEX1, <<"test_idx1">>). --define(BUCKET1, {?TYPE1, <<"test_bkt1">>}). --define(INDEX2, <<"test_idx2">>). --define(BUCKET2, {?TYPE2, <<"test_bkt2">>}). --define(TYPE3, <<"type3">>). --define(BUCKET3, {?TYPE3, <<"test_bkt3">>}). --define(INDEX3, <<"test_idx3">>). --define(SCHEMANAME, <<"test">>). --define(TEST_SCHEMA, -<<" - - - - - - - - - - - - - - - - - - -_yz_id - - - - - - - - - - - - - - - - - - - - - -">>). --define(TEST_SCHEMA_UPGRADE, -<<" - - - - - - - - - - - - - - - - - - - -_yz_id - - - - - - - - - - - - - - - - - - - - - -">>). --define(YZ_CAP, {yokozuna, extractor_map_in_cmd}). --define(GET_MAP_RING_MFA, {yz_extractor, get_map, 1}). --define(GET_MAP_MFA, {yz_extractor, get_map, 0}). --define(GET_MAP_READTHROUGH_MFA, {yz_extractor, get_map_read_through, 0}). --define(YZ_META_EXTRACTORS, {yokozuna, extractors}). --define(YZ_EXTRACTOR_MAP, yokozuna_extractor_map). --define(NEW_EXTRACTOR, {"application/httpheader", yz_noop_extractor}). --define(EXTRACTOR_CT, element(1, ?NEW_EXTRACTOR)). --define(EXTRACTOR_MOD, element(2, ?NEW_EXTRACTOR)). --define(DEFAULT_MAP, [{default, yz_noop_extractor}, - {"application/json",yz_json_extractor}, - {"application/riak_counter", yz_dt_extractor}, - {"application/riak_map", yz_dt_extractor}, - {"application/riak_set", yz_dt_extractor}, - {"application/xml",yz_xml_extractor}, - {"text/plain",yz_text_extractor}, - {"text/xml",yz_xml_extractor} - ]). --define(EXTRACTMAPEXPECT, lists:sort(?DEFAULT_MAP ++ [?NEW_EXTRACTOR])). --define(SEQMAX, 20). --define(NVAL, 3). --define(CFG, - [ - {riak_kv, - [ - %% allow AAE to build trees and exchange rapidly - {anti_entropy_build_limit, {100, 1000}}, - {anti_entropy_concurrency, 8}, - {anti_entropy_tick, 1000}, - %% but start with AAE turned off so as not to interfere with earlier parts of the test - {anti_entropy, {off, []}} - ]}, - {yokozuna, - [ - {enabled, true} - ]} - ]). - -confirm() -> - %% This test explicitly requires an upgrade from 2.0.5 to test a - %% new capability - OldVsn = "2.0.5", - - [_, Node|_] = Cluster = rt:build_cluster(lists:duplicate(4, {OldVsn, ?CFG})), - rt:wait_for_cluster_service(Cluster, yokozuna), - - [rt:assert_capability(ANode, ?YZ_CAP, {unknown_capability, ?YZ_CAP}) || ANode <- Cluster], - - OldPid = rt:pbc(Node), - - GenKeys = yokozuna_rt:gen_keys(?SEQMAX), - KeyCount = length(GenKeys), - - rt:count_calls(Cluster, [?GET_MAP_RING_MFA, ?GET_MAP_MFA]), - - yokozuna_rt:write_data(Cluster, OldPid, ?INDEX1, - {?SCHEMANAME, ?TEST_SCHEMA}, ?BUCKET1, GenKeys), - - ok = rt:stop_tracing(), - - {ok, BProps} = riakc_pb_socket:get_bucket(OldPid, ?BUCKET1), - N = proplists:get_value(n_val, BProps), - - riakc_pb_socket:stop(OldPid), - - PrevGetMapRingCC = rt:get_call_count(Cluster, ?GET_MAP_RING_MFA), - PrevGetMapCC = rt:get_call_count(Cluster, ?GET_MAP_MFA), - ?assertEqual(KeyCount * N, PrevGetMapRingCC), - ?assertEqual(KeyCount * N, PrevGetMapCC), - - %% test query count - yokozuna_rt:verify_num_found_query(Cluster, ?INDEX1, KeyCount), - - {RingVal1, MDVal1} = get_ring_and_cmd_vals(Node, ?YZ_META_EXTRACTORS, - ?YZ_EXTRACTOR_MAP), - - ?assertEqual(undefined, MDVal1), - %% In previous version, Ring only gets map metadata if a non-default - %% extractor is registered - ?assertEqual(undefined, RingVal1), - - ?assertEqual(?DEFAULT_MAP, get_map(Node)), - - %% %% Custom Register - ExtractMap = register_extractor(Node, ?EXTRACTOR_CT, ?EXTRACTOR_MOD), - - ?assertEqual(?EXTRACTMAPEXPECT, ExtractMap), - - %% Upgrade - yokozuna_rt:rolling_upgrade(Cluster, current), - - [rt:wait_until_ready(ANode) || ANode <- Cluster], - - [rt:assert_capability(ANode, ?YZ_CAP, true) || ANode <- Cluster], - [rt:assert_supported(rt:capability(ANode, all), ?YZ_CAP, [true, false]) || - ANode <- Cluster], - - %% test query count again - yokozuna_rt:verify_num_found_query(Cluster, ?INDEX1, KeyCount), - - Pid = rt:pbc(Node), - - rt:count_calls(Cluster, [?GET_MAP_RING_MFA, ?GET_MAP_MFA, - ?GET_MAP_READTHROUGH_MFA]), - - yokozuna_rt:write_data(Cluster, Pid, ?INDEX2, {?SCHEMANAME, ?TEST_SCHEMA}, - ?BUCKET2, GenKeys), - yokozuna_rt:commit(Cluster, ?INDEX2), - - ok = rt:stop_tracing(), - - riakc_pb_socket:stop(Pid), - - CurrGetMapRingCC = rt:get_call_count(Cluster, ?GET_MAP_RING_MFA), - CurrGetMapCC = rt:get_call_count(Cluster, ?GET_MAP_MFA), - CurrGetMapRTCC = rt:get_call_count(Cluster, ?GET_MAP_READTHROUGH_MFA), - - lager:info("Number of calls to get the map from the ring - current: ~p~n, previous: ~p~n", - [CurrGetMapRingCC, PrevGetMapRingCC]), - ?assert(CurrGetMapRingCC < PrevGetMapRingCC), - lager:info("Number of calls to get the map - current: ~p~n, previous: ~p~n", - [CurrGetMapCC, PrevGetMapCC]), - ?assert(CurrGetMapCC =< PrevGetMapCC), - lager:info("Number of calls to get_map_read_through/0: ~p~n, Number of calls to get_map/0: ~p~n", - [CurrGetMapRTCC, CurrGetMapCC]), - ?assert(CurrGetMapRTCC =< CurrGetMapCC), - - {_RingVal2, MDVal2} = get_ring_and_cmd_vals(Node, ?YZ_META_EXTRACTORS, - ?YZ_EXTRACTOR_MAP), - - ?assertEqual(?EXTRACTMAPEXPECT, MDVal2), - ?assertEqual(?EXTRACTMAPEXPECT, get_map(Node)), - - Packet = <<"GET http://www.google.com HTTP/1.1\n">>, - test_extractor_works(Cluster, Packet), - test_extractor_with_aae_expire(Cluster, ?INDEX2, ?BUCKET2, Packet), - test_bad_extraction(Cluster), - - pass. - -%%%=================================================================== -%%% Private -%%%=================================================================== - -get_ring_and_cmd_vals(Node, Prefix, Key) -> - Ring = rpc:call(Node, yz_misc, get_ring, [transformed]), - MDVal = metadata_get(Node, Prefix, Key), - RingVal = ring_meta_get(Node, Key, Ring), - {RingVal, MDVal}. - -metadata_get(Node, Prefix, Key) -> - rpc:call(Node, riak_core_metadata, get, [Prefix, Key, []]). - -ring_meta_get(Node, Key, Ring) -> - rpc:call(Node, riak_core_ring, get_meta, [Key, Ring]). - -register_extractor(Node, MimeType, Mod) -> - rpc:call(Node, yz_extractor, register, [MimeType, Mod]). - -get_map(Node) -> - rpc:call(Node, yz_extractor, get_map, []). - -verify_extractor(Node, PacketData, Mod) -> - rpc:call(Node, yz_extractor, run, [PacketData, Mod]). - -bucket_url({Host,Port}, {BType, BName}, Key) -> - ?FMT("http://~s:~B/types/~s/buckets/~s/keys/~s", - [Host, Port, BType, BName, Key]). - -test_extractor_works(Cluster, Packet) -> - [rt_intercept:add(ANode, {yz_noop_extractor, - [{{extract, 1}, extract_httpheader}]}) || - ANode <- Cluster], - [rt_intercept:wait_until_loaded(ANode) || ANode <- Cluster], - - ExpectedExtraction = [{method, 'GET'}, - {host, <<"www.google.com">>}, - {uri, <<"/">>}], - ?assertEqual(ExpectedExtraction, - verify_extractor(rt:select_random(Cluster), Packet, ?EXTRACTOR_MOD)). - -test_extractor_with_aae_expire(Cluster, Index, Bucket, Packet) -> - %% Now make sure we register extractor across all nodes - [register_extractor(ANode, ?EXTRACTOR_CT, ?EXTRACTOR_MOD) || - ANode <- Cluster], - - Key = <<"google">>, - - {Host, Port} = rt:select_random(yokozuna_rt:host_entries( - rt:connection_info( - Cluster))), - URL = bucket_url({Host, Port}, Bucket, - mochiweb_util:quote_plus(Key)), - - CT = ?EXTRACTOR_CT, - {ok, "204", _, _} = yokozuna_rt:http( - put, URL, [{"Content-Type", CT}], Packet), - - yokozuna_rt:commit(Cluster, Index), - - ANode = rt:select_random(Cluster), - yokozuna_rt:search_expect(ANode, Index, <<"host">>, - <<"www*">>, 1), - - rpc:multicall(Cluster, riak_kv_entropy_manager, enable, []), - - yokozuna_rt:expire_trees(Cluster), - yokozuna_rt:wait_for_full_exchange_round(Cluster, erlang:now()), - - yokozuna_rt:search_expect(ANode, Index, <<"host">>, - <<"www*">>, 1), - - APid = rt:pbc(rt:select_random(Cluster)), - yokozuna_rt:override_schema(APid, Cluster, Index, ?SCHEMANAME, - ?TEST_SCHEMA_UPGRADE), - - {ok, "200", RHeaders, _} = yokozuna_rt:http(get, URL, [{"Content-Type", CT}], - [], []), - VC = proplists:get_value("X-Riak-Vclock", RHeaders), - - {ok, "204", _, _} = yokozuna_rt:http( - put, URL, [{"Content-Type", CT}, {"X-Riak-Vclock", VC}], - Packet), - yokozuna_rt:commit(Cluster, Index), - - yokozuna_rt:search_expect(ANode, Index, <<"method">>, - <<"GET">>, 1), - - yokozuna_rt:expire_trees(Cluster), - yokozuna_rt:wait_for_full_exchange_round(Cluster, erlang:now()), - - yokozuna_rt:search_expect(ANode, Index, <<"method">>, - <<"GET">>, 1), - riakc_pb_socket:stop(APid). - -test_bad_extraction(Cluster) -> - %% Previous test enabled AAE, which makes the number of repairs here not consistent - %% Turn off AAE again just to make the test deterministic. - rpc:multicall(Cluster, riak_kv_entropy_manager, disable, []), - %% - %% register the no-op extractor on all the nodes with a content type - %% - [register_extractor(ANode, "application/bad-extractor", yz_noop_extractor) || - ANode <- Cluster], - %% - %% Set up the intercepts so that they extract non-unicode data - %% - [rt_intercept:add(ANode, {yz_noop_extractor, - [{{extract, 1}, extract_non_unicode_data}]}) || - ANode <- Cluster], - [rt_intercept:wait_until_loaded(ANode) || ANode <- Cluster], - %% - %% create and wire up the bucket to the Solr index/core - %% - yokozuna_rt:create_indexed_bucket_type(Cluster, ?TYPE3, ?INDEX3, ?SCHEMANAME), - %% - %% Grab the stats before - %% - {PreviousFailCount, PreviousErrorThresholdCount} = get_error_stats(Cluster), - %% - %% Put some data into Riak. This should cause the intercepted no-op - %% extractor to generate an object to be written into Solr that contains - %% non-unicode data. - {Host, Port} = rt:select_random( - yokozuna_rt:host_entries(rt:connection_info(Cluster))), - Key = <<"test_bad_extraction">>, - URL = bucket_url({Host, Port}, ?BUCKET3, Key), - Headers = [{"Content-Type", "application/bad-extractor"}], - Data = <<"blahblahblahblah">>, - {ok, "204", _, _} = yokozuna_rt:http(put, URL, Headers, Data), - %% - %% The put should pass, but because it's "bad data", there should - %% be no data in Riak. - %% - yokozuna_rt:verify_num_found_query(Cluster, ?INDEX3, 0), - %% - %% Verify the stats. There should be one more index failure, - %% but there should be more more "melts" (error threshold failures) - %% - yokozuna_rt:wait_until( - Cluster, - fun(_Node) -> - check_error_stats(Cluster, PreviousFailCount, PreviousErrorThresholdCount) - end - ), - ok. - -check_error_stats(Cluster, PreviousFailCount, PreviousErrorThresholdCount) -> - {FailCount, ErrorThresholdCount} = get_error_stats(Cluster), - lager:info( - "PreviousFailCount: ~p FailCount: ~p;" - " PreviousErrorThresholdCount: ~p; ErrorThresholdCount: ~p", - [PreviousFailCount, FailCount, - PreviousErrorThresholdCount, ErrorThresholdCount] - ), - PreviousFailCount + ?NVAL == FailCount - andalso PreviousErrorThresholdCount == ErrorThresholdCount. - - -get_error_stats(Cluster) -> - AllStats = [rpc:call(Node, yz_stat, get_stats, []) || Node <- Cluster], - { - lists:sum([get_count([index, bad_entry], count, Stats) || Stats <- AllStats]), - lists:sum([get_count([search_index_error_threshold_failure_count], value, Stats) || Stats <- AllStats]) - }. - -get_count(StatName, Type, Stats) -> - proplists:get_value( - Type, - proplists:get_value( - yz_stat:stat_name(StatName), - Stats - ) - ). diff --git a/tests/yz_handoff.erl b/tests/yz_handoff.erl deleted file mode 100644 index 7ae32ad2..00000000 --- a/tests/yz_handoff.erl +++ /dev/null @@ -1,206 +0,0 @@ -%% ------------------------------------------------------------------- -%% -%% Copyright (c) 2014 Basho Technologies, Inc. -%% -%% This file is provided to you under the Apache License, -%% Version 2.0 (the "License"); you may not use this file -%% except in compliance with the License. You may obtain -%% a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, -%% software distributed under the License is distributed on an -%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -%% KIND, either express or implied. See the License for the -%% specific language governing permissions and limitations -%% under the License. -%% -%%------------------------------------------------------------------- --module(yz_handoff). --compile(export_all). --include_lib("eunit/include/eunit.hrl"). --include_lib("riakc/include/riakc.hrl"). - --define(GET(K,L), proplists:get_value(K, L)). --define(FMT(S, Args), lists:flatten(io_lib:format(S, Args))). --define(INDEX, <<"test_idx">>). --define(BUCKET, <<"test_bkt">>). --define(NUMRUNSTATES, 1). --define(SEQMAX, 1000). --define(TESTCYCLE, 20). --define(N, 3). --define(CFG, - [ - {riak_core, - [ - {ring_creation_size, 16}, - {n_val, ?N}, - {handoff_concurrency, 10}, - {vnode_management_timer, 1000} - ]}, - {riak_kv, - [ - %% allow AAE to build trees and exchange rapidly - {anti_entropy_build_limit, {100, 1000}}, - {anti_entropy_concurrency, 8}, - {handoff_rejected_max, infinity} - ]}, - {yokozuna, - [ - {anti_entropy_tick, 1000}, - {enabled, true} - ]} - ]). - --record(trial_state, { - solr_url_before, - solr_url_after, - leave_node, - join_node, - admin_node}). - -confirm() -> - %% Setup cluster initially - [Node1, Node2, _Node3, _Node4, _Node5] = Nodes = rt:build_cluster(5, ?CFG), - - rt:wait_for_cluster_service(Nodes, yokozuna), - - ConnInfo = ?GET(Node2, rt:connection_info([Node2])), - {Host, Port} = ?GET(http, ConnInfo), - Shards = [{N, node_solr_port(N)} || N <- Nodes], - - %% Generate keys, YZ only supports UTF-8 compatible keys - Keys = [<> || N <- lists:seq(1, ?SEQMAX), - not lists:any(fun(E) -> E > 127 end, - binary_to_list(<>))], - KeyCount = length(Keys), - - Pid = rt:pbc(Node2), - yokozuna_rt:write_data(Nodes, Pid, ?INDEX, ?BUCKET, Keys), - - %% Separate out shards for multiple runs - [Shard1|Shards2Rest] = Shards, - {_, SolrPort1} = Shard1, - [{_, SolrPort2}|_] = Shards2Rest, - SolrURL = internal_solr_url(Host, SolrPort1, ?INDEX, Shards), - BucketURL = bucket_keys_url(Host, Port, ?BUCKET), - SearchURL = search_url(Host, Port, ?INDEX), - - lager:info("Verify Replicas Count = (3 * docs/keys) count"), - verify_count(SolrURL, (KeyCount * ?N)), - - States = [#trial_state{solr_url_before = SolrURL, - solr_url_after = internal_solr_url(Host, SolrPort2, ?INDEX, Shards2Rest), - leave_node = Node1}, - #trial_state{solr_url_before = internal_solr_url(Host, SolrPort2, ?INDEX, Shards2Rest), - solr_url_after = SolrURL, - join_node = Node1, - admin_node = Node2}], - - %% Run set of leave/join trials and count/test #'s from the cluster - [[begin - check_data(Nodes, KeyCount, BucketURL, SearchURL, State), - check_counts(Pid, KeyCount, BucketURL) - end || State <- States] - || _ <- lists:seq(1,?NUMRUNSTATES)], - - pass. - -%%%=================================================================== -%%% Private -%%%=================================================================== - -node_solr_port(Node) -> - {ok, P} = riak_core_util:safe_rpc(Node, application, get_env, - [yokozuna, solr_port]), - P. - -internal_solr_url(Host, Port, Index) -> - ?FMT("http://~s:~B/internal_solr/~s", [Host, Port, Index]). -internal_solr_url(Host, Port, Index, Shards) -> - Ss = [internal_solr_url(Host, ShardPort, Index) - || {_, ShardPort} <- Shards], - ?FMT("http://~s:~B/internal_solr/~s/select?wt=json&q=*:*&shards=~s", - [Host, Port, Index, string:join(Ss, ",")]). - -%% @private -bucket_keys_url(Host, Port, BName) -> - ?FMT("http://~s:~B/buckets/~s/keys?keys=true", [Host, Port, BName]). - -%% @private -search_url(Host, Port, Index) -> - ?FMT("http://~s:~B/solr/~s/select?wt=json&q=*:*", [Host, Port, Index]). - -verify_count(Url, ExpectedCount) -> - AreUp = - fun() -> - {ok, "200", _, DBody} = yokozuna_rt:http(get, Url, [], [], "", 60000), - FoundCount = get_count(DBody), - lager:info("FoundCount: ~b, ExpectedCount: ~b", - [FoundCount, ExpectedCount]), - ExpectedCount =:= FoundCount - end, - ?assertEqual(ok, rt:wait_until(AreUp)), - ok. - -get_count(Resp) -> - Struct = mochijson2:decode(Resp), - kvc:path([<<"response">>, <<"numFound">>], Struct). - -get_keys_count(BucketURL) -> - {ok, "200", _, RBody} = yokozuna_rt:http(get, BucketURL, [], []), - Struct = mochijson2:decode(RBody), - length(kvc:path([<<"keys">>], Struct)). - -check_counts(Pid, InitKeyCount, BucketURL) -> - PBCounts = [begin {ok, Resp} = riakc_pb_socket:search( - Pid, ?INDEX, <<"*:*">>), - Resp#search_results.num_found - end || _ <- lists:seq(1,?TESTCYCLE)], - HTTPCounts = [begin {ok, "200", _, RBody} = yokozuna_rt:http( - get, BucketURL, [], []), - Struct = mochijson2:decode(RBody), - length(kvc:path([<<"keys">>], Struct)) - end || _ <- lists:seq(1,?TESTCYCLE)], - MinPBCount = lists:min(PBCounts), - MinHTTPCount = lists:min(HTTPCounts), - lager:info("Before-Node-Leave PB: ~b, After-Node-Leave PB: ~b", - [InitKeyCount, MinPBCount]), - ?assertEqual(InitKeyCount, MinPBCount), - lager:info("Before-Node-Leave PB: ~b, After-Node-Leave HTTP: ~b", - [InitKeyCount, MinHTTPCount]), - ?assertEqual(InitKeyCount, MinHTTPCount). - -check_data(Cluster, KeyCount, BucketURL, SearchURL, S) -> - CheckCount = KeyCount * ?N, - KeysBefore = get_keys_count(BucketURL), - - UpdatedCluster = leave_or_join(Cluster, S), - - yokozuna_rt:wait_for_aae(UpdatedCluster), - - KeysAfter = get_keys_count(BucketURL), - lager:info("KeysBefore: ~b, KeysAfter: ~b", [KeysBefore, KeysAfter]), - ?assertEqual(KeysBefore, KeysAfter), - - lager:info("Verify Search Docs Count =:= key count"), - lager:info("Run Search URL: ~s", [SearchURL]), - verify_count(SearchURL, KeysAfter), - lager:info("Verify Replicas Count = (3 * docs/keys) count"), - lager:info("Run Search URL: ~s", [S#trial_state.solr_url_after]), - verify_count(S#trial_state.solr_url_after, CheckCount). - -leave_or_join(Cluster, S=#trial_state{join_node=undefined}) -> - Node = S#trial_state.leave_node, - rt:leave(Node), - ?assertEqual(ok, rt:wait_until_unpingable(Node)), - Cluster -- [Node]; -leave_or_join(Cluster, S=#trial_state{leave_node=undefined}) -> - Node = S#trial_state.join_node, - NodeAdmin = S#trial_state.admin_node, - ok = rt:start_and_wait(Node), - ok = rt:join(Node, NodeAdmin), - ?assertEqual(ok, rt:wait_until_nodes_ready(Cluster)), - ?assertEqual(ok, rt:wait_until_no_pending_changes(Cluster)), - Cluster ++ [Node]. diff --git a/tests/yz_schema_change_reset.erl b/tests/yz_schema_change_reset.erl deleted file mode 100644 index 3efda592..00000000 --- a/tests/yz_schema_change_reset.erl +++ /dev/null @@ -1,305 +0,0 @@ -%% ------------------------------------------------------------------- -%% -%% Copyright (c) 2015 Basho Technologies, Inc. -%% -%% This file is provided to you under the Apache License, -%% Version 2.0 (the "License"); you may not use this file -%% except in compliance with the License. You may obtain -%% a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, -%% software distributed under the License is distributed on an -%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -%% KIND, either express or implied. See the License for the -%% specific language governing permissions and limitations -%% under the License. -%% -%%-------------------------------------------------------------------- --module(yz_schema_change_reset). --compile(export_all). --include_lib("eunit/include/eunit.hrl"). --include_lib("riakc/include/riakc.hrl"). - --define(GET(K,L), proplists:get_value(K, L)). --define(INDEX, <<"test_schema_change_reset">>). --define(TYPE, <<"test_schema_change">>). --define(BUCKET1, <<"test_schema_change_reset">>). --define(BUCKET2, {?TYPE, <<"test_schema_change_reset_2">>}). --define(SCHEMANAME, <<"test">>). - --define(TEST_SCHEMA, -<<" - - - - - - - - - - - - - - - - -_yz_id - - - - - - - - - - - - - - - - - - - - -">>). --define(TEST_SCHEMA_UPDATE, -<<" - - - - - - - - - - - - - - - - - - - -_yz_id - - - - - - - - - - - - - - - - - - - -">>). - --define(SEQMAX, 20). --define(CFG, - [{riak_core, - [ - {ring_creation_size, 16} - ]}, - {riak_kv, [ - {anti_entropy_concurrency, 8}, - {anti_entropy_build_limit, {100, 1000}} - ]}, - {yokozuna, - [ - {anti_entropy_tick, 1000}, - {enabled, true} - ]} - ]). - -confirm() -> - [Node1|_RestNodes] = Cluster = rt:build_cluster(4, ?CFG), - rt:wait_for_cluster_service(Cluster, yokozuna), - - GenKeys = yokozuna_rt:gen_keys(?SEQMAX), - KeyCount = length(GenKeys), - lager:info("KeyCount ~p", [KeyCount]), - - Pid = rt:pbc(rt:select_random(Cluster)), - - lager:info("Write initial data to index ~p with schema ~p", - [?INDEX, ?SCHEMANAME]), - - yokozuna_rt:write_data(Cluster, Pid, ?INDEX, - {?SCHEMANAME, ?TEST_SCHEMA}, - ?BUCKET1, GenKeys), - lager:info("Create and activate map-based bucket type ~s and tie it to search_index ~s", - [?TYPE, ?INDEX]), - rt:create_and_activate_bucket_type(Node1, ?TYPE, [{datatype, map}, - {search_index, ?INDEX}]), - - lager:info("Write and check age at integer per original schema"), - - NewObj1A = riakc_obj:new(?BUCKET1, <<"keyA">>, - <<"{\"age\":26}">>, - "application/json"), - - NewObj1B = riakc_obj:new(?BUCKET1, <<"keyB">>, - <<"{\"age\":99}">>, - "application/json"), - - {ok, _ObjA} = riakc_pb_socket:put(Pid, NewObj1A, [return_head]), - yokozuna_rt:commit(Cluster, ?INDEX), - {ok, _ObjB} = riakc_pb_socket:put(Pid, NewObj1B, [return_head]), - yokozuna_rt:commit(Cluster, ?INDEX), - - yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount + 2), - - yokozuna_rt:assert_search(Pid, Cluster, ?INDEX, - <<"age:26">>, {<<"age">>, <<"26">>}, []), - yokozuna_rt:assert_search(Pid, Cluster, ?INDEX, - <<"age:99">>, {<<"age">>, <<"99">>}, []), - - Map1 = riakc_map:update( - {<<"0_foo">>, register}, - fun(R) -> - riakc_register:set(<<"44ab">>, R) - end, riakc_map:new()), - ok = riakc_pb_socket:update_type( - Pid, - ?BUCKET2, - <<"keyMap1">>, - riakc_map:to_op(Map1)), - - {ok, Map2} = riakc_pb_socket:fetch_type(Pid, ?BUCKET2, <<"keyMap1">>), - Map3 = riakc_map:update( - {<<"1_baz">>, counter}, - fun(R) -> - riakc_counter:increment(10, R) - end, Map2), - ok = riakc_pb_socket:update_type( - Pid, - ?BUCKET2, - <<"keyMap1">>, - riakc_map:to_op(Map3)), - - yokozuna_rt:commit(Cluster, ?INDEX), - yokozuna_rt:assert_search(Pid, Cluster, ?INDEX, - <<"0_foo_register:44ab">>, - {<<"0_foo_register">>, <<"44ab">>}, - []), - - lager:info("Expire and re-check count before updating schema"), - - yokozuna_rt:expire_trees(Cluster), - yokozuna_rt:wait_for_aae(Cluster), - - yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount + 3), - - yokozuna_rt:override_schema(Pid, Cluster, ?INDEX, ?SCHEMANAME, ?TEST_SCHEMA_UPDATE), - - lager:info("Write and check hello_i at integer per schema update"), - - NewObj2 = riakc_obj:new(?BUCKET1, <<"key2">>, - <<"{\"hello_i\":36}">>, - "application/json"), - - {ok, _Obj2} = riakc_pb_socket:put(Pid, NewObj2, [return_head]), - yokozuna_rt:commit(Cluster, ?INDEX), - - yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount + 4), - yokozuna_rt:assert_search(Pid, Cluster, ?INDEX, - <<"hello_i:36">>, {<<"hello_i">>, <<"36">>}, []), - - lager:info("Write and check age at string per schema update"), - - NewObj3 = riakc_obj:new(?BUCKET1, <<"key3">>, - <<"{\"age\":\"3jlkjkl\"}">>, - "application/json"), - - {ok, _Obj3} = riakc_pb_socket:put(Pid, NewObj3, [return_head]), - yokozuna_rt:commit(Cluster, ?INDEX), - - yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount + 5), - yokozuna_rt:assert_search(Pid, Cluster, ?INDEX, - <<"age:3jlkjkl">>, {<<"age">>, <<"3jlkjkl">>}, - []), - - lager:info("Expire and re-check count to make sure we're correctly indexed - by the new schema"), - - yokozuna_rt:expire_trees(Cluster), - yokozuna_rt:wait_for_aae(Cluster), - - yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount + 5), - - ANode = rt:select_random(Cluster), - yokozuna_rt:search_expect(ANode, ?INDEX, <<"age">>, <<"*">>, 3), - - lager:info("Re-Put because AAE won't find a diff even though the types - have changed, as it only compares based on bkey currently. - Also, this re-put will work as we have a default bucket (type) - with allow_mult=false... no siblings"), - - {ok, _Obj4} = riakc_pb_socket:put(Pid, NewObj1A, [return_head]), - yokozuna_rt:commit(Cluster, ?INDEX), - - yokozuna_rt:assert_search(Pid, Cluster, ?INDEX, - <<"age:26">>, {<<"age">>, <<"26">>}, []), - - lager:info("Re-Put Map data by dec/inc counter to account for *change* and - allow previously unindexed counter to be searchable"), - - {ok, Map4} = riakc_pb_socket:fetch_type(Pid, ?BUCKET2, <<"keyMap1">>), - Map5 = riakc_map:update( - {<<"1_baz">>, counter}, - fun(R) -> - riakc_counter:decrement(0, R), - riakc_counter:increment(0, R) - end, Map4), - ok = riakc_pb_socket:update_type( - Pid, - ?BUCKET2, - <<"keyMap1">>, - riakc_map:to_op(Map5)), - - yokozuna_rt:commit(Cluster, ?INDEX), - yokozuna_rt:assert_search(Pid, Cluster, ?INDEX, - <<"0_foo_register:44ab">>, - {<<"0_foo_register">>, <<"44ab">>}, - []), - yokozuna_rt:assert_search(Pid, Cluster, ?INDEX, - <<"1_baz_counter:10">>, - {<<"1_baz_counter">>, <<"10">>}, - []), - - lager:info("Test nested json searches w/ unsearched fields ignored"), - - NewObj5 = riakc_obj:new(?BUCKET1, <<"key4">>, - <<"{\"quip\":\"blashj3\", - \"paths\":{\"quip\":\"88\"}}">>, - "application/json"), - {ok, _Obj5} = riakc_pb_socket:put(Pid, NewObj5, [return_head]), - - yokozuna_rt:commit(Cluster, ?INDEX), - yokozuna_rt:assert_search(Pid, Cluster, ?INDEX, - <<"paths.quip:88">>, - {<<"paths.quip">>, <<"88">>}, - []), - - riakc_pb_socket:stop(Pid), - - pass. -