riak_test/tests/verify_fast_path.erl

228 lines
7.3 KiB
Erlang
Raw Normal View History

2015-03-02 20:37:55 +00:00
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2015 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(verify_fast_path).
-export([confirm/0, description/0]).
2015-03-02 20:37:55 +00:00
-include_lib("eunit/include/eunit.hrl").
-define(DEFAULT_RING_SIZE, 16).
-define(NVAL, 2).
-define(BUCKET_TYPE, <<"fast_path">>).
2015-03-06 01:12:24 +00:00
-define(BUCKET, {?BUCKET_TYPE, <<"bucket">>}).
-define(ASYNC_PUT_BUCKET_TYPE, <<"async_put">>).
-define(ASYNC_PUT_BUCKET, {?ASYNC_PUT_BUCKET_TYPE, <<"bucket">>}).
2015-03-02 20:37:55 +00:00
%%
%% Returns the human-readable summary of what this test covers.
%%
description() ->
    %% Adjacent string literals are joined by the compiler, so no runtime
    %% concatenation is needed.
    "This test exercises the fast_path bucket property, which results in puts that avoid coordination "
    "and reads before writes, and which therefore have lower latency and higher throughput. ".
2015-03-02 20:37:55 +00:00
%%
%% Test entry point.  Deploys the clusters, creates the fast_path bucket
%% type, and runs every confirm_* check in sequence.  Returns pass; any
%% failing check crashes the test through a badmatch or a failed assertion.
%%
confirm() ->
    %%
    %% Two clusters are deployed:
    %%   * four nodes on the default (multi) backend, used by most checks;
    %%   * a single node on the leveldb backend, used only for the
    %%     asynchronous put check.
    %%
    ClusterSpecs = [
        {4, config(?DEFAULT_RING_SIZE, ?NVAL)},
        {1, config(?DEFAULT_RING_SIZE, ?NVAL, riak_kv_eleveldb_backend)}
    ],
    [Cluster1, Cluster2] = rt:deploy_clusters(ClusterSpecs),
    %% Only the first cluster is joined; the second one is a singleton.
    rt:join_cluster(Cluster1),
    lager:info("Set up clusters: ~p, ~p", [Cluster1, Cluster2]),
    %%
    %% Pick a node of the first cluster and create the fast_path bucket
    %% type through it, then wait until the type is active cluster-wide.
    %%
    NodeIndex = random:uniform(length(Cluster1)),
    Node = lists:nth(NodeIndex, Cluster1),
    rt:create_and_activate_bucket_type(Node, ?BUCKET_TYPE, [{fast_path, true}]),
    rt:wait_until_bucket_type_status(?BUCKET_TYPE, active, Cluster1),
    lager:info("Created ~p bucket type on ~p", [?BUCKET_TYPE, Node]),
    %%
    %% Run the individual checks; each one must return pass.
    %%
    pass = confirm_put(Node),
    pass = confirm_w(Cluster1),
    pass = confirm_pw(Cluster1),
    pass = confirm_rww(Cluster1),
    pass = confirm_async_put(hd(Cluster2)),
    pass.
%%
%% private
%%
%%
%% A put through Node into the fast_path bucket must be readable
%% straight back with the same value.
%%
confirm_put(Node) ->
    Key = <<"confirm_put_key">>,
    Value = <<"confirm_put_value">>,
    ok = verify_put(Node, ?BUCKET, Key, Value),
    lager:info("confirm_put...ok"),
    pass.
2015-03-09 14:07:19 +00:00
%%
%% Checks that a put with sloppy_quorum disabled times out when issued on
%% the single-node side of a partition, while a plain put on the
%% three-node side still succeeds.  Heals the partition and returns pass.
%%
confirm_w(Nodes) ->
    %%
    %% split the cluster into 2 partitions: the first three nodes, and the
    %% fourth node on its own
    %%
    P1 = lists:sublist(Nodes, 3),
    P2 = lists:sublist(Nodes, 4, 1),
    PartitionInfo = rt:partition(P1, P2),
    [Node1 | _Rest1] = P1,
    ok = verify_put(Node1, ?BUCKET, <<"confirm_w_key">>, <<"confirm_w_value">>),
    [Node2 | _Rest2] = P2,
    %%
    %% By setting sloppy_quorum to false, we require a strict quorum of primaries. But because
    %% we only have one node in the partition, the put should fail.
    %%
    ok = verify_put_timeout(
        Node2, ?BUCKET, <<"confirm_w_key">>, <<"confirm_w_value">>,
        [{sloppy_quorum, false}, {timeout, 1000}]
    ),
    rt:heal(PartitionInfo),
    %% Log under this check's own name (the original logged "confirm_pw").
    lager:info("confirm_w...ok"),
    pass.
