2i pagination tests

This commit is contained in:
Russell Brown 2013-04-20 18:22:34 +01:00
parent 94b8e27cec
commit a8e667eab8
4 changed files with 328 additions and 10 deletions

View File

@ -20,6 +20,9 @@
-module(secondary_index_tests).
-behavior(riak_test).
-export([confirm/0]).
-export([put_an_object/2, put_an_object/4, int_to_key/1,
stream_pb/2, stream_pb/3, pb_query/3, http_query/2,
http_query/3, http_stream/3, int_to_field1_bin/1]).
-include_lib("eunit/include/eunit.hrl").
-define(BUCKET, <<"2ibucket">>).
@ -65,16 +68,6 @@ confirm() ->
TestIdxVal),
pass.
put_an_object(Pid, N) ->
lager:info("Putting object ~p", [N]),
Indexes = [{"field1_bin", list_to_binary(io_lib:format("val~p", [N]))},
{"field2_int", N}],
MetaData = dict:from_list([{<<"index">>, Indexes}]),
Robj0 = riakc_obj:new(?BUCKET, list_to_binary(io_lib:format("obj~p", [N]))),
Robj1 = riakc_obj:update_value(Robj0, io_lib:format("data~p", [N])),
Robj2 = riakc_obj:update_metadata(Robj1, MetaData),
riakc_pb_socket:put(Pid, Robj2).
assertExactQuery(Pid, Expected, Index, Value) ->
lager:info("Searching Index ~p for ~p", [Index, Value]),
@ -91,3 +84,119 @@ assertRangeQuery(Pid, Expected, Index, StartValue, EndValue) ->
lager:info("Expected: ~p", [Expected]),
lager:info("Actual : ~p", [ActualKeys]),
?assertEqual(Expected, ActualKeys).
%% general 2i utility
put_an_object(Pid, N) ->
Key = int_to_key(N),
BinIndex = int_to_field1_bin(N),
put_an_object(Pid, Key, N, BinIndex).
put_an_object(Pid, Key, IntIndex, BinIndex) ->
lager:info("Putting object ~p", [Key]),
Indexes = [{"field1_bin", BinIndex},
{"field2_int", IntIndex}],
MetaData = dict:from_list([{<<"index">>, Indexes}]),
Robj0 = riakc_obj:new(?BUCKET, Key),
Robj1 = riakc_obj:update_value(Robj0, io_lib:format("data~p", [IntIndex])),
Robj2 = riakc_obj:update_metadata(Robj1, MetaData),
riakc_pb_socket:put(Pid, Robj2).
int_to_key(N) ->
list_to_binary(io_lib:format("obj~p", [N])).
int_to_field1_bin(N) ->
list_to_binary(io_lib:format("val~p", [N])).
stream_pb(Pid, Q) ->
pb_query(Pid, Q, [stream]),
stream_loop().
stream_pb(Pid, Q, Opts) ->
pb_query(Pid, Q, [stream|Opts]),
stream_loop().
stream_loop() ->
stream_loop(orddict:new()).
stream_loop(Acc) ->
receive {_Ref, done} ->
{ok, orddict:to_list(Acc)};
{_Ref, {keys, Keys}} ->
Acc2 = orddict:update(keys, fun(Existing) -> Existing++Keys end, Keys, Acc),
stream_loop(Acc2);
{_Ref, {results, Results}} ->
Acc2 = orddict:update(results, fun(Existing) -> Existing++Results end, Results, Acc),
stream_loop(Acc2);
{_Ref, Res} when is_list(Res) ->
Keys = proplists:get_value(keys, Res, []),
Continuation = proplists:get_value(continuation, Res),
Acc2 = orddict:update(keys, fun(Existing) -> Existing++Keys end, [], Acc),
Acc3 = orddict:store(continuation, Continuation, Acc2),
stream_loop(Acc3);
{_Ref, Wat} ->
lager:info("got a wat ~p", [Wat]),
stream_loop(Acc)
end.
pb_query(Pid, {Field, Val}, Opts) ->
riakc_pb_socket:get_index_eq(Pid, ?BUCKET, Field, Val, Opts);
pb_query(Pid, {Field, Start, End}, Opts) ->
riakc_pb_socket:get_index_range(Pid, ?BUCKET, Field, Start, End, Opts).
http_stream(NodePath, Query, Opts) ->
http_query(NodePath, Query, Opts, stream).
http_query(NodePath, Q) ->
http_query(NodePath, Q, []).
http_query(NodePath, Query, Opts) ->
http_query(NodePath, Query, Opts, undefined).
http_query(NodePath, {Field, Value}, Opts, Pid) ->
QString = opts_to_qstring(Opts, []),
Url = url("~s/buckets/~s/index/~s/~s~s", [NodePath, ?BUCKET, Field, Value, QString]),
http_get(Url, Pid);
http_query(NodePath, {Field, Start, End}, Opts, Pid) ->
QString = opts_to_qstring(Opts, []),
Url = url("~s/buckets/~s/index/~s/~s/~s~s", [NodePath, ?BUCKET, Field, Start, End, QString]),
http_get(Url, Pid).
url(Format, Elements) ->
Path = io_lib:format(Format, Elements),
lists:flatten(Path).
http_get(Url, undefined) ->
lager:info("getting ~p", [Url]),
{ok,{{"HTTP/1.1",200,"OK"}, _, Body}} = httpc:request(Url),
{struct, Result} = mochijson2:decode(Body),
Result;
http_get(Url, stream) ->
lager:info("streaming ~p", [Url]),
{ok, Ref} = httpc:request(get, {Url, []}, [], [{stream, self}, {sync, false}]),
http_stream_loop(Ref, []).
opts_to_qstring([], QString) ->
QString;
opts_to_qstring([Opt|Rest], []) ->
QOpt = opt_to_string("?", Opt),
opts_to_qstring(Rest, QOpt);
opts_to_qstring([Opt|Rest], QString) ->
QOpt = opt_to_string("&", Opt),
opts_to_qstring(Rest, QString++QOpt).
opt_to_string(Sep, {Name, Value}) when is_integer(Value) ->
io_lib:format(Sep++"~s=~p", [Name, Value]);
opt_to_string(Sep, {Name, Value})->
io_lib:format(Sep++"~s=~s", [Name, Value]);
opt_to_string(Sep, Name) ->
io_lib:format(Sep++"~s=~s", [Name, true]).
http_stream_loop(Ref, Acc) ->
receive {http, {Ref, stream_end, _Headers}} ->
{struct, Result} = mochijson2:decode(lists:flatten(lists:reverse(Acc))),
Result;
{http, {Ref, stream_start, _Headers}} ->
http_stream_loop(Ref, Acc);
{http, {Ref, stream, Body}} ->
http_stream_loop(Ref, [binary_to_list(Body) | Acc])
end.

