2015-02-12 23:30:33 +00:00
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
%%
|
|
|
|
%% Copyright (c) 2014 Basho Technologies, Inc.
|
|
|
|
%%
|
|
|
|
%% This file is provided to you under the Apache License,
|
|
|
|
%% Version 2.0 (the "License"); you may not use this file
|
|
|
|
%% except in compliance with the License. You may obtain
|
|
|
|
%% a copy of the License at
|
|
|
|
%%
|
|
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
%%
|
|
|
|
%% Unless required by applicable law or agreed to in writing,
|
|
|
|
%% software distributed under the License is distributed on an
|
|
|
|
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
|
|
%% KIND, either express or implied. See the License for the
|
|
|
|
%% specific language governing permissions and limitations
|
|
|
|
%% under the License.
|
|
|
|
%%
|
|
|
|
%%-------------------------------------------------------------------
|
|
|
|
-module(yz_handoff).
|
|
|
|
-compile(export_all).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
-include_lib("riakc/include/riakc.hrl").
|
|
|
|
|
|
|
|
-define(GET(K,L), proplists:get_value(K, L)).
|
|
|
|
-define(FMT(S, Args), lists:flatten(io_lib:format(S, Args))).
|
|
|
|
-define(INDEX, <<"test_idx">>).
|
|
|
|
-define(BUCKET, <<"test_bkt">>).
|
|
|
|
-define(NUMRUNSTATES, 1).
|
2015-04-06 16:28:09 +00:00
|
|
|
-define(SEQMAX, 1000).
|
|
|
|
-define(TESTCYCLE, 20).
|
2015-02-12 23:30:33 +00:00
|
|
|
-define(CFG,
|
|
|
|
[
|
|
|
|
{riak_core,
|
|
|
|
[
|
2015-04-06 16:28:09 +00:00
|
|
|
{ring_creation_size, 16},
|
|
|
|
{handoff_concurrency, 11},
|
|
|
|
{claimant_tick, 1000}
|
2015-02-12 23:30:33 +00:00
|
|
|
]},
|
|
|
|
{yokozuna,
|
|
|
|
[
|
|
|
|
{enabled, true}
|
|
|
|
]}
|
|
|
|
]).
|
|
|
|
|
2015-04-06 16:28:09 +00:00
|
|
|
-record(trial_state, {
|
|
|
|
solr_url_before,
|
|
|
|
solr_url_after,
|
|
|
|
leave_node,
|
|
|
|
join_node,
|
|
|
|
admin_node}).
|
|
|
|
|
2015-02-12 23:30:33 +00:00
|
|
|
confirm() ->
|
|
|
|
%% Setup cluster initially
|
|
|
|
Nodes = rt:build_cluster(5, ?CFG),
|
|
|
|
|
|
|
|
%% We're going to always keep Node2 in the cluster.
|
2015-04-06 16:28:09 +00:00
|
|
|
[Node1, Node2, _Node3, _Node4, _Node5] = Nodes,
|
|
|
|
rt:wait_for_cluster_service(Nodes, yokozuna),
|
2015-02-12 23:30:33 +00:00
|
|
|
|
|
|
|
ConnInfo = ?GET(Node2, rt:connection_info([Node2])),
|
|
|
|
{Host, Port} = ?GET(http, ConnInfo),
|
|
|
|
Shards = [begin {ok, P} = node_solr_port(Node), {Node, P} end || Node <- Nodes],
|
|
|
|
|
|
|
|
%% Generate keys, YZ only supports UTF-8 compatible keys
|
2015-04-06 16:28:09 +00:00
|
|
|
Keys = [<<N:64/integer>> || N <- lists:seq(1, ?SEQMAX),
|
2015-02-12 23:30:33 +00:00
|
|
|
not lists:any(fun(E) -> E > 127 end,
|
|
|
|
binary_to_list(<<N:64/integer>>))],
|
|
|
|
KeyCount = length(Keys),
|
|
|
|
|
|
|
|
Pid = rt:pbc(Node2),
|
|
|
|
riakc_pb_socket:set_options(Pid, [queue_if_disconnected]),
|
|
|
|
|
|
|
|
%% Create a search index and associate with a bucket
|
|
|
|
ok = riakc_pb_socket:create_search_index(Pid, ?INDEX),
|
|
|
|
ok = riakc_pb_socket:set_search_index(Pid, ?BUCKET, ?INDEX),
|
|
|
|
timer:sleep(1000),
|
|
|
|
|
|
|
|
%% Write keys and wait for soft commit
|
|
|
|
lager:info("Writing ~p keys", [KeyCount]),
|
|
|
|
[ok = rt:pbc_write(Pid, ?BUCKET, Key, Key, "text/plain") || Key <- Keys],
|
|
|
|
timer:sleep(1100),
|
|
|
|
|
|
|
|
%% Separate out shards for multiple runs
|
|
|
|
[Shard1|Shards2Rest] = Shards,
|
|
|
|
{_, SolrPort1} = Shard1,
|
|
|
|
[{_, SolrPort2}|_] = Shards2Rest,
|
|
|
|
SolrURL = internal_solr_url(Host, SolrPort1, ?INDEX, Shards),
|
|
|
|
BucketURL = bucket_keys_url(Host, Port, ?BUCKET),
|
2015-04-06 16:28:09 +00:00
|
|
|
SearchURL = search_url(Host, Port, ?INDEX),
|
|
|
|
|
|
|
|
lager:info("Verify Replicas Count = (3 * docs/keys) count"),
|
|
|
|
verify_count(SolrURL, KeyCount * 3),
|
2015-02-12 23:30:33 +00:00
|
|
|
|
2015-04-06 16:28:09 +00:00
|
|
|
States = [#trial_state{solr_url_before = SolrURL,
|
|
|
|
solr_url_after = internal_solr_url(Host, SolrPort2, ?INDEX, Shards2Rest),
|
|
|
|
leave_node = Node1},
|
|
|
|
#trial_state{solr_url_before = internal_solr_url(Host, SolrPort2, ?INDEX, Shards2Rest),
|
|
|
|
solr_url_after = SolrURL,
|
|
|
|
join_node = Node1,
|
|
|
|
admin_node = Node2}],
|
2015-02-12 23:30:33 +00:00
|
|
|
|
|
|
|
%% Run Shell Script to count/test # of replicas and leave/join
|
|
|
|
%% nodes from the cluster
|
|
|
|
[[begin
|
2015-04-06 16:28:09 +00:00
|
|
|
check_data(State, KeyCount, BucketURL, SearchURL),
|
2015-02-12 23:30:33 +00:00
|
|
|
check_counts(Pid, KeyCount, BucketURL)
|
2015-04-06 16:28:09 +00:00
|
|
|
end || State <- States]
|
2015-02-12 23:30:33 +00:00
|
|
|
|| _ <- lists:seq(1,?NUMRUNSTATES)],
|
|
|
|
|
|
|
|
pass.
|
|
|
|
|
|
|
|
%%%===================================================================
|
|
|
|
%%% Private
|
|
|
|
%%%===================================================================
|
|
|
|
|
|
|
|
node_solr_port(Node) ->
|
|
|
|
riak_core_util:safe_rpc(Node, application, get_env,
|
|
|
|
[yokozuna, solr_port]).
|
|
|
|
|
|
|
|
internal_solr_url(Host, Port, Index) ->
|
|
|
|
?FMT("http://~s:~B/internal_solr/~s", [Host, Port, Index]).
|
|
|
|
internal_solr_url(Host, Port, Index, Shards) ->
|
|
|
|
Ss = [internal_solr_url(Host, ShardPort, Index)
|
|
|
|
|| {_, ShardPort} <- Shards],
|
|
|
|
?FMT("http://~s:~B/internal_solr/~s/select?wt=json&q=*:*&shards=~s",
|
|
|
|
[Host, Port, Index, string:join(Ss, ",")]).
|
|
|
|
|
|
|
|
%% @private
|
|
|
|
bucket_keys_url(Host, Port, BName) ->
|
|
|
|
?FMT("http://~s:~B/buckets/~s/keys?keys=true", [Host, Port, BName]).
|
|
|
|
|
|
|
|
%% @private
|
|
|
|
search_url(Host, Port, Index) ->
|
|
|
|
?FMT("http://~s:~B/solr/~s/select?wt=json&q=*:*", [Host, Port, Index]).
|
|
|
|
|
2015-04-06 16:28:09 +00:00
|
|
|
verify_count(Url, ExpectedCount) ->
|
|
|
|
AreUp =
|
2015-02-12 23:30:33 +00:00
|
|
|
fun() ->
|
2015-04-06 16:28:09 +00:00
|
|
|
{ok, "200", _, DBody} = ibrowse:send_req(Url, [], get, []),
|
|
|
|
FoundCount = get_count(DBody),
|
|
|
|
lager:info("FoundCount: ~b, ExpectedCount: ~b", [FoundCount, ExpectedCount]),
|
|
|
|
ExpectedCount =:= FoundCount
|
2015-02-12 23:30:33 +00:00
|
|
|
end,
|
2015-04-06 16:28:09 +00:00
|
|
|
?assertEqual(ok, rt:wait_until(AreUp)),
|
2015-02-12 23:30:33 +00:00
|
|
|
ok.
|
|
|
|
|
|
|
|
get_count(Resp) ->
|
|
|
|
Struct = mochijson2:decode(Resp),
|
|
|
|
kvc:path([<<"response">>, <<"numFound">>], Struct).
|
|
|
|
|
2015-04-06 16:28:09 +00:00
|
|
|
get_keys_count(BucketURL) ->
|
|
|
|
{ok, "200", _, RBody} = ibrowse:send_req(BucketURL, [], get, []),
|
|
|
|
Struct = mochijson2:decode(RBody),
|
2015-02-12 23:30:33 +00:00
|
|
|
length(kvc:path([<<"keys">>], Struct)).
|
|
|
|
|
|
|
|
check_counts(Pid, InitKeyCount, BucketURL) ->
|
2015-04-06 16:28:09 +00:00
|
|
|
PBCounts = [begin {ok, Resp} = riakc_pb_socket:search(Pid, ?INDEX, <<"*:*">>),
|
|
|
|
Resp#search_results.num_found
|
|
|
|
end || _ <- lists:seq(1,?TESTCYCLE)],
|
2015-02-12 23:30:33 +00:00
|
|
|
HTTPCounts = [begin {ok, "200", _, RBody} = ibrowse:send_req(BucketURL, [], get, []),
|
2015-04-06 16:28:09 +00:00
|
|
|
Struct = mochijson2:decode(RBody),
|
|
|
|
length(kvc:path([<<"keys">>], Struct))
|
2015-02-12 23:30:33 +00:00
|
|
|
end || _ <- lists:seq(1,?TESTCYCLE)],
|
|
|
|
MinPBCount = lists:min(PBCounts),
|
|
|
|
MinHTTPCount = lists:min(HTTPCounts),
|
|
|
|
lager:info("Before-Node-Leave PB: ~b, After-Node-Leave PB: ~b", [InitKeyCount, MinPBCount]),
|
|
|
|
?assertEqual(InitKeyCount, MinPBCount),
|
|
|
|
lager:info("Before-Node-Leave PB: ~b, After-Node-Leave HTTP: ~b", [InitKeyCount, MinHTTPCount]),
|
|
|
|
?assertEqual(InitKeyCount, MinHTTPCount).
|
|
|
|
|
2015-04-06 16:28:09 +00:00
|
|
|
check_data(S, KeyCount, BucketURL, SearchURL) ->
|
|
|
|
CheckCount = KeyCount * 3,
|
|
|
|
KeysBefore = get_keys_count(BucketURL),
|
2015-02-12 23:30:33 +00:00
|
|
|
|
2015-04-06 16:28:09 +00:00
|
|
|
leave_or_join(S),
|
2015-02-12 23:30:33 +00:00
|
|
|
|
2015-04-06 16:28:09 +00:00
|
|
|
KeysAfter = get_keys_count(BucketURL),
|
2015-02-12 23:30:33 +00:00
|
|
|
lager:info("KeysBefore: ~b, KeysAfter: ~b", [KeysBefore, KeysAfter]),
|
|
|
|
?assertEqual(KeysBefore, KeysAfter),
|
|
|
|
|
2015-04-06 16:28:09 +00:00
|
|
|
lager:info("Verify Search Docs Count =:= key count"),
|
|
|
|
verify_count(SearchURL, KeysAfter),
|
|
|
|
lager:info("Verify Replicas Count = (3 * docs/keys) count"),
|
|
|
|
verify_count(S#trial_state.solr_url_after, CheckCount).
|
|
|
|
|
|
|
|
leave_or_join(S=#trial_state{join_node=undefined}) ->
|
|
|
|
Node = S#trial_state.leave_node,
|
|
|
|
rt:leave(Node),
|
|
|
|
?assertEqual(ok, rt:wait_until_unpingable(Node));
|
|
|
|
leave_or_join(S=#trial_state{leave_node=undefined}) ->
|
|
|
|
Node = S#trial_state.join_node,
|
|
|
|
NodeAdmin = S#trial_state.admin_node,
|
|
|
|
ok = rt:start_and_wait(Node),
|
|
|
|
ok = rt:join(Node, NodeAdmin),
|
|
|
|
?assertEqual(ok, rt:wait_until_nodes_ready([NodeAdmin, Node])),
|
|
|
|
?assertEqual(ok, rt:wait_until_no_pending_changes([NodeAdmin, Node])).
|