%%
%% Checks that a put with pw = all times out when issued on the
%% single-node side of a partition.  Heals the partition and returns pass.
%%
confirm_pw(Nodes) ->
    Key = <<"confirm_pw_key">>,
    Value = <<"confirm_pw_value">>,
    %%
    %% Partition the cluster: the first three nodes on one side, the
    %% fourth node alone on the other.
    %%
    Majority = lists:sublist(Nodes, 3),
    Minority = lists:sublist(Nodes, 4, 1),
    Partition = rt:partition(Majority, Minority),
    %% A plain put on the three-node side is expected to work.
    [MajorityNode | _] = Majority,
    verify_put(MajorityNode, ?BUCKET, Key, Value),
    %%
    %% With pw = all, n_val primaries must acknowledge the write.  The
    %% isolated node cannot reach them, so the put should time out.
    %%
    [MinorityNode | _] = Minority,
    StrictOptions = [{pw, all}, {timeout, 1000}],
    verify_put_timeout(MinorityNode, ?BUCKET, Key, Value, StrictOptions),
    rt:heal(Partition),
    lager:info("confirm_pw...ok"),
    pass.
%%
%% Writes two different values for the same key on opposite sides of a
%% partition, heals it, and waits until both sides read back the same
%% value.  Also asserts that the summed fast_path_merge stat went up.
%%
confirm_rww(Nodes) ->
    Key = <<"confirm_rww_key">>,
    %%
    %% Partition the cluster: the first two nodes versus the next two.
    %%
    SideA = lists:sublist(Nodes, 2),
    SideB = lists:sublist(Nodes, 3, 2),
    Partition = rt:partition(SideA, SideB),
    %% Baseline merge count, taken before any conflicting write happens.
    NumFastMerges = num_fast_merges(Nodes),
    %%
    %% Put a different value into each partition.
    %%
    [NodeA | _] = SideA,
    verify_put(NodeA, ?BUCKET, Key, <<"confirm_rww_value1">>),
    [NodeB | _] = SideB,
    verify_put(NodeB, ?BUCKET, Key, <<"confirm_rww_value2">>),
    %%
    %% After healing, both sides should agree on one (arbitrary) value.
    %%
    rt:heal(Partition),
    Converged =
        fun() ->
            ValueA = get(NodeA, ?BUCKET, Key),
            ValueB = get(NodeB, ?BUCKET, Key),
            ValueA =:= ValueB
        end,
    rt:wait_until(Converged),
    ?assert(NumFastMerges < num_fast_merges(Nodes)),
    lager:info("confirm_rww...ok"),
    pass.
%%
%% In order to test asynchronous puts, at this point we need a node with leveldb, as
%% that is currently the only back end that supports it. In the future, we may add
%% async puts as a capability which can be arbitrated through the multi backend.
%%
confirm_async_put(Node) ->
    %% Create a fast_path bucket type bound to the myeleveldb backend and
    %% wait until it is active on this node.
    TypeProps = [{fast_path, true}, {backend, myeleveldb}],
    rt:create_and_activate_bucket_type(Node, ?ASYNC_PUT_BUCKET_TYPE, TypeProps),
    rt:wait_until_bucket_type_status(?ASYNC_PUT_BUCKET_TYPE, active, [Node]),
    lager:info("Created ~p bucket type on ~p", [?ASYNC_PUT_BUCKET_TYPE, Node]),
    %% A put into that bucket type must be readable straight back.
    Key = <<"confirm_async_put_key">>,
    Value = <<"confirm_async_put_value">>,
    ok = verify_put(Node, ?ASYNC_PUT_BUCKET, Key, Value),
    lager:info("confirm_async_put...ok"),
    pass.
