mirror of
https://github.com/valitydev/riak_test.git
synced 2024-11-07 00:55:21 +00:00
Merge pull request #266 from basho/pt29-2i-pagination
Tests 2 pagination / streaming / term returning
This commit is contained in:
commit
011aa71510
36
src/rt.erl
36
src/rt.erl
@ -75,6 +75,7 @@
|
|||||||
pbc_write/4,
|
pbc_write/4,
|
||||||
pbc_put_dir/3,
|
pbc_put_dir/3,
|
||||||
pbc_put_file/4,
|
pbc_put_file/4,
|
||||||
|
pbc_wait_until_really_deleted/5,
|
||||||
pmap/2,
|
pmap/2,
|
||||||
priv_dir/0,
|
priv_dir/0,
|
||||||
remove/2,
|
remove/2,
|
||||||
@ -869,6 +870,41 @@ pbc_put_dir(Pid, Bucket, Dir) ->
|
|||||||
[pbc_put_file(Pid, Bucket, list_to_binary(F), filename:join([Dir, F]))
|
[pbc_put_file(Pid, Bucket, list_to_binary(F), filename:join([Dir, F]))
|
||||||
|| F <- Files].
|
|| F <- Files].
|
||||||
|
|
||||||
|
%% @doc Wait until the given keys have been really, really deleted.
|
||||||
|
%% Useful when you care about the keys not being there. Delete simply writes
|
||||||
|
%% tombstones under the given keys, so those are still seen by key folding
|
||||||
|
%% operations.
|
||||||
|
pbc_wait_until_really_deleted(_Pid, _Bucket, [], _Delay, _Retries) ->
|
||||||
|
ok;
|
||||||
|
pbc_wait_until_really_deleted(_Pid, _Bucket, _Keys, _Delay, Retries) when Retries < 1 ->
|
||||||
|
{error, timeout};
|
||||||
|
pbc_wait_until_really_deleted(Pid, Bucket, Keys, Delay, Retries)
|
||||||
|
when is_integer(Retries), Retries > 0, is_list(Keys) ->
|
||||||
|
StillThere =
|
||||||
|
fun(K) ->
|
||||||
|
Res = riakc_pb_socket:get(Pid, Bucket, K,
|
||||||
|
[{r, 1},
|
||||||
|
{notfound_ok, false},
|
||||||
|
{basic_quorum, false},
|
||||||
|
deletedvclock]),
|
||||||
|
case Res of
|
||||||
|
{error, notfound} ->
|
||||||
|
false;
|
||||||
|
_ ->
|
||||||
|
%% Tombstone still around
|
||||||
|
true
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
case lists:filter(StillThere, Keys) of
|
||||||
|
[] ->
|
||||||
|
ok;
|
||||||
|
NewKeys ->
|
||||||
|
timer:sleep(Delay),
|
||||||
|
pbc_wait_until_really_deleted(Pid, Bucket, NewKeys, Delay, Retries-1)
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
%% @doc Returns HTTP URL information for a list of Nodes
|
%% @doc Returns HTTP URL information for a list of Nodes
|
||||||
http_url(Nodes) when is_list(Nodes) ->
|
http_url(Nodes) when is_list(Nodes) ->
|
||||||
[begin
|
[begin
|
||||||
|
@ -20,6 +20,9 @@
|
|||||||
-module(secondary_index_tests).
|
-module(secondary_index_tests).
|
||||||
-behavior(riak_test).
|
-behavior(riak_test).
|
||||||
-export([confirm/0]).
|
-export([confirm/0]).
|
||||||
|
-export([put_an_object/2, put_an_object/4, int_to_key/1,
|
||||||
|
stream_pb/2, stream_pb/3, pb_query/3, http_query/2,
|
||||||
|
http_query/3, http_stream/3, int_to_field1_bin/1]).
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
-define(BUCKET, <<"2ibucket">>).
|
-define(BUCKET, <<"2ibucket">>).
|
||||||
@ -39,11 +42,10 @@ confirm() ->
|
|||||||
assertRangeQuery(Pid, [<<"obj10">>, <<"obj11">>, <<"obj12">>], <<"$key">>, <<"obj10">>, <<"obj12">>),
|
assertRangeQuery(Pid, [<<"obj10">>, <<"obj11">>, <<"obj12">>], <<"$key">>, <<"obj10">>, <<"obj12">>),
|
||||||
|
|
||||||
lager:info("Delete an object, verify deletion..."),
|
lager:info("Delete an object, verify deletion..."),
|
||||||
riakc_pb_socket:delete(Pid, ?BUCKET, <<"obj5">>),
|
ToDel = [<<"obj5">>, <<"obj11">>],
|
||||||
riakc_pb_socket:delete(Pid, ?BUCKET, <<"obj11">>),
|
[?assertMatch(ok, riakc_pb_socket:delete(Pid, ?BUCKET, K)) || K <- ToDel],
|
||||||
|
lager:info("Make sure the tombstone is reaped..."),
|
||||||
lager:info("Sleeping for 5 seconds. Make sure the tombstone is reaped..."),
|
?assertMatch(ok, rt:pbc_wait_until_really_deleted(Pid, ?BUCKET, ToDel, 1000, 20)),
|
||||||
timer:sleep(5000),
|
|
||||||
|
|
||||||
assertExactQuery(Pid, [], <<"field1_bin">>, <<"val5">>),
|
assertExactQuery(Pid, [], <<"field1_bin">>, <<"val5">>),
|
||||||
assertExactQuery(Pid, [], <<"field2_int">>, <<"5">>),
|
assertExactQuery(Pid, [], <<"field2_int">>, <<"5">>),
|
||||||
@ -65,20 +67,9 @@ confirm() ->
|
|||||||
TestIdxVal),
|
TestIdxVal),
|
||||||
pass.
|
pass.
|
||||||
|
|
||||||
put_an_object(Pid, N) ->
|
|
||||||
lager:info("Putting object ~p", [N]),
|
|
||||||
Indexes = [{"field1_bin", list_to_binary(io_lib:format("val~p", [N]))},
|
|
||||||
{"field2_int", N}],
|
|
||||||
MetaData = dict:from_list([{<<"index">>, Indexes}]),
|
|
||||||
Robj0 = riakc_obj:new(?BUCKET, list_to_binary(io_lib:format("obj~p", [N]))),
|
|
||||||
Robj1 = riakc_obj:update_value(Robj0, io_lib:format("data~p", [N])),
|
|
||||||
Robj2 = riakc_obj:update_metadata(Robj1, MetaData),
|
|
||||||
riakc_pb_socket:put(Pid, Robj2).
|
|
||||||
|
|
||||||
|
|
||||||
assertExactQuery(Pid, Expected, Index, Value) ->
|
assertExactQuery(Pid, Expected, Index, Value) ->
|
||||||
lager:info("Searching Index ~p for ~p", [Index, Value]),
|
lager:info("Searching Index ~p for ~p", [Index, Value]),
|
||||||
{ok, Results} = riakc_pb_socket:get_index(Pid, ?BUCKET, Index, Value),
|
{ok, {keys, Results}} = riakc_pb_socket:get_index(Pid, ?BUCKET, Index, Value),
|
||||||
ActualKeys = lists:sort(Results),
|
ActualKeys = lists:sort(Results),
|
||||||
lager:info("Expected: ~p", [Expected]),
|
lager:info("Expected: ~p", [Expected]),
|
||||||
lager:info("Actual : ~p", [ActualKeys]),
|
lager:info("Actual : ~p", [ActualKeys]),
|
||||||
@ -86,8 +77,124 @@ assertExactQuery(Pid, Expected, Index, Value) ->
|
|||||||
|
|
||||||
assertRangeQuery(Pid, Expected, Index, StartValue, EndValue) ->
|
assertRangeQuery(Pid, Expected, Index, StartValue, EndValue) ->
|
||||||
lager:info("Searching Index ~p for ~p-~p", [Index, StartValue, EndValue]),
|
lager:info("Searching Index ~p for ~p-~p", [Index, StartValue, EndValue]),
|
||||||
{ok, Results} = riakc_pb_socket:get_index(Pid, ?BUCKET, Index, StartValue, EndValue),
|
{ok, {keys, Results}} = riakc_pb_socket:get_index(Pid, ?BUCKET, Index, StartValue, EndValue),
|
||||||
ActualKeys = lists:sort(Results),
|
ActualKeys = lists:sort(Results),
|
||||||
lager:info("Expected: ~p", [Expected]),
|
lager:info("Expected: ~p", [Expected]),
|
||||||
lager:info("Actual : ~p", [ActualKeys]),
|
lager:info("Actual : ~p", [ActualKeys]),
|
||||||
?assertEqual(Expected, ActualKeys).
|
?assertEqual(Expected, ActualKeys).
|
||||||
|
|
||||||
|
%% general 2i utility
|
||||||
|
put_an_object(Pid, N) ->
|
||||||
|
Key = int_to_key(N),
|
||||||
|
BinIndex = int_to_field1_bin(N),
|
||||||
|
put_an_object(Pid, Key, N, BinIndex).
|
||||||
|
|
||||||
|
put_an_object(Pid, Key, IntIndex, BinIndex) ->
|
||||||
|
lager:info("Putting object ~p", [Key]),
|
||||||
|
Indexes = [{"field1_bin", BinIndex},
|
||||||
|
{"field2_int", IntIndex}],
|
||||||
|
MetaData = dict:from_list([{<<"index">>, Indexes}]),
|
||||||
|
Robj0 = riakc_obj:new(?BUCKET, Key),
|
||||||
|
Robj1 = riakc_obj:update_value(Robj0, io_lib:format("data~p", [IntIndex])),
|
||||||
|
Robj2 = riakc_obj:update_metadata(Robj1, MetaData),
|
||||||
|
riakc_pb_socket:put(Pid, Robj2).
|
||||||
|
|
||||||
|
int_to_key(N) ->
|
||||||
|
list_to_binary(io_lib:format("obj~p", [N])).
|
||||||
|
|
||||||
|
int_to_field1_bin(N) ->
|
||||||
|
list_to_binary(io_lib:format("val~p", [N])).
|
||||||
|
|
||||||
|
stream_pb(Pid, Q) ->
|
||||||
|
pb_query(Pid, Q, [stream]),
|
||||||
|
stream_loop().
|
||||||
|
|
||||||
|
stream_pb(Pid, Q, Opts) ->
|
||||||
|
pb_query(Pid, Q, [stream|Opts]),
|
||||||
|
stream_loop().
|
||||||
|
|
||||||
|
stream_loop() ->
|
||||||
|
stream_loop(orddict:new()).
|
||||||
|
|
||||||
|
stream_loop(Acc) ->
|
||||||
|
receive {_Ref, done} ->
|
||||||
|
{ok, orddict:to_list(Acc)};
|
||||||
|
{_Ref, {keys, Keys}} ->
|
||||||
|
Acc2 = orddict:update(keys, fun(Existing) -> Existing++Keys end, Keys, Acc),
|
||||||
|
stream_loop(Acc2);
|
||||||
|
{_Ref, {results, Results}} ->
|
||||||
|
Acc2 = orddict:update(results, fun(Existing) -> Existing++Results end, Results, Acc),
|
||||||
|
stream_loop(Acc2);
|
||||||
|
{_Ref, Res} when is_list(Res) ->
|
||||||
|
Keys = proplists:get_value(keys, Res, []),
|
||||||
|
Continuation = proplists:get_value(continuation, Res),
|
||||||
|
Acc2 = orddict:update(keys, fun(Existing) -> Existing++Keys end, [], Acc),
|
||||||
|
Acc3 = orddict:store(continuation, Continuation, Acc2),
|
||||||
|
stream_loop(Acc3);
|
||||||
|
{_Ref, Wat} ->
|
||||||
|
lager:info("got a wat ~p", [Wat]),
|
||||||
|
stream_loop(Acc)
|
||||||
|
end.
|
||||||
|
|
||||||
|
pb_query(Pid, {Field, Val}, Opts) ->
|
||||||
|
riakc_pb_socket:get_index_eq(Pid, ?BUCKET, Field, Val, Opts);
|
||||||
|
pb_query(Pid, {Field, Start, End}, Opts) ->
|
||||||
|
riakc_pb_socket:get_index_range(Pid, ?BUCKET, Field, Start, End, Opts).
|
||||||
|
|
||||||
|
http_stream(NodePath, Query, Opts) ->
|
||||||
|
http_query(NodePath, Query, Opts, stream).
|
||||||
|
|
||||||
|
http_query(NodePath, Q) ->
|
||||||
|
http_query(NodePath, Q, []).
|
||||||
|
|
||||||
|
http_query(NodePath, Query, Opts) ->
|
||||||
|
http_query(NodePath, Query, Opts, undefined).
|
||||||
|
|
||||||
|
http_query(NodePath, {Field, Value}, Opts, Pid) ->
|
||||||
|
QString = opts_to_qstring(Opts, []),
|
||||||
|
Url = url("~s/buckets/~s/index/~s/~s~s", [NodePath, ?BUCKET, Field, Value, QString]),
|
||||||
|
http_get(Url, Pid);
|
||||||
|
http_query(NodePath, {Field, Start, End}, Opts, Pid) ->
|
||||||
|
QString = opts_to_qstring(Opts, []),
|
||||||
|
Url = url("~s/buckets/~s/index/~s/~s/~s~s", [NodePath, ?BUCKET, Field, Start, End, QString]),
|
||||||
|
http_get(Url, Pid).
|
||||||
|
|
||||||
|
url(Format, Elements) ->
|
||||||
|
Path = io_lib:format(Format, Elements),
|
||||||
|
lists:flatten(Path).
|
||||||
|
|
||||||
|
http_get(Url, undefined) ->
|
||||||
|
lager:info("getting ~p", [Url]),
|
||||||
|
{ok,{{"HTTP/1.1",200,"OK"}, _, Body}} = httpc:request(Url),
|
||||||
|
{struct, Result} = mochijson2:decode(Body),
|
||||||
|
Result;
|
||||||
|
http_get(Url, stream) ->
|
||||||
|
lager:info("streaming ~p", [Url]),
|
||||||
|
{ok, Ref} = httpc:request(get, {Url, []}, [], [{stream, self}, {sync, false}]),
|
||||||
|
http_stream_loop(Ref, []).
|
||||||
|
|
||||||
|
opts_to_qstring([], QString) ->
|
||||||
|
QString;
|
||||||
|
opts_to_qstring([Opt|Rest], []) ->
|
||||||
|
QOpt = opt_to_string("?", Opt),
|
||||||
|
opts_to_qstring(Rest, QOpt);
|
||||||
|
opts_to_qstring([Opt|Rest], QString) ->
|
||||||
|
QOpt = opt_to_string("&", Opt),
|
||||||
|
opts_to_qstring(Rest, QString++QOpt).
|
||||||
|
|
||||||
|
opt_to_string(Sep, {Name, Value}) when is_integer(Value) ->
|
||||||
|
io_lib:format(Sep++"~s=~p", [Name, Value]);
|
||||||
|
opt_to_string(Sep, {Name, Value})->
|
||||||
|
io_lib:format(Sep++"~s=~s", [Name, Value]);
|
||||||
|
opt_to_string(Sep, Name) ->
|
||||||
|
io_lib:format(Sep++"~s=~s", [Name, true]).
|
||||||
|
|
||||||
|
http_stream_loop(Ref, Acc) ->
|
||||||
|
receive {http, {Ref, stream_end, _Headers}} ->
|
||||||
|
{struct, Result} = mochijson2:decode(lists:flatten(lists:reverse(Acc))),
|
||||||
|
Result;
|
||||||
|
{http, {Ref, stream_start, _Headers}} ->
|
||||||
|
http_stream_loop(Ref, Acc);
|
||||||
|
{http, {Ref, stream, Body}} ->
|
||||||
|
http_stream_loop(Ref, [binary_to_list(Body) | Acc])
|
||||||
|
end.
|
||||||
|
81
tests/verify_2i_limit.erl
Normal file
81
tests/verify_2i_limit.erl
Normal file
@ -0,0 +1,81 @@
|
|||||||
|
%% -------------------------------------------------------------------
|
||||||
|
%%
|
||||||
|
%% Copyright (c) 2013 Basho Technologies, Inc.
|
||||||
|
%%
|
||||||
|
%% This file is provided to you under the Apache License,
|
||||||
|
%% Version 2.0 (the "License"); you may not use this file
|
||||||
|
%% except in compliance with the License. You may obtain
|
||||||
|
%% a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing,
|
||||||
|
%% software distributed under the License is distributed on an
|
||||||
|
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
%% KIND, either express or implied. See the License for the
|
||||||
|
%% specific language governing permissions and limitations
|
||||||
|
%% under the License.
|
||||||
|
%%
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
-module(verify_2i_limit).
|
||||||
|
-behavior(riak_test).
|
||||||
|
-export([confirm/0]).
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-import(secondary_index_tests, [put_an_object/2, int_to_key/1,
|
||||||
|
stream_pb/3, http_query/3, pb_query/3]).
|
||||||
|
-define(BUCKET, <<"2ibucket">>).
|
||||||
|
-define(FOO, <<"foo">>).
|
||||||
|
-define(MAX_RESULTS, 50).
|
||||||
|
|
||||||
|
confirm() ->
|
||||||
|
inets:start(),
|
||||||
|
|
||||||
|
Nodes = rt:build_cluster(3),
|
||||||
|
?assertEqual(ok, (rt:wait_until_nodes_ready(Nodes))),
|
||||||
|
|
||||||
|
RiakHttp = rt:http_url(hd(Nodes)),
|
||||||
|
PBPid = rt:pbc(hd(Nodes)),
|
||||||
|
|
||||||
|
[put_an_object(PBPid, N) || N <- lists:seq(0, 100)],
|
||||||
|
|
||||||
|
ExpectedKeys = lists:sort([int_to_key(N) || N <- lists:seq(0, 100)]),
|
||||||
|
{FirstHalf, Rest} = lists:split(?MAX_RESULTS, ExpectedKeys),
|
||||||
|
Q = {<<"$key">>, int_to_key(0), int_to_key(999)},
|
||||||
|
|
||||||
|
%% PB
|
||||||
|
{ok, PBRes} = stream_pb(PBPid, Q, [{max_results, ?MAX_RESULTS}]),
|
||||||
|
?assertEqual(FirstHalf, proplists:get_value(keys, PBRes, [])),
|
||||||
|
PBContinuation = proplists:get_value(continuation, PBRes),
|
||||||
|
|
||||||
|
{ok, PBKeys2} = stream_pb(PBPid, Q, [{continuation, PBContinuation}]),
|
||||||
|
?assertEqual(Rest, proplists:get_value(keys, PBKeys2, [])),
|
||||||
|
|
||||||
|
%% HTTP
|
||||||
|
HttpRes = http_query(RiakHttp, Q, [{max_results, ?MAX_RESULTS}]),
|
||||||
|
?assertEqual(FirstHalf, proplists:get_value(<<"keys">>, HttpRes, [])),
|
||||||
|
HttpContinuation = proplists:get_value(<<"continuation">>, HttpRes),
|
||||||
|
?assertEqual(PBContinuation, HttpContinuation),
|
||||||
|
|
||||||
|
HttpRes2 = http_query(RiakHttp, Q, [{continuation, HttpContinuation}]),
|
||||||
|
?assertEqual(Rest, proplists:get_value(<<"keys">>, HttpRes2, [])),
|
||||||
|
|
||||||
|
%% Multiple indexes for single key
|
||||||
|
O1 = riakc_obj:new(?BUCKET, <<"bob">>, <<"1">>),
|
||||||
|
Md = riakc_obj:get_metadata(O1),
|
||||||
|
Md2 = riakc_obj:set_secondary_index(Md, {{integer_index, "i1"}, [300, 301, 302]}),
|
||||||
|
O2 = riakc_obj:update_metadata(O1, Md2),
|
||||||
|
riakc_pb_socket:put(PBPid, O2),
|
||||||
|
|
||||||
|
MQ = {"i1_int", 300, 302},
|
||||||
|
{ok, Res} = pb_query(PBPid, MQ, [{max_results, 2}, return_terms]),
|
||||||
|
|
||||||
|
?assertEqual([{<<"300">>, <<"bob">>},
|
||||||
|
{<<"301">>, <<"bob">>}], proplists:get_value(results, Res)),
|
||||||
|
|
||||||
|
{ok, Res2} = pb_query(PBPid, MQ, [{max_results, 2}, return_terms,
|
||||||
|
{continuation, proplists:get_value(continuation, Res)}]),
|
||||||
|
|
||||||
|
?assertEqual({results,[{<<"302">>,<<"bob">>}]}, Res2),
|
||||||
|
|
||||||
|
riakc_pb_socket:stop(PBPid),
|
||||||
|
pass.
|
78
tests/verify_2i_returnterms.erl
Normal file
78
tests/verify_2i_returnterms.erl
Normal file
@ -0,0 +1,78 @@
|
|||||||
|
%% -------------------------------------------------------------------
|
||||||
|
%%
|
||||||
|
%% Copyright (c) 2013 Basho Technologies, Inc.
|
||||||
|
%%
|
||||||
|
%% This file is provided to you under the Apache License,
|
||||||
|
%% Version 2.0 (the "License"); you may not use this file
|
||||||
|
%% except in compliance with the License. You may obtain
|
||||||
|
%% a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing,
|
||||||
|
%% software distributed under the License is distributed on an
|
||||||
|
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
%% KIND, either express or implied. See the License for the
|
||||||
|
%% specific language governing permissions and limitations
|
||||||
|
%% under the License.
|
||||||
|
%%
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
-module(verify_2i_returnterms).
|
||||||
|
-behavior(riak_test).
|
||||||
|
-export([confirm/0]).
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-import(secondary_index_tests, [put_an_object/2, put_an_object/4, int_to_key/1,
|
||||||
|
stream_pb/3, http_query/3]).
|
||||||
|
-define(BUCKET, <<"2ibucket">>).
|
||||||
|
-define(FOO, <<"foo">>).
|
||||||
|
-define(Q_OPTS, [{return_terms, true}]).
|
||||||
|
|
||||||
|
confirm() ->
|
||||||
|
inets:start(),
|
||||||
|
|
||||||
|
Nodes = rt:build_cluster(3),
|
||||||
|
?assertEqual(ok, (rt:wait_until_nodes_ready(Nodes))),
|
||||||
|
|
||||||
|
RiakHttp = rt:http_url(hd(Nodes)),
|
||||||
|
PBPid = rt:pbc(hd(Nodes)),
|
||||||
|
|
||||||
|
[put_an_object(PBPid, N) || N <- lists:seq(0, 100)],
|
||||||
|
[put_an_object(PBPid, int_to_key(N), N, ?FOO) || N <- lists:seq(101, 200)],
|
||||||
|
|
||||||
|
%% Bucket, key, and index_eq queries should ignore `return_terms'
|
||||||
|
ExpectedKeys = lists:sort([int_to_key(N) || N <- lists:seq(0, 200)]),
|
||||||
|
assertEqual(RiakHttp, PBPid, ExpectedKeys, {<<"$key">>, int_to_key(0), int_to_key(999)}, ?Q_OPTS, keys),
|
||||||
|
assertEqual(RiakHttp, PBPid, ExpectedKeys, { <<"$bucket">>, ?BUCKET}, ?Q_OPTS, keys),
|
||||||
|
|
||||||
|
ExpectedFooKeys = lists:sort([int_to_key(N) || N <- lists:seq(101, 200)]),
|
||||||
|
assertEqual(RiakHttp, PBPid, ExpectedFooKeys, {<<"field1_bin">>, ?FOO}, ?Q_OPTS, keys),
|
||||||
|
|
||||||
|
%% Note: not sorted by key, but by value (the int index)
|
||||||
|
%% Should return "val, key" pairs
|
||||||
|
ExpectedRangeResults = [{list_to_binary(integer_to_list(N)), int_to_key(N)} || N <- lists:seq(1, 100)],
|
||||||
|
assertEqual(RiakHttp, PBPid, ExpectedRangeResults, {<<"field2_int">>, "1", "100"}, ?Q_OPTS, results),
|
||||||
|
|
||||||
|
riakc_pb_socket:stop(PBPid),
|
||||||
|
pass.
|
||||||
|
|
||||||
|
%% Check the PB result against our expectations
|
||||||
|
%% and the non-streamed HTTP
|
||||||
|
assertEqual(Http, PB, Expected, Query, Opts, ResultKey) ->
|
||||||
|
{ok, PBRes} = stream_pb(PB, Query, Opts),
|
||||||
|
PBKeys = proplists:get_value(ResultKey, PBRes, []),
|
||||||
|
HTTPRes = http_query(Http, Query, Opts),
|
||||||
|
HTTPResults0 = proplists:get_value(atom_to_binary(ResultKey, latin1), HTTPRes, []),
|
||||||
|
HTTPResults = decode_http_results(ResultKey, HTTPResults0),
|
||||||
|
?assertEqual(Expected, PBKeys),
|
||||||
|
?assertEqual(Expected, HTTPResults).
|
||||||
|
|
||||||
|
decode_http_results(keys, Keys) ->
|
||||||
|
Keys;
|
||||||
|
decode_http_results(results, Results) ->
|
||||||
|
decode_http_results(Results, []);
|
||||||
|
|
||||||
|
decode_http_results([], Acc) ->
|
||||||
|
lists:reverse(Acc);
|
||||||
|
decode_http_results([{struct, [Res]} | Rest ], Acc) ->
|
||||||
|
decode_http_results(Rest, [Res | Acc]).
|
||||||
|
|
67
tests/verify_2i_stream.erl
Normal file
67
tests/verify_2i_stream.erl
Normal file
@ -0,0 +1,67 @@
|
|||||||
|
%% -------------------------------------------------------------------
|
||||||
|
%%
|
||||||
|
%% Copyright (c) 2013 Basho Technologies, Inc.
|
||||||
|
%%
|
||||||
|
%% This file is provided to you under the Apache License,
|
||||||
|
%% Version 2.0 (the "License"); you may not use this file
|
||||||
|
%% except in compliance with the License. You may obtain
|
||||||
|
%% a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing,
|
||||||
|
%% software distributed under the License is distributed on an
|
||||||
|
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
%% KIND, either express or implied. See the License for the
|
||||||
|
%% specific language governing permissions and limitations
|
||||||
|
%% under the License.
|
||||||
|
%%
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
-module(verify_2i_stream).
|
||||||
|
-behavior(riak_test).
|
||||||
|
-export([confirm/0]).
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-import(secondary_index_tests, [put_an_object/2, put_an_object/4, int_to_key/1,
|
||||||
|
stream_pb/2, http_query/2, http_stream/3]).
|
||||||
|
-define(BUCKET, <<"2ibucket">>).
|
||||||
|
-define(FOO, <<"foo">>).
|
||||||
|
|
||||||
|
confirm() ->
|
||||||
|
inets:start(),
|
||||||
|
|
||||||
|
Nodes = rt:build_cluster(3),
|
||||||
|
?assertEqual(ok, (rt:wait_until_nodes_ready(Nodes))),
|
||||||
|
|
||||||
|
RiakHttp = rt:http_url(hd(Nodes)),
|
||||||
|
PBPid = rt:pbc(hd(Nodes)),
|
||||||
|
|
||||||
|
[put_an_object(PBPid, N) || N <- lists:seq(0, 100)],
|
||||||
|
[put_an_object(PBPid, int_to_key(N), N, ?FOO) || N <- lists:seq(101, 200)],
|
||||||
|
|
||||||
|
ExpectedKeys = lists:sort([int_to_key(N) || N <- lists:seq(0, 200)]),
|
||||||
|
assertEqual(RiakHttp, PBPid, ExpectedKeys, {<<"$key">>, int_to_key(0), int_to_key(999)}),
|
||||||
|
assertEqual(RiakHttp, PBPid, ExpectedKeys, { <<"$bucket">>, ?BUCKET}),
|
||||||
|
|
||||||
|
ExpectedFooKeys = lists:sort([int_to_key(N) || N <- lists:seq(101, 200)]),
|
||||||
|
assertEqual(RiakHttp, PBPid, ExpectedFooKeys, {<<"field1_bin">>, ?FOO}),
|
||||||
|
|
||||||
|
%% Note: not sorted by key, but by value (the int index)
|
||||||
|
ExpectedRangeKeys = [int_to_key(N) || N <- lists:seq(1, 100)],
|
||||||
|
assertEqual(RiakHttp, PBPid, ExpectedRangeKeys, {<<"field2_int">>, "1", "100"}),
|
||||||
|
|
||||||
|
riakc_pb_socket:stop(PBPid),
|
||||||
|
pass.
|
||||||
|
|
||||||
|
%% Check the PB result against our expectations
|
||||||
|
%% and the non-streamed HTTP
|
||||||
|
assertEqual(Http, PB, Expected, Query) ->
|
||||||
|
{ok, PBRes} = stream_pb(PB, Query),
|
||||||
|
PBKeys = proplists:get_value(keys, PBRes, []),
|
||||||
|
HTTPRes = http_query(Http, Query),
|
||||||
|
StreamHTTPRes = http_stream(Http, Query, []),
|
||||||
|
HTTPKeys = proplists:get_value(<<"keys">>, HTTPRes, []),
|
||||||
|
StreamHttpKeys = proplists:get_value(<<"keys">>, StreamHTTPRes, []),
|
||||||
|
?assertEqual(Expected, PBKeys),
|
||||||
|
?assertEqual(Expected, HTTPKeys),
|
||||||
|
?assertEqual(Expected, StreamHttpKeys).
|
||||||
|
|
87
tests/verify_cs_bucket.erl
Normal file
87
tests/verify_cs_bucket.erl
Normal file
@ -0,0 +1,87 @@
|
|||||||
|
%% -------------------------------------------------------------------
|
||||||
|
%%
|
||||||
|
%% Copyright (c) 2013 Basho Technologies, Inc.
|
||||||
|
%%
|
||||||
|
%% This file is provided to you under the Apache License,
|
||||||
|
%% Version 2.0 (the "License"); you may not use this file
|
||||||
|
%% except in compliance with the License. You may obtain
|
||||||
|
%% a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing,
|
||||||
|
%% software distributed under the License is distributed on an
|
||||||
|
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
%% KIND, either express or implied. See the License for the
|
||||||
|
%% specific language governing permissions and limitations
|
||||||
|
%% under the License.
|
||||||
|
%%
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% @ doc tests the new CS Bucket fold message
|
||||||
|
|
||||||
|
-module(verify_cs_bucket).
|
||||||
|
-behavior(riak_test).
|
||||||
|
-export([confirm/0]).
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-import(secondary_index_tests, [put_an_object/2, int_to_key/1]).
|
||||||
|
-define(BUCKET, <<"2ibucket">>).
|
||||||
|
-define(FOO, <<"foo">>).
|
||||||
|
|
||||||
|
confirm() ->
|
||||||
|
Nodes = rt:build_cluster(3),
|
||||||
|
?assertEqual(ok, (rt:wait_until_nodes_ready(Nodes))),
|
||||||
|
|
||||||
|
PBPid = rt:pbc(hd(Nodes)),
|
||||||
|
|
||||||
|
[put_an_object(PBPid, N) || N <- lists:seq(0, 200)],
|
||||||
|
|
||||||
|
ExpectedKeys = lists:sort([int_to_key(N) || N <- lists:seq(0, 200)]),
|
||||||
|
|
||||||
|
undefined = assertEqual(PBPid, ExpectedKeys, ?BUCKET, [{start_key, int_to_key(0)}]),
|
||||||
|
undefined = assertEqual(PBPid, tl(ExpectedKeys), ?BUCKET, [{start_key, int_to_key(0)}, {start_incl, false}]),
|
||||||
|
undefined = assertEqual(PBPid, [int_to_key(104)], ?BUCKET, [{start_key, int_to_key(103)},
|
||||||
|
{end_key, int_to_key(105)},
|
||||||
|
{start_incl, false},
|
||||||
|
{end_incl, false}]),
|
||||||
|
|
||||||
|
%% Limit / continuations
|
||||||
|
|
||||||
|
Continuation1 = assertEqual(PBPid, lists:sublist(ExpectedKeys, 20), ?BUCKET, [{start_key, int_to_key(0)}, {max_results, 20}]),
|
||||||
|
Continuation2 = assertEqual(PBPid, lists:sublist(ExpectedKeys, 21, 20), ?BUCKET,
|
||||||
|
[{start_key, int_to_key(0)}, {max_results, 20}, {continuation, Continuation1}]),
|
||||||
|
undefined = assertEqual(PBPid, lists:sublist(ExpectedKeys, 41, 200), ?BUCKET, [{continuation, Continuation2}, {max_results, 200}]),
|
||||||
|
|
||||||
|
riakc_pb_socket:stop(PBPid),
|
||||||
|
pass.
|
||||||
|
|
||||||
|
%% Check the PB result against our expectations
|
||||||
|
%% and the non-streamed HTTP
|
||||||
|
assertEqual(PB, Expected, Bucket, Opts) ->
|
||||||
|
{ok, PBRes} = stream_pb(PB, Bucket, Opts),
|
||||||
|
PBObjects = proplists:get_value(objects, PBRes, []),
|
||||||
|
Keys = [riakc_obj:key(Obj) || Obj <- PBObjects],
|
||||||
|
?assertEqual(Expected, Keys),
|
||||||
|
proplists:get_value(continuation, PBRes).
|
||||||
|
|
||||||
|
|
||||||
|
stream_pb(Pid, Bucket, Opts) ->
|
||||||
|
riakc_pb_socket:cs_bucket_fold(Pid, Bucket, Opts),
|
||||||
|
stream_loop().
|
||||||
|
|
||||||
|
stream_loop() ->
|
||||||
|
stream_loop(orddict:new()).
|
||||||
|
|
||||||
|
stream_loop(Acc) ->
|
||||||
|
receive {_Ref, done} ->
|
||||||
|
{ok, orddict:to_list(Acc)};
|
||||||
|
{_Ref, {objects, Objects}} ->
|
||||||
|
Acc2 = orddict:update(objects, fun(Existing) -> Existing++Objects end, Objects, Acc),
|
||||||
|
stream_loop(Acc2);
|
||||||
|
{_Ref, Res} when is_list(Res) ->
|
||||||
|
Objects = proplists:get_value(objects, Res, []),
|
||||||
|
Continuation = proplists:get_value(continuation, Res),
|
||||||
|
Acc2 = orddict:update(objects, fun(Existing) -> Existing++Objects end, Objects, Acc),
|
||||||
|
Acc3 = orddict:store(continuation, Continuation, Acc2),
|
||||||
|
stream_loop(Acc3)
|
||||||
|
end.
|
Loading…
Reference in New Issue
Block a user