63
tests/verify_2i_limit.erl Normal file
View File

@ -0,0 +1,63 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2013 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(verify_2i_limit).
-behavior(riak_test).
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
-import(secondary_index_tests, [put_an_object/2, int_to_key/1,
stream_pb/3, http_query/3]).
-define(BUCKET, <<"2ibucket">>).
-define(FOO, <<"foo">>).
-define(MAX_RESULTS, 50).
confirm() ->
inets:start(),
Nodes = rt:build_cluster(3),
?assertEqual(ok, (rt:wait_until_nodes_ready(Nodes))),
RiakHttp = rt:http_url(hd(Nodes)),
PBPid = rt:pbc(hd(Nodes)),
[put_an_object(PBPid, N) || N <- lists:seq(0, 100)],
ExpectedKeys = lists:sort([int_to_key(N) || N <- lists:seq(0, 100)]),
{FirstHalf, Rest} = lists:split(?MAX_RESULTS, ExpectedKeys),
Q = {<<"$key">>, int_to_key(0), int_to_key(999)},
%% PB
{ok, PBRes} = stream_pb(PBPid, Q, [{max_results, ?MAX_RESULTS}]),
?assertEqual(FirstHalf, proplists:get_value(keys, PBRes, [])),
PBContinuation = proplists:get_value(continuation, PBRes),
{ok, PBKeys2} = stream_pb(PBPid, Q, [{continuation, PBContinuation}]),
?assertEqual(Rest, proplists:get_value(keys, PBKeys2, [])),
%% HTTP
HttpRes = http_query(RiakHttp, Q, [{max_results, ?MAX_RESULTS}]),
?assertEqual(FirstHalf, proplists:get_value(<<"keys">>, HttpRes, [])),
HttpContinuation = proplists:get_value(<<"continuation">>, HttpRes),
?assertEqual(PBContinuation, HttpContinuation),
HttpRes2 = http_query(RiakHttp, Q, [{continuation, HttpContinuation}]),
?assertEqual(Rest, proplists:get_value(<<"keys">>, HttpRes2, [])),
riakc_pb_socket:stop(PBPid),
pass.

View File