%%
%% Puts Value under Bucket/Key through a protocol buffers client connected
%% to Node, reads the key back on the same client, and asserts that the
%% stored value equals Value.  Returns ok.
%%
verify_put(Node, Bucket, Key, Value) ->
    Client = rt:pbc(Node),
    Object = riakc_obj:new(Bucket, Key, Value),
    %% The put's return value is not inspected; success is judged solely
    %% by the read-back below.
    riakc_pb_socket:put(Client, Object),
    {ok, Val} = riakc_pb_socket:get(Client, Bucket, Key),
    ?assertEqual(Value, riakc_obj:get_value(Val)),
    ok.
2015-03-02 20:37:55 +00:00
%%
%% Issues a put with the given Options through a protocol buffers client
%% connected to Node and asserts that it is answered with
%% {error, <<"timeout">>}.  Returns ok.
%%
verify_put_timeout(Node, Bucket, Key, Value, Options) ->
    Client = rt:pbc(Node),
    Expected = {error, <<"timeout">>},
    ?assertEqual(
        Expected,
        riakc_pb_socket:put(Client, riakc_obj:new(Bucket, Key, Value), Options)
    ),
    ok.
2015-03-09 14:07:19 +00:00
%%
%% Sums the fast_path_merge stat over Nodes.  Each node's stats are
%% fetched with an rpc call to riak_kv_stat:get_stats/0; a node whose
%% stats lack a fast_path_merge entry causes a badmatch.
%%
num_fast_merges(Nodes) ->
    lists:sum([fast_path_merge_count(Node) || Node <- Nodes]).

%%
%% Reads the fast_path_merge stat of a single node.
%%
fast_path_merge_count(Node) ->
    Stats = rpc:call(Node, riak_kv_stat, get_stats, []),
    {fast_path_merge, Count} = proplists:lookup(fast_path_merge, Stats),
    Count.
%%
%% Reads Bucket/Key through a fresh protocol buffers client connected to
%% Node and returns the object's value.  Crashes with a badmatch if the
%% read does not return {ok, Object}.
%%
get(Node, Bucket, Key) ->
    Client = rt:pbc(Node),
    try
        {ok, Object} = riakc_pb_socket:get(Client, Bucket, Key),
        riakc_obj:get_value(Object)
    after
        %% This function is polled repeatedly (via rt:wait_until), so the
        %% client is closed on every exit path rather than leaving one
        %% open connection behind per call.
        riakc_pb_socket:stop(Client)
    end.
2015-03-06 01:12:24 +00:00
2015-03-02 20:37:55 +00:00
%%
%% Node configuration with the default storage backend
%% (riak_kv_multi_backend).  See config/3 for the settings produced.
%%
config(RingSize, NVal) ->
    DefaultBackend = riak_kv_multi_backend,
    config(RingSize, NVal, DefaultBackend).
%%
%% Builds the application environment for a test node.
%%
%%   RingSize - value for riak_core's ring_creation_size
%%   NVal     - n_val placed in the default bucket properties
%%   Backend  - value for riak_kv's storage_backend
%%
%% Returns a list with one {App, Settings} entry each for riak_core and
%% riak_kv.
%%
config(RingSize, NVal, Backend) ->
    CoreSettings = [
        {default_bucket_props, [{n_val, NVal}]},
        {vnode_management_timer, 1000},
        {ring_creation_size, RingSize}
    ],
    %% Named backends available when the multi backend is in use.
    MultiBackends = [
        {mymemory, riak_kv_memory_backend, []},
        {myeleveldb, riak_kv_eleveldb_backend, []}
    ],
    KvSettings = [
        {anti_entropy_build_limit, {100, 1000}},
        {anti_entropy_concurrency, 100},
        {anti_entropy_tick, 100},
        {anti_entropy, {on, []}},
        {anti_entropy_timeout, 5000},
        {storage_backend, Backend},
        {multi_backend, MultiBackends}
    ],
    [{riak_core, CoreSettings}, {riak_kv, KvSettings}].