diff --git a/tests/timeseries_util.erl b/tests/timeseries_util.erl index 6a50914b..5d91b037 100644 --- a/tests/timeseries_util.erl +++ b/tests/timeseries_util.erl @@ -50,7 +50,7 @@ confirm_activate(ClusterType, DDL, Expected) -> pass. -confirm_put(ClusterType, TestType, DDL, Obj, Expected) -> +confirm_put(ClusterType, TestType, DDL, Obj) -> [Node | _] = build_cluster(ClusterType), @@ -66,10 +66,7 @@ confirm_put(ClusterType, TestType, DDL, Obj, Expected) -> Bucket = list_to_binary(get_bucket()), io:format("2 - writing to bucket ~p with:~n- ~p~n", [Bucket, Obj]), C = rt:pbc(Node), - Get = riakc_ts:put(C, Bucket, Obj), - ?assertEqual(Expected, Get), - - pass. + riakc_ts:put(C, Bucket, Obj). confirm_select(ClusterType, TestType, DDL, Data, Qry, Expected) -> @@ -133,24 +130,59 @@ get_bucket() -> "GeoCheckin". get_valid_qry() -> - "select * from GeoCheckin Where time > 1 and time < 10 and family = 'myfamily' and series ='myseries'". + "select * from GeoCheckin Where time > 1 and time < 10 and myfamily = 'family1' and myseries ='seriesX'". get_invalid_qry(borked_syntax) -> "selectah * from GeoCheckin Where time > 1 and time < 10"; get_invalid_qry(key_not_covered) -> "select * from GeoCheckin Where time > 1 and time < 10"; get_invalid_qry(invalid_operator) -> - "select * from GeoCheckin Where time > 1 and time < 10 and family = 'myfamily' and series ='myseries' and weather > 'bob'"; + "select * from GeoCheckin Where time > 1 and time < 10 and myfamily = 'family1' and myseries ='seriesX' and weather > 'bob'"; get_invalid_qry(field_comparison) -> - "select * from GeoCheckin Where time > 1 and time < 10 and family = 'myfamily' and series ='myseries' and weather = myfamily"; + "select * from GeoCheckin Where time > 1 and time < 10 and myfamily = 'family1' and myseries ='seriesX' and weather = family1"; get_invalid_qry(type_error) -> - "select * from GeoCheckin Where time > 1 and time < 10 and family = 'myfamily' and series ='myseries' and weather = true". + "select * from GeoCheckin Where time > 1 and time < 10 and myfamily = 'family1' and myseries ='seriesX' and weather > true". get_valid_select_data() -> - Family = <<"myfamily">>, - Series = <<"myseries">>, + Family = <<"family1">>, + Series = <<"seriesX">>, Times = lists:seq(1, 10), - [[Family, Series, X, get_varchar(), get_float()] || X <- Times]. + [[Family, Series, X, get_varchar(), get_float()] || X <- Times]. + + +-define(SPANNING_STEP, (1000*60*5)). + +get_valid_qry_spanning_quanta() -> + StartTime = 1 + ?SPANNING_STEP * 1, + EndTime = 1 + ?SPANNING_STEP * 10, + lists:flatten( + io_lib:format("select * from GeoCheckin Where time > ~b and time < ~b" + " and myfamily = 'family1' and myseries = 'seriesX'", + [StartTime, EndTime])). + +get_valid_select_data_spanning_quanta() -> + Family = <<"family1">>, + Series = <<"seriesX">>, + Times = lists:seq(1 + ?SPANNING_STEP, 1 + ?SPANNING_STEP * 10, ?SPANNING_STEP), %% five-minute intervals, to span 15-min buckets + [[Family, Series, X, get_varchar(), get_float()] || X <- Times]. + + +get_cols(docs) -> + [<<"myfamily">>, + <<"myseries">>, + <<"time">>, + <<"weather">>, + <<"temperature">>]. + +exclusive_result_from_data(Data, Start, Finish) when is_integer(Start) andalso + is_integer(Finish) andalso + Start > 0 andalso + Finish > 0 andalso + Finish > Start -> + [list_to_tuple(X) || X <- lists:sublist(Data, Start, Finish - Start + 1)]. + +remove_last(Data) -> + lists:reverse(tl(lists:reverse(Data))). 
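%% A minimal sketch (annotation, not part of the patch; the wrapper function
%% name is hypothetical) of how the new helpers above are combined by the
%% select tests: get_valid_select_data/0 writes rows for time = 1..10, the
%% default query selects 1 < time < 10, so the expected result pairs
%% get_cols(docs) with rows 2..9 converted to tuples.
expected_default_select_example() ->
    Data = get_valid_select_data(),
    {get_cols(docs), exclusive_result_from_data(Data, 2, 9)}.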
%% a valid DDL - the one used in the documents get_ddl(docs) -> diff --git a/tests/ts_A_create_table_fail_3a.erl b/tests/ts_A_create_table_fail_3a.erl new file mode 100644 index 00000000..b89ee21c --- /dev/null +++ b/tests/ts_A_create_table_fail_3a.erl @@ -0,0 +1,40 @@ +-module(ts_A_create_table_fail_3a). + +-behavior(riak_test). + +-export([ + confirm/0 + ]). + +-import(timeseries_util, [ + get_ddl/1, + confirm_create/3 + ]). + +%% +%% should error if you try and create a table twice +%% + +confirm() -> + ClusterType = single, + %% create the table twice with a different DDL + DDL1 = "CREATE TABLE GeoCheckin (" ++ + "myfamily varchar not null, " ++ + "myseries varchar not null, " ++ + "time timestamp not null, " ++ + "weather varchar not null, " ++ + "temperature float, " ++ + "PRIMARY KEY ((quantum(time, 15, 'm'), myfamily, myseries), " ++ + "time, myfamily, myseries))", + Expected1 = {ok, "GeoCheckin created\n\nWARNING: After activating GeoCheckin, " ++ + "nodes in this cluster\ncan no longer be downgraded to a version of Riak prior to 2.0\n"}, + DDL2 = "CREATE TABLE GeoCheckin (" ++ + "myfamily varchar not null, " ++ + "myseries varchar not null, " ++ + "time timestamp not null, " ++ + "weather varchar not null, " ++ + "PRIMARY KEY ((quantum(time, 15, 'm'), myfamily, myseries), " ++ + "time, myfamily, myseries))", + Expected2 = {ok,"some error message, yeah?"}, + pass = confirm_create(ClusterType, DDL1, Expected1), + confirm_create(ClusterType, DDL2, Expected2). diff --git a/tests/ts_A_put_fail_1.erl b/tests/ts_A_put_fail_1.erl index 7e433e52..03830656 100644 --- a/tests/ts_A_put_fail_1.erl +++ b/tests/ts_A_put_fail_1.erl @@ -7,18 +7,20 @@ -behavior(riak_test). -export([ - confirm/0 - ]). + confirm/0 + ]). --import(timeseries_util, [ - get_ddl/1, - get_valid_obj/0, - confirm_put/5 - ]). +-include_lib("eunit/include/eunit.hrl"). confirm() -> - ClusterType = single, - Expected = "some error message", - DDL = null, - Obj = [get_valid_obj()], - confirm_put(ClusterType, no_ddl, DDL, Obj, Expected). + [Node | _] = timeseries_util:build_cluster(single), + Bucket = list_to_binary(timeseries_util:get_bucket()), + Obj = [timeseries_util:get_valid_obj()], + io:format("2 - writing to bucket ~p with:~n- ~p~n", [Bucket, Obj]), + C = rt:pbc(Node), + ?assertMatch( + {error,_}, + riakc_ts:put(C, Bucket, Obj) + ), + pass. + diff --git a/tests/ts_A_put_fail_2.erl b/tests/ts_A_put_fail_2.erl index 1e67ffd1..77a8ca41 100644 --- a/tests/ts_A_put_fail_2.erl +++ b/tests/ts_A_put_fail_2.erl @@ -7,19 +7,16 @@ -behavior(riak_test). --export([ - confirm/0 - ]). +-export([confirm/0]). --import(timeseries_util, [ - get_ddl/1, - get_invalid_obj/0, - confirm_put/5 - ]). +-include_lib("eunit/include/eunit.hrl"). confirm() -> ClusterType = single, - DDL = get_ddl(docs), - Obj = [get_invalid_obj()], - Expected = "some error message", - confirm_put(ClusterType, normal, DDL, Obj, Expected). + DDL = timeseries_util:get_ddl(docs), + Obj = [timeseries_util:get_invalid_obj()], + ?assertMatch( + {error,_}, + timeseries_util:confirm_put(ClusterType, normal, DDL, Obj) + ), + pass. diff --git a/tests/ts_A_put_fail_3.erl b/tests/ts_A_put_fail_3.erl deleted file mode 100644 index 842bc16c..00000000 --- a/tests/ts_A_put_fail_3.erl +++ /dev/null @@ -1,23 +0,0 @@ --module(ts_A_put_fail_3). - -%% -%% this test tries to write total gibberish data to a bucket -%% - --behavior(riak_test). - --export([ - confirm/0 - ]). - --import(timeseries_util, [ - get_ddl/1, - confirm_put/5 - ]). 
- -confirm() -> - ClusterType = single, - DDL = get_ddl(docs), - Obj = {[some], <<"total">>, g, i, b, {e, r, i, s, h}}, - Expected = "some error message", - confirm_put(ClusterType, normal, DDL, Obj, Expected). diff --git a/tests/ts_A_put_pass_1.erl b/tests/ts_A_put_pass_1.erl index 7fb37689..3338a951 100644 --- a/tests/ts_A_put_pass_1.erl +++ b/tests/ts_A_put_pass_1.erl @@ -2,19 +2,17 @@ -behavior(riak_test). --export([ - confirm/0 - ]). +-export([confirm/0]). --import(timeseries_util, [ - get_ddl/1, - get_valid_obj/0, - confirm_put/5 - ]). +-include_lib("eunit/include/eunit.hrl"). confirm() -> Cluster = single, - DDL = get_ddl(docs), - Obj = [get_valid_obj()], - Expected = ok, - confirm_put(Cluster, normal, DDL, Obj, Expected). + TestType = normal, + DDL = timeseries_util:get_ddl(docs), + Obj = [timeseries_util:get_valid_obj()], + ?assertEqual( + ok, + timeseries_util:confirm_put(Cluster, TestType, DDL, Obj) + ), + pass. diff --git a/tests/ts_A_select_fail_2.erl b/tests/ts_A_select_fail_2.erl index da02b6c1..e05a9dd6 100644 --- a/tests/ts_A_select_fail_2.erl +++ b/tests/ts_A_select_fail_2.erl @@ -17,5 +17,5 @@ confirm() -> DDL = get_ddl(docs), Data = get_valid_select_data(), Qry = get_invalid_qry(borked_syntax), - Expected = "some error message, fix me", + Expected = {error, <<"Message decoding error: \"Unexpected token 'selectah'\"">>}, confirm_select(single, normal, DDL, Data, Qry, Expected). diff --git a/tests/ts_A_select_fail_3.erl b/tests/ts_A_select_fail_3.erl index 4fafd80b..8682731a 100644 --- a/tests/ts_A_select_fail_3.erl +++ b/tests/ts_A_select_fail_3.erl @@ -17,5 +17,5 @@ confirm() -> DDL = get_ddl(docs), Data = get_valid_select_data(), Qry = get_invalid_qry(key_not_covered), - Expected = "some error message, fix me", + Expected = {error, <<"missing_param: Missing parameter myfamily in where clause.">>}, confirm_select(single, normal, DDL, Data, Qry, Expected). diff --git a/tests/ts_A_select_fail_4.erl b/tests/ts_A_select_fail_4.erl index ed0dbf84..6004f90a 100644 --- a/tests/ts_A_select_fail_4.erl +++ b/tests/ts_A_select_fail_4.erl @@ -17,5 +17,5 @@ confirm() -> DDL = get_ddl(docs), Data = get_valid_select_data(), Qry = get_invalid_qry(invalid_operator), - Expected = "some error message, fix me", + Expected = {error, <<"invalid_query: \nincompatible_operator: field weather with type binary cannot use operator '>' in where clause.">>}, confirm_select(single, normal, DDL, Data, Qry, Expected). diff --git a/tests/ts_A_select_fail_6.erl b/tests/ts_A_select_fail_6.erl index c09787d4..52777ace 100644 --- a/tests/ts_A_select_fail_6.erl +++ b/tests/ts_A_select_fail_6.erl @@ -17,5 +17,5 @@ confirm() -> DDL = get_ddl(docs), Data = get_valid_select_data(), Qry = get_invalid_qry(type_error), - Expected = "some error message, fix me", + Expected = {error, <<"invalid_query: \nincompatible_operator: field weather with type binary cannot use operator '>' in where clause.">>}, confirm_select(single, normal, DDL, Data, Qry, Expected). diff --git a/tests/ts_A_select_pass_1.erl b/tests/ts_A_select_pass_1.erl index 2e9b200a..5846bc34 100644 --- a/tests/ts_A_select_pass_1.erl +++ b/tests/ts_A_select_pass_1.erl @@ -14,8 +14,12 @@ ]). confirm() -> + Cluster = single, + TestType = normal, DDL = get_ddl(docs), Data = get_valid_select_data(), Qry = get_valid_qry(), - Expected = ok, - confirm_select(single, normal, DDL, Data, Qry, Expected). 
+ Expected = { + timeseries_util:get_cols(docs), + timeseries_util:exclusive_result_from_data(Data, 2, 9)}, + confirm_select(Cluster, TestType, DDL, Data, Qry, Expected). diff --git a/tests/ts_A_select_pass_2.erl b/tests/ts_A_select_pass_2.erl new file mode 100644 index 00000000..31d1c418 --- /dev/null +++ b/tests/ts_A_select_pass_2.erl @@ -0,0 +1,36 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +%% @doc A module to test riak_ts basic create bucket/put/select cycle, +%% spanning time quanta. + +-module(ts_A_select_pass_2). + +-behavior(riak_test). + +-export([confirm/0]). + +confirm() -> + DDL = timeseries_util:get_ddl(docs), + Data = timeseries_util:get_valid_select_data_spanning_quanta(), + Qry = timeseries_util:get_valid_qry_spanning_quanta(), + Expected = { + timeseries_util:get_cols(docs), + timeseries_util:exclusive_result_from_data(Data, 2, 9)}, + timeseries_util:confirm_select(single, normal, DDL, Data, Qry, Expected). diff --git a/tests/ts_B_put_pass_1.erl b/tests/ts_B_put_pass_1.erl index fcda6ea5..d5eef8a7 100644 --- a/tests/ts_B_put_pass_1.erl +++ b/tests/ts_B_put_pass_1.erl @@ -2,19 +2,18 @@ -behavior(riak_test). --export([ - confirm/0 - ]). +-export([confirm/0]). --import(timeseries_util, [ - get_ddl/1, - get_valid_obj/0, - confirm_put/5 - ]). +-include_lib("eunit/include/eunit.hrl"). confirm() -> Cluster = multiple, - DDL = get_ddl(docs), - Obj = [get_valid_obj()], - Expected = ok, - confirm_put(Cluster, normal, DDL, Obj, Expected). + TestType = normal, + DDL = timeseries_util:get_ddl(docs), + Obj = [timeseries_util:get_valid_obj()], + ?assertEqual( + ok, + timeseries_util:confirm_put(Cluster, TestType, DDL, Obj) + ), + pass. + diff --git a/tests/ts_B_select_pass_1.erl b/tests/ts_B_select_pass_1.erl index 2521f229..b9ba733c 100644 --- a/tests/ts_B_select_pass_1.erl +++ b/tests/ts_B_select_pass_1.erl @@ -14,8 +14,12 @@ ]). confirm() -> + Cluster = multiple, + TestType = normal, DDL = get_ddl(docs), Data = get_valid_select_data(), Qry = get_valid_qry(), - Expected = ok, - confirm_select(multiple, normal, DDL, Data, Qry, Expected). + Expected = { + timeseries_util:get_cols(docs), + timeseries_util:exclusive_result_from_data(Data, 2, 9)}, + confirm_select(Cluster, TestType, DDL, Data, Qry, Expected). 
diff --git a/tests/ts_basic.erl b/tests/ts_basic.erl index e4f839b7..77f6dc97 100644 --- a/tests/ts_basic.erl +++ b/tests/ts_basic.erl @@ -33,9 +33,9 @@ confirm() -> %% io:format("Data to be written: ~p\n", [make_data()]), - ClusterSize = 3, + ClusterSize = 1, lager:info("Building cluster"), - _Nodes = [Node1, _Node2, _Node3] = + _Nodes = [Node1|_] = build_cluster( ClusterSize), @@ -68,8 +68,8 @@ confirm() -> [?BUCKET, ?PKEY_P2, ?TIMEBASE + 10, ?PKEY_P2, ?TIMEBASE + 20, ?PVAL_P1])), io:format("Running query: ~p\n", [Query]), {_Columns, Rows} = riakc_ts:query(C, Query), - io:format("Got ~b rows back\n", [length(Rows)]), - ?assertEqual(length(Rows), 10), + io:format("Got ~b rows back\n~p\n", [length(Rows), Rows]), + ?assertEqual(length(Rows), 9), pass. diff --git a/tests/verify_2i_eqc.erl b/tests/verify_2i_eqc.erl new file mode 100644 index 00000000..ab413d5a --- /dev/null +++ b/tests/verify_2i_eqc.erl @@ -0,0 +1,636 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +%% @doc EQC test for secondary indexing using eqc_fsm to generate +%% sequences of indexing and query commands. +%% +%% The state machine is very simple. Mostly it: +%% - Indexes a bunch of keys under a single integer or binary term. +%% - Indexes a bunch of keys under an equal number of consecutive integer +%% or binary terms. +%% - Deletes a single item and all its associated index entries. +%% - Generates random queries and verifies results match the model. +%% Notice how we are only checking against the entire set of results and +%% not against each page of the results. I suggest that as an improvement. +%% +%% A couple of dummy states exist just to ensure that each run starts by +%% first creating a bunch of clients, then choosing a new unique bucket. +%% +%% The test model stores a list of keys +%% and index data for a configurable number of fields. The keys are all +%% numeric for simpler presentation and get converted to and from binary +%% as needed. For example, if two objects are created and indexed like this: +%% +%% - key 10, "i1_int" -> 1, "i1_bin" -> "a" +%% - key 20, "i1_int" -> 1, "i1_bin" -> "b" +%% +%% The model data would look like this: +%% +%% keys = [10, 20] +%% indexes = +%% [ +%% {{bin, "i1"}, [ +%% {<<"a">>, [10]}, +%% {<<"b">>, [20]} +%% ]}, +%% {{int, "i1"}, [ +%% {1, [10, 20]} +%% ]} +%% ] +%% +%% All lists in the indexes field are sorted and manipulated using orddict. +%% The indexes data structure is an orddict that maps a typed field to +%% an orddict mapping terms to lists of keys. +%% As in Riak, here "i1_int" and "i1_bin" are the fields, and the values +%% such as 1 or "a" are called terms. +%% +%% ------------------------------------------------------------------- +-module(verify_2i_eqc). +-compile(export_all). + +-ifdef(EQC). 
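%% A small sketch (annotation only, not part of the patch; the function name is
%% hypothetical) of the model shape described in the doc comment above: the
%% indexes field is an orddict keyed by typed field, whose values are orddicts
%% mapping terms to sorted key lists. This builds the exact structure shown in
%% the example.
model_shape_example() ->
    Idx0 = orddict:store({bin, "i1"},
                         orddict:from_list([{<<"a">>, [10]}, {<<"b">>, [20]}]),
                         orddict:new()),
    orddict:store({int, "i1"}, orddict:from_list([{1, [10, 20]}]), Idx0).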
+-include_lib("riakc/include/riakc.hrl"). +-include_lib("eqc/include/eqc.hrl"). +-include_lib("eqc/include/eqc_fsm.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-behaviour(riak_test). +-export([confirm/0]). + +-define(MAX_CLUSTER_SIZE, 1). +-define(MAX_FIELDS, 1). +-define(FIELDS, ["i" ++ integer_to_list(N) || N <- lists:seq(1, ?MAX_FIELDS)]). +%% Enabling the use of http clients requires a lot more work, as things +%% do not work the same as with PB. Binaries are not encoced, empty binaries +%% have a special meaning, it's not inclusive on the end term like PB. +%% Who knows what else. :( +%%-define(CLIENT_TYPES, [pb, http]). +-define(CLIENT_TYPES, [pb]). + +-type index_field() :: {int | bin, binary()}. +-type index_value() :: binary(). +-type index_pair() :: {index_term(), [index_value()]}. +-type index_data() :: {index_field(), [index_pair()]}. + +-record(state, { + nodes = [], + clients = [], + bucket, + keys = [] :: [key()], + indexes = [] :: list(index_data()) + }). + +-record(query, { + bucket, + field, + start_term, + end_term, + page_size, + continuation + }). + +confirm() -> + %% Set up monotonic bucket name generator. + init_bucket_counter(), + Size = random:uniform(?MAX_CLUSTER_SIZE), + %% Run for 2 minutes by default. + TestingTime = rt_config:get(eqc_testing_time, 120), + lager:info("Will run in cluster of size ~p for ~p seconds.", + [Size, TestingTime]), + Nodes = rt:build_cluster(Size), + ?assert(eqc:quickcheck( + eqc:testing_time(TestingTime, ?MODULE:prop_test(Nodes)))), + pass. + +%% ==================================================================== +%% EQC Properties +%% ==================================================================== +prop_test(Nodes) -> + InitState = #state{nodes = Nodes}, + ?FORALL(Cmds, commands(?MODULE, {initial_state(), InitState}), + ?WHENFAIL( + begin + _ = lager:error("*********************** FAILED!!!!" + "*******************") + end, + ?TRAPEXIT( + begin + lager:info("========================" + " Will run commands with Nodes:~p:", [Nodes]), + [lager:info(" Command : ~p~n", [Cmd]) || Cmd <- Cmds], + {H, {_SName, S}, Res} = run_commands(?MODULE, Cmds), + lager:info("======================== Ran commands"), + %% Each run creates a new pool of clients. Clean up. + close_clients(S#state.clients), + %% Record stats on what commands were generated on + %% successful runs. This is printed after the test + %% finishes. + aggregate(zip(state_names(H),command_names(Cmds)), + equals(Res, ok)) + end))). + +%% ==================================================================== +%% Value generators and utilities. +%% ==================================================================== + +gen_node(S) -> + oneof(S#state.nodes). + +gen_client_id(S) -> + {oneof(S#state.nodes), oneof(?CLIENT_TYPES)}. + +%% Generates a key in the range 0-999. +%% TODO: How to determine an optimal range for coverage? +%% If too large, we wouldn't update the same key very often, for example. +gen_key() -> + choose(0, 999). + +%% Pick one of a fixed list of possible base field names. +gen_field() -> + oneof(?FIELDS). + +%% Produces either a binary or integer term value. +gen_term() -> + oneof([gen_int_term(), gen_bin_term()]). + +%% Generates, with equal likelihood, either a smallish or a largish integer. +gen_int_term() -> + oneof([int(), largeint()]). + +%% Generates a random binary. +gen_bin_term() -> + %% The riak HTTP interface does not like empty binaries. 
+    %% To enable the use of the http client, which does not encode
+    %% binaries at freaking all, you would need something like this:
+    %% iolist_to_binary(http_uri:encode(binary_to_list(B))).
+    %% You also need to prevent empty binaries, which through http
+    %% mean "no term"
+    %%?LET(B, non_empty(binary()), sanitize_binary(B)).
+    binary().
+
+sanitize_binary(B) ->
+    B2 = base64:encode(B),
+    sanitize_binary(B2, <<>>).
+
+sanitize_binary(<<>>, B) ->
+    B;
+sanitize_binary(<<"=", Rest/binary>>, Out) ->
+    sanitize_binary(Rest, <<Out/binary, "-">>);
+sanitize_binary(<<C:8, Rest/binary>>, Out) ->
+    sanitize_binary(Rest, <<Out/binary, C:8>>).
+
+%% Generates a list of integer keys without duplicates.
+gen_key_list() ->
+    ?LET(L, non_empty(list(gen_key())), lists:usort(L)).
+
+%% Generates non-empty lists of {Key, Field, Term} triplets.
+gen_key_field_terms() ->
+    non_empty(list({gen_key(), gen_field(), gen_term()})).
+
+%% Produces, with equal likelihood, either no page size, a smallish one or
+%% a largish one.
+gen_page_size() ->
+    oneof([undefined, gen_small_page_size(), gen_large_page_size()]).
+
+%% Based on EQC's nat() so numbers tend to be smallish.
+%% Adjusting with LET to avoid zero, which is invalid.
+gen_small_page_size() ->
+    ?LET(N, nat(), N + 1).
+
+%% Adjusts largeint() to make the result strictly positive.
+gen_large_page_size() ->
+    choose(1, 16#ffffFFFF).
+
+%% Chooses one of the keys in the model at random.
+gen_existing_key(#state{keys = Keys}) ->
+    oneof(Keys).
+
+%% Generates either a query on an integer or binary field that:
+%% - Uses a couple of existing terms as start/ends
+%% - Includes all terms in the index
+%% - Generates start/end terms randomly, which may not span any existing items.
+gen_range_query(S) ->
+    oneof([gen_some_query(S), gen_all_query(S), gen_random_query(S)]).
+
+gen_random_query(#state{bucket = Bucket}) ->
+    oneof([gen_int_query(Bucket), gen_bin_query(Bucket)]).
+
+%% Query that includes all terms for a given field.
+gen_all_query(#state{bucket = Bucket, indexes = Idx}) ->
+    ?LET({{{_Type, Field}, Terms}, PageSize},
+         {oneof(Idx), gen_page_size()},
+         new_query(Bucket, Field, first_term(Terms), last_term(Terms),
+                   PageSize)).
+
+%% Chooses two existing terms as start and end.
+gen_some_query(#state{bucket = Bucket, indexes = Idx}) ->
+    ?LET({{{_Type, Field}, Terms}, PageSize},
+         {oneof(Idx), gen_page_size()},
+         ?LET({{Term1, _}, {Term2, _}}, {oneof(Terms), oneof(Terms)},
+              new_query(Bucket, Field, Term1, Term2, PageSize))).
+
+gen_int_query(Bucket) ->
+    ?LET({Field, Term1, Term2, PageSize},
+         {gen_field(), gen_int_term(), gen_int_term(), gen_page_size()},
+         new_query(Bucket, Field, Term1, Term2, PageSize)).
+
+gen_bin_query(Bucket) ->
+    ?LET({Field, Term1, Term2, PageSize},
+         {gen_field(), gen_bin_term(), gen_bin_term(), gen_page_size()},
+         new_query(Bucket, Field, Term1, Term2, PageSize)).
+
+%% Populates a new query record. For convenience, corrects the order of the
+%% start and end terms so that start is always less than or equal to end.
+%% That way we don't need any generator tricks for those.
+new_query(Bucket, Field, Term1, Term2, PageSize) when Term1 > Term2 ->
+    #query{bucket = Bucket, field = Field,
+           start_term = Term2, end_term = Term1,
+           page_size = PageSize};
+new_query(Bucket, Field, Term1, Term2, PageSize) ->
+    #query{bucket = Bucket, field = Field,
+           start_term = Term1, end_term = Term2,
+           page_size = PageSize}.
+
+%% First term in a term to keys orddict.
+first_term(TermKeys) ->
+    {Term, _} = hd(TermKeys),
+    Term.
+
+%% Last term in a term to keys orddict.
+last_term(TermKeys) -> + {Term, _} = lists:last(TermKeys), + Term. + +%% ====================================================== +%% States spec +%% ====================================================== +initial_state() -> + pre_setup_state1. + +initial_state_data() -> + #state{}. + +pre_setup_state1(S) -> + #state{nodes = Nodes} = S, + [{pre_setup_state2, {call, ?MODULE, tx_create_clients, [Nodes]}}]. + +pre_setup_state2(_S) -> + [{default_state, {call, ?MODULE, tx_next_bucket, []}}]. + +default_state(S) -> + #state{clients = Clients, bucket = Bucket} = S, + [ + {default_state, {call, ?MODULE, tx_index_single_term, + [Clients, gen_client_id(S), Bucket, + gen_key_list(), gen_field(), gen_term()]}}, + {default_state, {call, ?MODULE, tx_index_multi_term, + [Clients, gen_client_id(S), Bucket, + gen_key_field_terms()]}}, + {default_state, {call, ?MODULE, tx_delete_one, + [Clients, gen_client_id(S), Bucket, + gen_existing_key(S)]}}, + {default_state, {call, ?MODULE, tx_query_range, + [Clients, gen_client_id(S), gen_range_query(S)]}} + ]. + +%% Tweak transition weights such that deletes are rare. +%% Indexing a bunch or querying a bunch of items are equally likely. +weight(default_state, default_state, {call, _, tx_delete_one, _}) -> + 1; +weight(_, _, _) -> + 100. + +%% State data mutations for each transition. +next_state_data(_, _, S, Clients, {call, _, tx_create_clients, [_]}) -> + S#state{clients = Clients}; +next_state_data(_, _, S, Bucket, {call, _, tx_next_bucket, []}) -> + S#state{bucket = Bucket}; +next_state_data(default_state, default_state, S, _, + {call, _, tx_index_single_term, + [_, _, _, NewKeys, Field, Term]}) -> + #state{keys = Keys0, indexes = Idx0} = S, + Keys1 = lists:umerge(NewKeys, Keys0), + Idx1 = model_index(NewKeys, Field, Term, Idx0), + S#state{keys = Keys1, indexes = Idx1}; +next_state_data(default_state, default_state, S, _, + {call, _, tx_index_multi_term, + [_, _, _, KeyFieldTerms]}) -> + #state{keys = Keys0, indexes = Idx0} = S, + %% Add to list of keys and dedupe. + NewKeys = [K || {K, _, _} <- KeyFieldTerms], + Keys1 = lists:umerge(NewKeys, Keys0), + Idx1 = model_index(KeyFieldTerms, Idx0), + S#state{keys = Keys1, indexes = Idx1}; +next_state_data(default_state, default_state, S, _, + {call, _, tx_delete_one, [_, _, _, Key]}) -> + #state{keys = Keys0, indexes = Idx0} = S, + Keys1 = lists:delete(Key, Keys0), + Idx1 = model_delete_key(Key, Idx0), + S#state{keys = Keys1, indexes = Idx1}; +next_state_data(_, _, S, _, _) -> + %% Any other transition leaves state unchanged. + S. + +%% No precondition checks. Among other things, that means that shrinking may +%% end up issuing deletes to keys that do not exist, which is harmless. +%% Any indexing, deleting or querying command can be issued at any point +%% in the sequence. +precondition(_From, _To, _S, {call, _, _, _}) -> + true. + +%% Signal a test failure if there is an explicit error from the query or +%% if the results do not match what is in the model. +postcondition(_, _, S, {call, _, tx_query_range, [_, _, Query]}, {error, Err}) -> + {state, S, query, Query, error, Err}; +postcondition(_, _, S, {call, _, tx_query_range, [_, Client, Query]}, Keys) -> + #state{indexes = Idx} = S, + ExpectedKeys = model_query_range(Query, Idx), + case lists:usort(Keys) =:= ExpectedKeys of + true -> true; + false -> {state, S, client, Client, query, Query, + expected, ExpectedKeys, actual, Keys} + end; +postcondition(_, _, _, _Call, _) -> + true. 
+ +%% ====================================================== +%% State transition functions. +%% ====================================================== + +%% Returns a dictionary that stores a client object per each node +%% and client type. +%% {Node, Type} -> {Type, Client} +tx_create_clients(Nodes) -> + orddict:from_list([{{N, T}, {T, create_client(N, T)}} + || N <- Nodes, T <- ?CLIENT_TYPES]). + +%% Returns a different bucket name each time it's called. +tx_next_bucket() -> + N = ets:update_counter(bucket_table, bucket_number, 1), + NBin = integer_to_binary(N), + <<"bucket", NBin/binary>>. + +%% Index a bunch of keys under the same field/term. +tx_index_single_term(Clients, ClientId, Bucket, Keys, Field, Term) -> + Client = get_client(ClientId, Clients), + lager:info("Indexing in ~p under (~p, ~p) using client ~p: ~w", + [Bucket, Field, Term, ClientId, Keys]), + [index_object(Client, Bucket, Key, Field, Term) || Key <- Keys], + ok. + +%% Index a number of keys each under a different term. +tx_index_multi_term(Clients, ClientId, Bucket, KeyFieldTerms) -> + Client = get_client(ClientId, Clients), + lager:info("Indexing in ~p with client ~p: ~p", + [Bucket, ClientId, KeyFieldTerms]), + [index_object(Client, Bucket, Key, Field, Term) + || {Key, Field, Term} <- KeyFieldTerms], + ok. + +%% Delete a single object and all its associated index entries. +tx_delete_one(Clients, ClientId, Bucket, IntKey) -> + Client = get_client(ClientId, Clients), + lager:info("Deleting key ~p from bucket ~p using ~p", + [IntKey, Bucket, ClientId]), + delete_key(Client, Bucket, IntKey), + ok. + +tx_query_range(Clients, ClientId, Query) -> + Client = get_client(ClientId, Clients), + Keys = lists:sort(query_range(Client, Query, [])), + lager:info("Query ~p, ~p from ~p to ~p, page = ~p, using ~p " + "returned ~p keys.", + [Query#query.bucket, Query#query.field, Query#query.start_term, + Query#query.end_term, Query#query.page_size, ClientId, + length(Keys)]), + %% Re-run with page sizes 1 -> 100, verify it's always the same result. + PageChecks = + [begin + Q2 = Query#query{page_size = PSize}, + OKeys = lists:sort(query_range(Client, Q2, [])), + OKeys =:= Keys + end || PSize <- lists:seq(1, 100)], + case lists:all(fun is_true/1, PageChecks) of + true -> + Keys; + false -> + {error, mismatch_when_paged} + end. + +%% ====================================================== +%% Client utilities. +%% ====================================================== + +create_client(Node, pb) -> + rt:pbc(Node); +create_client(Node, http) -> + rt:httpc(Node). + +get_client(ClientId, Clients) -> + orddict:fetch(ClientId, Clients). + +%% Convert field/term pair to pb client argument format. +to_field_id_term(Field, Term) -> + {to_field_id(Field, Term), [Term]}. + +to_field_id(Field, Term) when is_integer(Term) -> + {integer_index, Field}; +to_field_id(Field, Term) when is_binary(Term) -> + {binary_index, Field}. + +to_field_id_term_http(Field, Term) -> + {to_field_id_http(Field, Term), [Term]}. + +to_field_id_http(Field, Term) when is_integer(Term) -> + iolist_to_binary([Field, "_int"]); +to_field_id_http(Field, Term) when is_binary(Term) -> + iolist_to_binary([Field, "_bin"]). 
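%% For reference (worked examples added here, not asserted anywhere in the
%% patch), the conversions above produce the argument shapes each client
%% expects:
%%   to_field_id_term("i1", 42)         -> {{integer_index, "i1"}, [42]}
%%   to_field_id_term("i1", <<"abc">>)  -> {{binary_index, "i1"}, [<<"abc">>]}
%%   to_field_id_http("i1", 42)         -> <<"i1_int">>
%%   to_field_id_http("i1", <<"abc">>)  -> <<"i1_bin">>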
+ +index_object({pb, C}, Bucket, Key0, Field, Term) -> + Key = to_bin_key(Key0), + FT = to_field_id_term(Field, Term), + Obj0 = case riakc_pb_socket:get(C, Bucket, Key) of + {ok, O} -> + O; + {error, notfound} -> + riakc_obj:new(Bucket, Key, Key) + end, + MD0 = riakc_obj:get_update_metadata(Obj0), + MD1 = riakc_obj:add_secondary_index(MD0, [FT]), + Obj1 = riakc_obj:update_metadata(Obj0, MD1), + ok = riakc_pb_socket:put(C, Obj1, [{dw, 3}]), + ok; +index_object({http, C}, Bucket, Key0, Field, Term) -> + Key = to_bin_key(Key0), + FT = to_field_id_term_http(Field, Term), + Obj0 = case rhc:get(C, Bucket, Key) of + {ok, O} -> + O; + {error, notfound} -> + riakc_obj:new(Bucket, Key, Key) + end, + MD0 = riakc_obj:get_update_metadata(Obj0), + MD1 = riakc_obj:add_secondary_index(MD0, [FT]), + Obj1 = riakc_obj:update_metadata(Obj0, MD1), + ok = rhc:put(C, Obj1, [{dw, 3}]), + ok. + +delete_key({pb, PB}, Bucket, IntKey) -> + Key = to_bin_key(IntKey), + case riakc_pb_socket:get(PB, Bucket, Key) of + {ok, O} -> + ok = riakc_pb_socket:delete_obj(PB, O, [{dw, 3}]); + {error, notfound} -> + ok = riakc_pb_socket:delete(PB, Bucket, Key, [{dw, 3}]) + end, + %% Wait until all tombstones have been reaped. + %% TODO: Do we need to reap tombstones for this test? I think now. + %% ok = rt:wait_until(fun() -> rt:pbc_really_deleted(PB, Bucket, [Key]) end); + ok; +delete_key({http, C}, Bucket, IntKey) -> + Key = to_bin_key(IntKey), + case rhc:get(C, Bucket, Key) of + {ok, O} -> + ok = rhc:delete_obj(C, O, [{dw, 3}]); + {error, notfound} -> + ok = rhc:delete(C, Bucket, Key, [{dw, 3}]) + end, + %% Wait until all tombstones have been reaped. + %%ok = rt:wait_until(fun() -> rt:httpc_really_deleted(C, Bucket, [Key]) end). + ok. + +%% Execute range query using a client, fetching multiple pages if necessary. +query_range({pb, PB} = Client, + #query { bucket = Bucket, field = FieldName, + start_term = Start, end_term = End, + page_size = PageSize, continuation = Cont } = Query, + AccKeys) -> + Field = to_field_id(FieldName, Start), + case riakc_pb_socket:get_index_range(PB, Bucket, Field, Start, End, + [{max_results, PageSize}, + {continuation, Cont}]) of + {ok, ?INDEX_RESULTS{keys = Keys, continuation = undefined}} -> + AccKeys ++ Keys; + {ok, ?INDEX_RESULTS{keys = Keys, continuation = Cont1}} -> + Query1 = Query#query{continuation = Cont1}, + query_range(Client, Query1, AccKeys ++ Keys) + end; +query_range({http, C} = Client, + #query { bucket = Bucket, field = FieldName, + start_term = Start, end_term = End, + page_size = PageSize, continuation = Cont } = Query, + AccKeys) -> + Field = to_field_id(FieldName, Start), + case rhc:get_index(C, Bucket, Field, {Start, End}, + [{max_results, PageSize}, + {continuation, Cont}]) of + {ok, ?INDEX_RESULTS{keys = Keys, continuation = undefined}} -> + AccKeys ++ Keys; + {ok, ?INDEX_RESULTS{keys = Keys, continuation = Cont1}} -> + Query1 = Query#query{continuation = Cont1}, + query_range(Client, Query1, AccKeys ++ Keys) + end. + +%% Close all clients, ignore errors. +close_clients(Clients) -> + [catch riakc_pb_socket:stop(Client) || {pb, Client} <- Clients], + ok. + +%% ====================================================== +%% Model data utilities +%% ====================================================== + +model_index([], Idx) -> + Idx; +model_index([{Keys, Field, Term} | More], Idx) -> + Idx1 = model_index(Keys, Field, Term, Idx), + model_index(More, Idx1). 
+ +model_index(NewKeys0, Field, Term, Idx) when is_list(NewKeys0) -> + TField = to_tfield(Field, Term), + NewKeys = lists:usort(NewKeys0), + TermKeys1 = + case orddict:find(TField, Idx) of + {ok, TermKeys0} -> + case orddict:find(Term, TermKeys0) of + {ok, Keys0} -> + MergedKeys = lists:umerge(NewKeys, Keys0), + orddict:store(Term, MergedKeys, TermKeys0); + _ -> + orddict:store(Term, NewKeys, TermKeys0) + end; + _ -> + [{Term, NewKeys}] + end, + orddict:store(TField, TermKeys1, Idx); +model_index(NewKey, Field, Term, Idx) -> + model_index([NewKey], Field, Term, Idx). + +model_delete_key(Key, Idx) -> + [{Field, delete_key_from_term_keys(Key, TermKeys)} + || {Field, TermKeys} <- Idx]. + +delete_key_from_term_keys(Key, TermKeys) -> + [{Term, lists:delete(Key, Keys)} || {Term, Keys} <- TermKeys]. + +%% Produces a typed field id. For example, "i1"/43 -> {int, "i1"} +to_tfield(FieldName, Term) -> + case is_integer(Term) of + true -> {int, FieldName}; + false -> {bin, FieldName} + end. + +%% Query against the modeled data. +model_query_range(Query, Idx) -> + #query{ field = Field, start_term = Start, end_term = End } = Query, + TField = to_tfield(Field, Start), + %% Collect all keys with terms within the given range, ignore others. + Scanner = fun({Term, Keys}, Acc) when Term >= Start, End >= Term -> + [Keys | Acc]; + ({_Term, _Keys}, Acc) -> + Acc + end, + case orddict:find(TField, Idx) of + error -> + []; + {ok, FieldIdx} -> + KeyGroups = lists:foldl(Scanner, [], FieldIdx), + IntKeys = lists:umerge(KeyGroups), + [to_bin_key(Key) || Key <- IntKeys] + end. + +%% ====================================================== +%% Internal +%% ====================================================== + +is_true(true) -> true; +is_true(_) -> false. + +%% Initialize counter used to use a different bucket per run. +init_bucket_counter() -> + ets:new(bucket_table, [named_table, public]), + ets:insert_new(bucket_table, [{bucket_number, 0}]). + +%% Convert integer object key to binary form. +to_bin_key(N) when is_integer(N) -> + iolist_to_binary(io_lib:format("~5..0b", [N])); +to_bin_key(Key) when is_binary(Key) -> + Key. + +-endif.
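%% A minimal usage sketch (hypothetical function, assumes it lives inside the
%% module so the #query record and helpers above are in scope) showing
%% model_query_range/2 against the model from the module doc: keys 10 and 20
%% indexed under integer field "i1" with term 1, queried over a range covering
%% that term, come back in the zero-padded binary form produced by
%% to_bin_key/1.
model_query_range_example() ->
    Idx = [{{int, "i1"}, [{1, [10, 20]}]}],
    Q = #query{bucket = <<"bucket1">>, field = "i1",
               start_term = 0, end_term = 5},
    [<<"00010">>, <<"00020">>] = model_query_range(Q, Idx).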