@ -0,0 +1,78 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2013 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(verify_2i_returnterms).
-behavior(riak_test).
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
-import(secondary_index_tests, [put_an_object/2, put_an_object/4, int_to_key/1,
stream_pb/3, http_query/3]).
-define(BUCKET, <<"2ibucket">>).
-define(FOO, <<"foo">>).
-define(Q_OPTS, [{return_terms, true}]).
confirm() ->
inets:start(),
Nodes = rt:build_cluster(3),
?assertEqual(ok, (rt:wait_until_nodes_ready(Nodes))),
RiakHttp = rt:http_url(hd(Nodes)),
PBPid = rt:pbc(hd(Nodes)),
[put_an_object(PBPid, N) || N <- lists:seq(0, 100)],
[put_an_object(PBPid, int_to_key(N), N, ?FOO) || N <- lists:seq(101, 200)],
%% Bucket, key, and index_eq queries should ignore `return_terms'
ExpectedKeys = lists:sort([int_to_key(N) || N <- lists:seq(0, 200)]),
assertEqual(RiakHttp, PBPid, ExpectedKeys, {<<"$key">>, int_to_key(0), int_to_key(999)}, ?Q_OPTS, keys),
assertEqual(RiakHttp, PBPid, ExpectedKeys, { <<"$bucket">>, ?BUCKET}, ?Q_OPTS, keys),
ExpectedFooKeys = lists:sort([int_to_key(N) || N <- lists:seq(101, 200)]),
assertEqual(RiakHttp, PBPid, ExpectedFooKeys, {<<"field1_bin">>, ?FOO}, ?Q_OPTS, keys),
%% Note: not sorted by key, but by value (the int index)
%% Should return "val, key" pairs
ExpectedRangeResults = [{list_to_binary(integer_to_list(N)), int_to_key(N)} || N <- lists:seq(1, 100)],
assertEqual(RiakHttp, PBPid, ExpectedRangeResults, {<<"field2_int">>, "1", "100"}, ?Q_OPTS, results),
riakc_pb_socket:stop(PBPid),
pass.
%% Check the PB result against our expectations
%% and the non-streamed HTTP
assertEqual(Http, PB, Expected, Query, Opts, ResultKey) ->
{ok, PBRes} = stream_pb(PB, Query, Opts),
PBKeys = proplists:get_value(ResultKey, PBRes, []),
HTTPRes = http_query(Http, Query, Opts),
HTTPResults0 = proplists:get_value(atom_to_binary(ResultKey, latin1), HTTPRes, []),
HTTPResults = decode_http_results(ResultKey, HTTPResults0),
?assertEqual(Expected, PBKeys),
?assertEqual(Expected, HTTPResults).
decode_http_results(keys, Keys) ->
Keys;
decode_http_results(results, Results) ->
decode_http_results(Results, []);
decode_http_results([], Acc) ->
lists:reverse(Acc);
decode_http_results([{struct, [Res]} | Rest ], Acc) ->
decode_http_results(Rest, [Res | Acc]).

View File

@ -0,0 +1,68 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2013 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(verify_2i_stream).
-behavior(riak_test).
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
-import(secondary_index_tests, [put_an_object/2, put_an_object/4, int_to_key/1,
stream_pb/2, http_query/2, http_stream/3]).
-define(BUCKET, <<"2ibucket">>).
-define(FOO, <<"foo">>).
confirm() ->
inets:start(),
Nodes = rt:build_cluster(3),
?assertEqual(ok, (rt:wait_until_nodes_ready(Nodes))),
RiakHttp = rt:http_url(hd(Nodes)),
PBPid = rt:pbc(hd(Nodes)),
[put_an_object(PBPid, N) || N <- lists:seq(0, 100)],
[put_an_object(PBPid, int_to_key(N), N, ?FOO) || N <- lists:seq(101, 200)],
ExpectedKeys = lists:sort([int_to_key(N) || N <- lists:seq(0, 200)]),
assertEqual(RiakHttp, PBPid, ExpectedKeys, {<<"$key">>, int_to_key(0), int_to_key(999)}),
assertEqual(RiakHttp, PBPid, ExpectedKeys, { <<"$bucket">>, ?BUCKET}),
ExpectedFooKeys = lists:sort([int_to_key(N) || N <- lists:seq(101, 200)]),
assertEqual(RiakHttp, PBPid, ExpectedFooKeys, {<<"field1_bin">>, ?FOO}),
%% Note: not sorted by key, but by value (the int index)
ExpectedRangeKeys = [int_to_key(N) || N <- lists:seq(1, 100)],
assertEqual(RiakHttp, PBPid, ExpectedRangeKeys, {<<"field2_int">>, "1", "100"}),
riakc_pb_socket:stop(PBPid),
pass.
%% Check the PB result against our expectations
%% and the non-streamed HTTP
assertEqual(Http, PB, Expected, Query) ->
{ok, PBRes} = stream_pb(PB, Query),
PBKeys = proplists:get_value(keys, PBRes, []),
HTTPRes = http_query(Http, Query),
StreamHTTPRes = http_stream(Http, Query, []),
lager:info("Got ~p.", [StreamHTTPRes]),
HTTPKeys = proplists:get_value(<<"keys">>, HTTPRes, []),
StreamHttpKeys = proplists:get_value(<<"keys">>, StreamHTTPRes, []),
?assertEqual(Expected, PBKeys),
?assertEqual(Expected, HTTPKeys),
?assertEqual(Expected, StreamHttpKeys).