Merge branch 'riak_ts' of github.com:basho/riak_test into feature/jrd/ts2i

Author: John R. Daily
Date:   2015-10-12 15:21:12 -04:00
Commit: a65520d2f8

16 changed files with 820 additions and 95 deletions

View File

@@ -50,7 +50,7 @@ confirm_activate(ClusterType, DDL, Expected) ->
pass.
confirm_put(ClusterType, TestType, DDL, Obj, Expected) ->
confirm_put(ClusterType, TestType, DDL, Obj) ->
[Node | _] = build_cluster(ClusterType),
@@ -66,10 +66,7 @@ confirm_put(ClusterType, TestType, DDL, Obj, Expected) ->
Bucket = list_to_binary(get_bucket()),
io:format("2 - writing to bucket ~p with:~n- ~p~n", [Bucket, Obj]),
C = rt:pbc(Node),
Get = riakc_ts:put(C, Bucket, Obj),
?assertEqual(Expected, Get),
pass.
riakc_ts:put(C, Bucket, Obj).
confirm_select(ClusterType, TestType, DDL, Data, Qry, Expected) ->
@@ -133,24 +130,59 @@ get_bucket() ->
"GeoCheckin".
get_valid_qry() ->
"select * from GeoCheckin Where time > 1 and time < 10 and family = 'myfamily' and series ='myseries'".
"select * from GeoCheckin Where time > 1 and time < 10 and myfamily = 'family1' and myseries ='seriesX'".
get_invalid_qry(borked_syntax) ->
"selectah * from GeoCheckin Where time > 1 and time < 10";
get_invalid_qry(key_not_covered) ->
"select * from GeoCheckin Where time > 1 and time < 10";
get_invalid_qry(invalid_operator) ->
"select * from GeoCheckin Where time > 1 and time < 10 and family = 'myfamily' and series ='myseries' and weather > 'bob'";
"select * from GeoCheckin Where time > 1 and time < 10 and myfamily = 'family1' and myseries ='seriesX' and weather > 'bob'";
get_invalid_qry(field_comparison) ->
"select * from GeoCheckin Where time > 1 and time < 10 and family = 'myfamily' and series ='myseries' and weather = myfamily";
"select * from GeoCheckin Where time > 1 and time < 10 and myfamily = 'family1' and myseries ='seriesX' and weather = family1";
get_invalid_qry(type_error) ->
"select * from GeoCheckin Where time > 1 and time < 10 and family = 'myfamily' and series ='myseries' and weather = true".
"select * from GeoCheckin Where time > 1 and time < 10 and myfamily = 'family1' and myseries ='seriesX' and weather > true".
get_valid_select_data() ->
Family = <<"myfamily">>,
Series = <<"myseries">>,
Family = <<"family1">>,
Series = <<"seriesX">>,
Times = lists:seq(1, 10),
[[Family, Series, X, get_varchar(), get_float()] || X <- Times].
[[Family, Series, X, get_varchar(), get_float()] || X <- Times].
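%% Note: get_valid_select_data/0 generates ten rows with time = 1..10, while
%% get_valid_qry/0 filters with the exclusive bounds time > 1 and time < 10,
%% so a valid select over this data is expected to return rows 2 through 9 only
%% (see exclusive_result_from_data/3 below).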
-define(SPANNING_STEP, (1000*60*5)).
get_valid_qry_spanning_quanta() ->
StartTime = 1 + ?SPANNING_STEP * 1,
EndTime = 1 + ?SPANNING_STEP * 10,
lists:flatten(
io_lib:format("select * from GeoCheckin Where time > ~b and time < ~b"
" and myfamily = 'family1' and myseries = 'seriesX'",
[StartTime, EndTime])).
get_valid_select_data_spanning_quanta() ->
Family = <<"family1">>,
Series = <<"seriesX">>,
Times = lists:seq(1 + ?SPANNING_STEP, 1 + ?SPANNING_STEP * 10, ?SPANNING_STEP), %% five-minute intervals, to span 15-min buckets
[[Family, Series, X, get_varchar(), get_float()] || X <- Times].
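%% Worked example: with ?SPANNING_STEP = 1000*60*5 = 300000 ms, the query above
%% becomes "select * from GeoCheckin Where time > 300001 and time < 3000001
%% and myfamily = 'family1' and myseries = 'seriesX'", and the data covers ten
%% rows at times 300001, 600001, ..., 3000001, so the exclusive bounds again
%% select rows 2 through 9 while crossing several 15-minute quanta.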
get_cols(docs) ->
[<<"myfamily">>,
<<"myseries">>,
<<"time">>,
<<"weather">>,
<<"temperature">>].
exclusive_result_from_data(Data, Start, Finish) when is_integer(Start) andalso
is_integer(Finish) andalso
Start > 0 andalso
Finish > 0 andalso
Finish > Start ->
[list_to_tuple(X) || X <- lists:sublist(Data, Start, Finish - Start + 1)].
remove_last(Data) ->
lists:reverse(tl(lists:reverse(Data))).
%% a valid DDL - the one used in the documents
get_ddl(docs) ->

View File

@@ -0,0 +1,40 @@
-module(ts_A_create_table_fail_3a).
-behavior(riak_test).
-export([
confirm/0
]).
-import(timeseries_util, [
get_ddl/1,
confirm_create/3
]).
%%
%% should error if you try to create the same table twice
%%
confirm() ->
ClusterType = single,
%% create the table twice with a different DDL
DDL1 = "CREATE TABLE GeoCheckin (" ++
"myfamily varchar not null, " ++
"myseries varchar not null, " ++
"time timestamp not null, " ++
"weather varchar not null, " ++
"temperature float, " ++
"PRIMARY KEY ((quantum(time, 15, 'm'), myfamily, myseries), " ++
"time, myfamily, myseries))",
Expected1 = {ok, "GeoCheckin created\n\nWARNING: After activating GeoCheckin, " ++
"nodes in this cluster\ncan no longer be downgraded to a version of Riak prior to 2.0\n"},
DDL2 = "CREATE TABLE GeoCheckin (" ++
"myfamily varchar not null, " ++
"myseries varchar not null, " ++
"time timestamp not null, " ++
"weather varchar not null, " ++
"PRIMARY KEY ((quantum(time, 15, 'm'), myfamily, myseries), " ++
"time, myfamily, myseries))",
Expected2 = {ok,"some error message, yeah?"},
pass = confirm_create(ClusterType, DDL1, Expected1),
confirm_create(ClusterType, DDL2, Expected2).

View File

@@ -7,18 +7,20 @@
-behavior(riak_test).
-export([
confirm/0
]).
confirm/0
]).
-import(timeseries_util, [
get_ddl/1,
get_valid_obj/0,
confirm_put/5
]).
-include_lib("eunit/include/eunit.hrl").
confirm() ->
ClusterType = single,
Expected = "some error message",
DDL = null,
Obj = [get_valid_obj()],
confirm_put(ClusterType, no_ddl, DDL, Obj, Expected).
[Node | _] = timeseries_util:build_cluster(single),
Bucket = list_to_binary(timeseries_util:get_bucket()),
Obj = [timeseries_util:get_valid_obj()],
io:format("2 - writing to bucket ~p with:~n- ~p~n", [Bucket, Obj]),
C = rt:pbc(Node),
?assertMatch(
{error,_},
riakc_ts:put(C, Bucket, Obj)
),
pass.

View File

@@ -7,19 +7,16 @@
-behavior(riak_test).
-export([
confirm/0
]).
-export([confirm/0]).
-import(timeseries_util, [
get_ddl/1,
get_invalid_obj/0,
confirm_put/5
]).
-include_lib("eunit/include/eunit.hrl").
confirm() ->
ClusterType = single,
DDL = get_ddl(docs),
Obj = [get_invalid_obj()],
Expected = "some error message",
confirm_put(ClusterType, normal, DDL, Obj, Expected).
DDL = timeseries_util:get_ddl(docs),
Obj = [timeseries_util:get_invalid_obj()],
?assertMatch(
{error,_},
timeseries_util:confirm_put(ClusterType, normal, DDL, Obj)
),
pass.

View File

@@ -1,23 +0,0 @@
-module(ts_A_put_fail_3).
%%
%% this test tries to write total gibberish data to a bucket
%%
-behavior(riak_test).
-export([
confirm/0
]).
-import(timeseries_util, [
get_ddl/1,
confirm_put/5
]).
confirm() ->
ClusterType = single,
DDL = get_ddl(docs),
Obj = {[some], <<"total">>, g, i, b, {e, r, i, s, h}},
Expected = "some error message",
confirm_put(ClusterType, normal, DDL, Obj, Expected).

View File

@@ -2,19 +2,17 @@
-behavior(riak_test).
-export([
confirm/0
]).
-export([confirm/0]).
-import(timeseries_util, [
get_ddl/1,
get_valid_obj/0,
confirm_put/5
]).
-include_lib("eunit/include/eunit.hrl").
confirm() ->
Cluster = single,
DDL = get_ddl(docs),
Obj = [get_valid_obj()],
Expected = ok,
confirm_put(Cluster, normal, DDL, Obj, Expected).
TestType = normal,
DDL = timeseries_util:get_ddl(docs),
Obj = [timeseries_util:get_valid_obj()],
?assertEqual(
ok,
timeseries_util:confirm_put(Cluster, TestType, DDL, Obj)
),
pass.

View File

@@ -17,5 +17,5 @@ confirm() ->
DDL = get_ddl(docs),
Data = get_valid_select_data(),
Qry = get_invalid_qry(borked_syntax),
Expected = "some error message, fix me",
Expected = {error, <<"Message decoding error: \"Unexpected token 'selectah'\"">>},
confirm_select(single, normal, DDL, Data, Qry, Expected).

View File

@@ -17,5 +17,5 @@ confirm() ->
DDL = get_ddl(docs),
Data = get_valid_select_data(),
Qry = get_invalid_qry(key_not_covered),
Expected = "some error message, fix me",
Expected = {error, <<"missing_param: Missing parameter myfamily in where clause.">>},
confirm_select(single, normal, DDL, Data, Qry, Expected).

View File

@@ -17,5 +17,5 @@ confirm() ->
DDL = get_ddl(docs),
Data = get_valid_select_data(),
Qry = get_invalid_qry(invalid_operator),
Expected = "some error message, fix me",
Expected = {error, <<"invalid_query: \nincompatible_operator: field weather with type binary cannot use operator '>' in where clause.">>},
confirm_select(single, normal, DDL, Data, Qry, Expected).

View File

@@ -17,5 +17,5 @@ confirm() ->
DDL = get_ddl(docs),
Data = get_valid_select_data(),
Qry = get_invalid_qry(type_error),
Expected = "some error message, fix me",
Expected = {error, <<"invalid_query: \nincompatible_operator: field weather with type binary cannot use operator '>' in where clause.">>},
confirm_select(single, normal, DDL, Data, Qry, Expected).

View File

@@ -14,8 +14,12 @@
]).
confirm() ->
Cluster = single,
TestType = normal,
DDL = get_ddl(docs),
Data = get_valid_select_data(),
Qry = get_valid_qry(),
Expected = ok,
confirm_select(single, normal, DDL, Data, Qry, Expected).
Expected = {
timeseries_util:get_cols(docs),
timeseries_util:exclusive_result_from_data(Data, 2, 9)},
confirm_select(Cluster, TestType, DDL, Data, Qry, Expected).

View File

@@ -0,0 +1,36 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2015 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% @doc A module to test riak_ts basic create bucket/put/select cycle,
%% spanning time quanta.
-module(ts_A_select_pass_2).
-behavior(riak_test).
-export([confirm/0]).
confirm() ->
DDL = timeseries_util:get_ddl(docs),
Data = timeseries_util:get_valid_select_data_spanning_quanta(),
Qry = timeseries_util:get_valid_qry_spanning_quanta(),
Expected = {
timeseries_util:get_cols(docs),
timeseries_util:exclusive_result_from_data(Data, 2, 9)},
timeseries_util:confirm_select(single, normal, DDL, Data, Qry, Expected).

View File

@@ -2,19 +2,18 @@
-behavior(riak_test).
-export([
confirm/0
]).
-export([confirm/0]).
-import(timeseries_util, [
get_ddl/1,
get_valid_obj/0,
confirm_put/5
]).
-include_lib("eunit/include/eunit.hrl").
confirm() ->
Cluster = multiple,
DDL = get_ddl(docs),
Obj = [get_valid_obj()],
Expected = ok,
confirm_put(Cluster, normal, DDL, Obj, Expected).
TestType = normal,
DDL = timeseries_util:get_ddl(docs),
Obj = [timeseries_util:get_valid_obj()],
?assertEqual(
ok,
timeseries_util:confirm_put(Cluster, TestType, DDL, Obj)
),
pass.

View File

@@ -14,8 +14,12 @@
]).
confirm() ->
Cluster = multiple,
TestType = normal,
DDL = get_ddl(docs),
Data = get_valid_select_data(),
Qry = get_valid_qry(),
Expected = ok,
confirm_select(multiple, normal, DDL, Data, Qry, Expected).
Expected = {
timeseries_util:get_cols(docs),
timeseries_util:exclusive_result_from_data(Data, 2, 9)},
confirm_select(Cluster, TestType, DDL, Data, Qry, Expected).

View File

@@ -33,9 +33,9 @@
confirm() ->
%% io:format("Data to be written: ~p\n", [make_data()]),
ClusterSize = 3,
ClusterSize = 1,
lager:info("Building cluster"),
_Nodes = [Node1, _Node2, _Node3] =
_Nodes = [Node1|_] =
build_cluster(
ClusterSize),
@@ -68,8 +68,8 @@ confirm() ->
[?BUCKET, ?PKEY_P2, ?TIMEBASE + 10, ?PKEY_P2, ?TIMEBASE + 20, ?PVAL_P1])),
io:format("Running query: ~p\n", [Query]),
{_Columns, Rows} = riakc_ts:query(C, Query),
io:format("Got ~b rows back\n", [length(Rows)]),
?assertEqual(length(Rows), 10),
io:format("Got ~b rows back\n~p\n", [length(Rows), Rows]),
?assertEqual(length(Rows), 9),
pass.

tests/verify_2i_eqc.erl (new file, 636 additions)
View File

@@ -0,0 +1,636 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2015 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% @doc EQC test for secondary indexing using eqc_fsm to generate
%% sequences of indexing and query commands.
%%
%% The state machine is very simple. Mostly it:
%% - Indexes a bunch of keys under a single integer or binary term.
%% - Indexes a bunch of keys under an equal number of consecutive integer
%% or binary terms.
%% - Deletes a single item and all its associated index entries.
%% - Generates random queries and verifies results match the model.
%% Notice how we are only checking against the entire set of results and
%% not against each page of the results. I suggest that as an improvement.
%%
%% A couple of dummy states exist just to ensure that each run starts by
%% first creating a bunch of clients, then choosing a new unique bucket.
%%
%% The test model stores a list of keys
%% and index data for a configurable number of fields. The keys are all
%% numeric for simpler presentation and get converted to and from binary
%% as needed. For example, if two objects are created and indexed like this:
%%
%% - key 10, "i1_int" -> 1, "i1_bin" -> "a"
%% - key 20, "i1_int" -> 1, "i1_bin" -> "b"
%%
%% The model data would look like this:
%%
%% keys = [10, 20]
%% indexes =
%% [
%% {{bin, "i1"}, [
%% {<<"a">>, [10]},
%% {<<"b">>, [20]}
%% ]},
%% {{int, "i1"}, [
%% {1, [10, 20]}
%% ]}
%% ]
%%
%% All lists in the indexes field are sorted and manipulated using orddict.
%% The indexes data structure is an orddict that maps a typed field to
%% an orddict mapping terms to lists of keys.
%% As in Riak, here "i1_int" and "i1_bin" are the fields, and the values
%% such as 1 or "a" are called terms.
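%%
%% As a minimal sketch, the example model above can be reproduced with
%% model_index/4 (defined further down in this module):
%%
%%   Idx1 = model_index([10, 20], "i1", 1, orddict:new()),
%%   Idx2 = model_index([10], "i1", <<"a">>, Idx1),
%%   Idx3 = model_index([20], "i1", <<"b">>, Idx2),
%%   %% Idx3 =:= [{{bin, "i1"}, [{<<"a">>, [10]}, {<<"b">>, [20]}]},
%%   %%           {{int, "i1"}, [{1, [10, 20]}]}]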
%%
%% -------------------------------------------------------------------
-module(verify_2i_eqc).
-compile(export_all).
-ifdef(EQC).
-include_lib("riakc/include/riakc.hrl").
-include_lib("eqc/include/eqc.hrl").
-include_lib("eqc/include/eqc_fsm.hrl").
-include_lib("eunit/include/eunit.hrl").
-behaviour(riak_test).
-export([confirm/0]).
-define(MAX_CLUSTER_SIZE, 1).
-define(MAX_FIELDS, 1).
-define(FIELDS, ["i" ++ integer_to_list(N) || N <- lists:seq(1, ?MAX_FIELDS)]).
%% Enabling the use of http clients requires a lot more work, as things
%% do not work the same as with PB. Binaries are not encoded, empty binaries
%% have a special meaning, and it is not inclusive of the end term as PB is.
%% Who knows what else. :(
%%-define(CLIENT_TYPES, [pb, http]).
-define(CLIENT_TYPES, [pb]).
-type index_field() :: {int | bin, binary()}.
-type index_value() :: binary().
-type index_pair() :: {index_term(), [index_value()]}.
-type index_data() :: {index_field(), [index_pair()]}.
-record(state, {
nodes = [],
clients = [],
bucket,
keys = [] :: [key()],
indexes = [] :: list(index_data())
}).
-record(query, {
bucket,
field,
start_term,
end_term,
page_size,
continuation
}).
confirm() ->
%% Set up monotonic bucket name generator.
init_bucket_counter(),
Size = random:uniform(?MAX_CLUSTER_SIZE),
%% Run for 2 minutes by default.
TestingTime = rt_config:get(eqc_testing_time, 120),
lager:info("Will run in cluster of size ~p for ~p seconds.",
[Size, TestingTime]),
Nodes = rt:build_cluster(Size),
?assert(eqc:quickcheck(
eqc:testing_time(TestingTime, ?MODULE:prop_test(Nodes)))),
pass.
%% ====================================================================
%% EQC Properties
%% ====================================================================
prop_test(Nodes) ->
InitState = #state{nodes = Nodes},
?FORALL(Cmds, commands(?MODULE, {initial_state(), InitState}),
?WHENFAIL(
begin
_ = lager:error("*********************** FAILED!!!!"
"*******************")
end,
?TRAPEXIT(
begin
lager:info("========================"
" Will run commands with Nodes:~p:", [Nodes]),
[lager:info(" Command : ~p~n", [Cmd]) || Cmd <- Cmds],
{H, {_SName, S}, Res} = run_commands(?MODULE, Cmds),
lager:info("======================== Ran commands"),
%% Each run creates a new pool of clients. Clean up.
close_clients(S#state.clients),
%% Record stats on what commands were generated on
%% successful runs. This is printed after the test
%% finishes.
aggregate(zip(state_names(H),command_names(Cmds)),
equals(Res, ok))
end))).
%% ====================================================================
%% Value generators and utilities.
%% ====================================================================
gen_node(S) ->
oneof(S#state.nodes).
gen_client_id(S) ->
{oneof(S#state.nodes), oneof(?CLIENT_TYPES)}.
%% Generates a key in the range 0-999.
%% TODO: How to determine an optimal range for coverage?
%% If too large, we wouldn't update the same key very often, for example.
gen_key() ->
choose(0, 999).
%% Pick one of a fixed list of possible base field names.
gen_field() ->
oneof(?FIELDS).
%% Produces either a binary or integer term value.
gen_term() ->
oneof([gen_int_term(), gen_bin_term()]).
%% Generates, with equal likelihood, either a smallish or a largish integer.
gen_int_term() ->
oneof([int(), largeint()]).
%% Generates a random binary.
gen_bin_term() ->
%% The riak HTTP interface does not like empty binaries.
%% To enable the use of the http client, which does not encode
%% binaries at freaking all, you would need something like this:
%% iolist_to_binary(http_uri:encode(binary_to_list(B))).
%% You also need to prevent empty binaries, which through http
%% mean "no term"
%%?LET(B, non_empty(binary()), sanitize_binary(B)).
binary().
sanitize_binary(B) ->
B2 = base64:encode(B),
sanitize_binary(B2, <<>>).
sanitize_binary(<<>>, B) ->
B;
sanitize_binary(<<"=", Rest/binary>>, Out) ->
sanitize_binary(Rest, <<Out/binary, "-">>);
sanitize_binary(<<C, Rest/binary>>, Out) ->
sanitize_binary(Rest, <<Out/binary, C>>).
%% Generates a list of integer keys without duplicates.
gen_key_list() ->
?LET(L, non_empty(list(gen_key())), lists:usort(L)).
%% Generates non-empty lists of {Key, Field, Term} triplets.
gen_key_field_terms() ->
non_empty(list({gen_key(), gen_field(), gen_term()})).
%% Produces, with equal likelihood, either no page size, a smallish one or
%% a largish one.
gen_page_size() ->
oneof([undefined, gen_small_page_size(), gen_large_page_size()]).
%% Based on EQC's nat() so numbers tend to be smallish.
%% Adjusting with LET to avoid zero, which is invalid.
gen_small_page_size() ->
?LET(N, nat(), N + 1).
%% Generates a largish page size, strictly positive (1..16#ffffFFFF).
gen_large_page_size() ->
choose(1, 16#ffffFFFF).
%% Chooses one of the keys in the model at random.
gen_existing_key(#state{keys = Keys}) ->
oneof(Keys).
%% Generates either a query on an integer or binary field that:
%% - Uses a couple of existing terms as start/ends
%% - Includes all terms in the index
%% - Generates start/end terms randomly, which may not span any existing items.
gen_range_query(S) ->
oneof([gen_some_query(S), gen_all_query(S), gen_random_query(S)]).
gen_random_query(#state{bucket = Bucket}) ->
oneof([gen_int_query(Bucket), gen_bin_query(Bucket)]).
%% Query that includes all terms for a given field.
gen_all_query(#state{bucket = Bucket, indexes = Idx}) ->
?LET({{{_Type, Field}, Terms}, PageSize},
{oneof(Idx), gen_page_size()},
new_query(Bucket, Field, first_term(Terms), last_term(Terms),
PageSize)).
%% Chooses two existing terms as start and end.
gen_some_query(#state{bucket = Bucket, indexes = Idx}) ->
?LET({{{_Type, Field}, Terms}, PageSize},
{oneof(Idx), gen_page_size()},
?LET({{Term1, _}, {Term2, _}}, {oneof(Terms), oneof(Terms)},
new_query(Bucket, Field, Term1, Term2, PageSize))).
gen_int_query(Bucket) ->
?LET({Field, Term1, Term2, PageSize},
{gen_field(), gen_int_term(), gen_int_term(), gen_page_size()},
new_query(Bucket, Field, Term1, Term2, PageSize)).
gen_bin_query(Bucket) ->
?LET({Field, Term1, Term2, PageSize},
{gen_field(), gen_bin_term(), gen_bin_term(), gen_page_size()},
new_query(Bucket, Field, Term1, Term2, PageSize)).
%% Populates a new query record. For convenience, corrects the order of the
%% start and end terms so that start is always less than or equal to end.
%% That way we don't need any generator tricks for those.
new_query(Bucket, Field, Term1, Term2, PageSize) when Term1 > Term2 ->
#query{bucket = Bucket, field = Field,
start_term = Term2, end_term = Term1,
page_size = PageSize};
new_query(Bucket, Field, Term1, Term2, PageSize) ->
#query{bucket = Bucket, field = Field,
start_term = Term1, end_term = Term2,
page_size = PageSize}.
%% First term in a term to keys orddict.
first_term(TermKeys) ->
{Term, _} = hd(TermKeys),
Term.
%% Last term in a term to keys orddict.
last_term(TermKeys) ->
{Term, _} = lists:last(TermKeys),
Term.
%% ======================================================
%% States spec
%% ======================================================
initial_state() ->
pre_setup_state1.
initial_state_data() ->
#state{}.
pre_setup_state1(S) ->
#state{nodes = Nodes} = S,
[{pre_setup_state2, {call, ?MODULE, tx_create_clients, [Nodes]}}].
pre_setup_state2(_S) ->
[{default_state, {call, ?MODULE, tx_next_bucket, []}}].
default_state(S) ->
#state{clients = Clients, bucket = Bucket} = S,
[
{default_state, {call, ?MODULE, tx_index_single_term,
[Clients, gen_client_id(S), Bucket,
gen_key_list(), gen_field(), gen_term()]}},
{default_state, {call, ?MODULE, tx_index_multi_term,
[Clients, gen_client_id(S), Bucket,
gen_key_field_terms()]}},
{default_state, {call, ?MODULE, tx_delete_one,
[Clients, gen_client_id(S), Bucket,
gen_existing_key(S)]}},
{default_state, {call, ?MODULE, tx_query_range,
[Clients, gen_client_id(S), gen_range_query(S)]}}
].
%% Tweak transition weights such that deletes are rare.
%% Indexing a bunch or querying a bunch of items are equally likely.
weight(default_state, default_state, {call, _, tx_delete_one, _}) ->
1;
weight(_, _, _) ->
100.
%% State data mutations for each transition.
next_state_data(_, _, S, Clients, {call, _, tx_create_clients, [_]}) ->
S#state{clients = Clients};
next_state_data(_, _, S, Bucket, {call, _, tx_next_bucket, []}) ->
S#state{bucket = Bucket};
next_state_data(default_state, default_state, S, _,
{call, _, tx_index_single_term,
[_, _, _, NewKeys, Field, Term]}) ->
#state{keys = Keys0, indexes = Idx0} = S,
Keys1 = lists:umerge(NewKeys, Keys0),
Idx1 = model_index(NewKeys, Field, Term, Idx0),
S#state{keys = Keys1, indexes = Idx1};
next_state_data(default_state, default_state, S, _,
{call, _, tx_index_multi_term,
[_, _, _, KeyFieldTerms]}) ->
#state{keys = Keys0, indexes = Idx0} = S,
%% Add to list of keys and dedupe.
NewKeys = [K || {K, _, _} <- KeyFieldTerms],
Keys1 = lists:umerge(NewKeys, Keys0),
Idx1 = model_index(KeyFieldTerms, Idx0),
S#state{keys = Keys1, indexes = Idx1};
next_state_data(default_state, default_state, S, _,
{call, _, tx_delete_one, [_, _, _, Key]}) ->
#state{keys = Keys0, indexes = Idx0} = S,
Keys1 = lists:delete(Key, Keys0),
Idx1 = model_delete_key(Key, Idx0),
S#state{keys = Keys1, indexes = Idx1};
next_state_data(_, _, S, _, _) ->
%% Any other transition leaves state unchanged.
S.
%% No precondition checks. Among other things, that means that shrinking may
%% end up issuing deletes to keys that do not exist, which is harmless.
%% Any indexing, deleting or querying command can be issued at any point
%% in the sequence.
precondition(_From, _To, _S, {call, _, _, _}) ->
true.
%% Signal a test failure if there is an explicit error from the query or
%% if the results do not match what is in the model.
postcondition(_, _, S, {call, _, tx_query_range, [_, _, Query]}, {error, Err}) ->
{state, S, query, Query, error, Err};
postcondition(_, _, S, {call, _, tx_query_range, [_, Client, Query]}, Keys) ->
#state{indexes = Idx} = S,
ExpectedKeys = model_query_range(Query, Idx),
case lists:usort(Keys) =:= ExpectedKeys of
true -> true;
false -> {state, S, client, Client, query, Query,
expected, ExpectedKeys, actual, Keys}
end;
postcondition(_, _, _, _Call, _) ->
true.
%% ======================================================
%% State transition functions.
%% ======================================================
%% Returns a dictionary that stores a client object for each node
%% and client type.
%% {Node, Type} -> {Type, Client}
tx_create_clients(Nodes) ->
orddict:from_list([{{N, T}, {T, create_client(N, T)}}
|| N <- Nodes, T <- ?CLIENT_TYPES]).
%% Returns a different bucket name each time it's called.
tx_next_bucket() ->
N = ets:update_counter(bucket_table, bucket_number, 1),
NBin = integer_to_binary(N),
<<"bucket", NBin/binary>>.
%% Index a bunch of keys under the same field/term.
tx_index_single_term(Clients, ClientId, Bucket, Keys, Field, Term) ->
Client = get_client(ClientId, Clients),
lager:info("Indexing in ~p under (~p, ~p) using client ~p: ~w",
[Bucket, Field, Term, ClientId, Keys]),
[index_object(Client, Bucket, Key, Field, Term) || Key <- Keys],
ok.
%% Index a number of keys each under a different term.
tx_index_multi_term(Clients, ClientId, Bucket, KeyFieldTerms) ->
Client = get_client(ClientId, Clients),
lager:info("Indexing in ~p with client ~p: ~p",
[Bucket, ClientId, KeyFieldTerms]),
[index_object(Client, Bucket, Key, Field, Term)
|| {Key, Field, Term} <- KeyFieldTerms],
ok.
%% Delete a single object and all its associated index entries.
tx_delete_one(Clients, ClientId, Bucket, IntKey) ->
Client = get_client(ClientId, Clients),
lager:info("Deleting key ~p from bucket ~p using ~p",
[IntKey, Bucket, ClientId]),
delete_key(Client, Bucket, IntKey),
ok.
tx_query_range(Clients, ClientId, Query) ->
Client = get_client(ClientId, Clients),
Keys = lists:sort(query_range(Client, Query, [])),
lager:info("Query ~p, ~p from ~p to ~p, page = ~p, using ~p "
"returned ~p keys.",
[Query#query.bucket, Query#query.field, Query#query.start_term,
Query#query.end_term, Query#query.page_size, ClientId,
length(Keys)]),
%% Re-run with page sizes 1 -> 100, verify it's always the same result.
PageChecks =
[begin
Q2 = Query#query{page_size = PSize},
OKeys = lists:sort(query_range(Client, Q2, [])),
OKeys =:= Keys
end || PSize <- lists:seq(1, 100)],
case lists:all(fun is_true/1, PageChecks) of
true ->
Keys;
false ->
{error, mismatch_when_paged}
end.
%% ======================================================
%% Client utilities.
%% ======================================================
create_client(Node, pb) ->
rt:pbc(Node);
create_client(Node, http) ->
rt:httpc(Node).
get_client(ClientId, Clients) ->
orddict:fetch(ClientId, Clients).
%% Convert field/term pair to pb client argument format.
to_field_id_term(Field, Term) ->
{to_field_id(Field, Term), [Term]}.
to_field_id(Field, Term) when is_integer(Term) ->
{integer_index, Field};
to_field_id(Field, Term) when is_binary(Term) ->
{binary_index, Field}.
to_field_id_term_http(Field, Term) ->
{to_field_id_http(Field, Term), [Term]}.
to_field_id_http(Field, Term) when is_integer(Term) ->
iolist_to_binary([Field, "_int"]);
to_field_id_http(Field, Term) when is_binary(Term) ->
iolist_to_binary([Field, "_bin"]).
index_object({pb, C}, Bucket, Key0, Field, Term) ->
Key = to_bin_key(Key0),
FT = to_field_id_term(Field, Term),
Obj0 = case riakc_pb_socket:get(C, Bucket, Key) of
{ok, O} ->
O;
{error, notfound} ->
riakc_obj:new(Bucket, Key, Key)
end,
MD0 = riakc_obj:get_update_metadata(Obj0),
MD1 = riakc_obj:add_secondary_index(MD0, [FT]),
Obj1 = riakc_obj:update_metadata(Obj0, MD1),
ok = riakc_pb_socket:put(C, Obj1, [{dw, 3}]),
ok;
index_object({http, C}, Bucket, Key0, Field, Term) ->
Key = to_bin_key(Key0),
FT = to_field_id_term_http(Field, Term),
Obj0 = case rhc:get(C, Bucket, Key) of
{ok, O} ->
O;
{error, notfound} ->
riakc_obj:new(Bucket, Key, Key)
end,
MD0 = riakc_obj:get_update_metadata(Obj0),
MD1 = riakc_obj:add_secondary_index(MD0, [FT]),
Obj1 = riakc_obj:update_metadata(Obj0, MD1),
ok = rhc:put(C, Obj1, [{dw, 3}]),
ok.
delete_key({pb, PB}, Bucket, IntKey) ->
Key = to_bin_key(IntKey),
case riakc_pb_socket:get(PB, Bucket, Key) of
{ok, O} ->
ok = riakc_pb_socket:delete_obj(PB, O, [{dw, 3}]);
{error, notfound} ->
ok = riakc_pb_socket:delete(PB, Bucket, Key, [{dw, 3}])
end,
%% Wait until all tombstones have been reaped.
%% TODO: Do we need to reap tombstones for this test? I think not.
%% ok = rt:wait_until(fun() -> rt:pbc_really_deleted(PB, Bucket, [Key]) end);
ok;
delete_key({http, C}, Bucket, IntKey) ->
Key = to_bin_key(IntKey),
case rhc:get(C, Bucket, Key) of
{ok, O} ->
ok = rhc:delete_obj(C, O, [{dw, 3}]);
{error, notfound} ->
ok = rhc:delete(C, Bucket, Key, [{dw, 3}])
end,
%% Wait until all tombstones have been reaped.
%%ok = rt:wait_until(fun() -> rt:httpc_really_deleted(C, Bucket, [Key]) end).
ok.
%% Execute range query using a client, fetching multiple pages if necessary.
query_range({pb, PB} = Client,
#query { bucket = Bucket, field = FieldName,
start_term = Start, end_term = End,
page_size = PageSize, continuation = Cont } = Query,
AccKeys) ->
Field = to_field_id(FieldName, Start),
case riakc_pb_socket:get_index_range(PB, Bucket, Field, Start, End,
[{max_results, PageSize},
{continuation, Cont}]) of
{ok, ?INDEX_RESULTS{keys = Keys, continuation = undefined}} ->
AccKeys ++ Keys;
{ok, ?INDEX_RESULTS{keys = Keys, continuation = Cont1}} ->
Query1 = Query#query{continuation = Cont1},
query_range(Client, Query1, AccKeys ++ Keys)
end;
query_range({http, C} = Client,
#query { bucket = Bucket, field = FieldName,
start_term = Start, end_term = End,
page_size = PageSize, continuation = Cont } = Query,
AccKeys) ->
Field = to_field_id(FieldName, Start),
case rhc:get_index(C, Bucket, Field, {Start, End},
[{max_results, PageSize},
{continuation, Cont}]) of
{ok, ?INDEX_RESULTS{keys = Keys, continuation = undefined}} ->
AccKeys ++ Keys;
{ok, ?INDEX_RESULTS{keys = Keys, continuation = Cont1}} ->
Query1 = Query#query{continuation = Cont1},
query_range(Client, Query1, AccKeys ++ Keys)
end.
%% Close all clients, ignore errors.
close_clients(Clients) ->
[catch riakc_pb_socket:stop(Client) || {pb, Client} <- Clients],
ok.
%% ======================================================
%% Model data utilities
%% ======================================================
model_index([], Idx) ->
Idx;
model_index([{Keys, Field, Term} | More], Idx) ->
Idx1 = model_index(Keys, Field, Term, Idx),
model_index(More, Idx1).
model_index(NewKeys0, Field, Term, Idx) when is_list(NewKeys0) ->
TField = to_tfield(Field, Term),
NewKeys = lists:usort(NewKeys0),
TermKeys1 =
case orddict:find(TField, Idx) of
{ok, TermKeys0} ->
case orddict:find(Term, TermKeys0) of
{ok, Keys0} ->
MergedKeys = lists:umerge(NewKeys, Keys0),
orddict:store(Term, MergedKeys, TermKeys0);
_ ->
orddict:store(Term, NewKeys, TermKeys0)
end;
_ ->
[{Term, NewKeys}]
end,
orddict:store(TField, TermKeys1, Idx);
model_index(NewKey, Field, Term, Idx) ->
model_index([NewKey], Field, Term, Idx).
model_delete_key(Key, Idx) ->
[{Field, delete_key_from_term_keys(Key, TermKeys)}
|| {Field, TermKeys} <- Idx].
delete_key_from_term_keys(Key, TermKeys) ->
[{Term, lists:delete(Key, Keys)} || {Term, Keys} <- TermKeys].
%% Produces a typed field id. For example, "i1"/43 -> {int, "i1"}
to_tfield(FieldName, Term) ->
case is_integer(Term) of
true -> {int, FieldName};
false -> {bin, FieldName}
end.
%% Query against the modeled data.
model_query_range(Query, Idx) ->
#query{ field = Field, start_term = Start, end_term = End } = Query,
TField = to_tfield(Field, Start),
%% Collect all keys with terms within the given range, ignore others.
Scanner = fun({Term, Keys}, Acc) when Term >= Start, End >= Term ->
[Keys | Acc];
({_Term, _Keys}, Acc) ->
Acc
end,
case orddict:find(TField, Idx) of
error ->
[];
{ok, FieldIdx} ->
KeyGroups = lists:foldl(Scanner, [], FieldIdx),
IntKeys = lists:umerge(KeyGroups),
[to_bin_key(Key) || Key <- IntKeys]
end.
%% ======================================================
%% Internal
%% ======================================================
is_true(true) -> true;
is_true(_) -> false.
%% Initialize the counter used to generate a different bucket name for each run.
init_bucket_counter() ->
ets:new(bucket_table, [named_table, public]),
ets:insert_new(bucket_table, [{bucket_number, 0}]).
%% Convert integer object key to binary form.
to_bin_key(N) when is_integer(N) ->
iolist_to_binary(io_lib:format("~5..0b", [N]));
to_bin_key(Key) when is_binary(Key) ->
Key.
-endif.