Added intercepts to count the synchronous and asynchronous calls through handle_command, to ensure we are testing the case statements in riak_kv_vnode:handle_command with write_once puts.

Fred Dushin 2015-07-20 21:23:22 -04:00
parent e38c0f2533
commit 666f0ca375
2 changed files with 92 additions and 49 deletions
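
For context, the distinction these intercepts count comes from the shape of the result riak_kv_vnode:handle_command returns for a write_once put: an asynchronous backend hands back {noreply, State} and replies later, while a synchronous backend replies inline with a riak_kv_w1c_put_reply_v1 record. A minimal, self-contained sketch of that classification (the record shape is copied from the intercept diff below; the module and function names here are illustrative only, not part of this commit):

%% Illustrative sketch only -- not part of this commit.
%% Classifies a handle_command result the same way the
%% count_w1c_handle_command intercept below does.
-module(w1c_reply_shapes).
-export([classify/1]).

%% Record shape as added in this commit's intercept module.
-record(riak_kv_w1c_put_reply_v1, {
          reply :: ok | {error, term()},
          type :: primary | fallback
         }).

%% Async backend: the vnode returns control immediately and replies later.
classify({noreply, _State}) ->
    w1c_async_replies;
%% Sync backend: the reply record is produced inline.
classify({reply, #riak_kv_w1c_put_reply_v1{reply = ok}, _State}) ->
    w1c_sync_replies;
classify(_Other) ->
    other.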

View File

@ -9,6 +9,10 @@
type :: primary | fallback
% start_time :: non_neg_integer(), Jon to add?
}).
-record(riak_kv_w1c_put_reply_v1, {
reply :: ok | {error, term()},
type :: primary | fallback
}).
-define(M, riak_kv_vnode_orig).
@ -38,6 +42,7 @@ slow_handle_coverage(Req, Filter, Sender, State) ->
timer:sleep(Rand),
?M:handle_coverage_orig(Req, Filter, Sender, State).
%% @doc Count how many times we call handle_handoff_command
count_handoff_w1c_puts(#riak_kv_w1c_put_req_v1{}=Req, Sender, State) ->
Val = ?M:handle_handoff_command_orig(Req, Sender, State),
ets:update_counter(intercepts_tab, w1c_put_counter, 1),
@ -45,6 +50,21 @@ count_handoff_w1c_puts(#riak_kv_w1c_put_req_v1{}=Req, Sender, State) ->
count_handoff_w1c_puts(Req, Sender, State) ->
?M:handle_handoff_command_orig(Req, Sender, State).
%% @doc Count how many times we handle synchronous and asynchronous replies
%% in handle_command when using w1c buckets
count_w1c_handle_command(#riak_kv_w1c_put_req_v1{}=Req, Sender, State) ->
case ?M:handle_command_orig(Req, Sender, State) of
{noreply, NewState} ->
ets:update_counter(intercepts_tab, w1c_async_replies, 1),
{noreply, NewState};
{reply, #riak_kv_w1c_put_reply_v1{reply=ok, type=Type}, NewState} ->
ets:update_counter(intercepts_tab, w1c_sync_replies, 1),
{reply, #riak_kv_w1c_put_reply_v1{reply=ok, type=Type}, NewState};
Any -> Any
end;
count_w1c_handle_command(Req, Sender, State) ->
?M:handle_command_orig(Req, Sender, State).
%% @doc Simulate dropped gets/network partitions by responding with
%% noreply during get requests.
drop_do_get(Sender, BKey, ReqId, State) ->

View File

@ -1,6 +1,6 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2013 Basho Technologies, Inc.
%% Copyright (c) 2015 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
@ -24,79 +24,102 @@
-define(BUCKET_TYPE, <<"write_once">>).
-define(BUCKET, {?BUCKET_TYPE, <<"write_once">>}).
%% We've got a separate test for capability negotiation and other mechanisms, so the test here is fairly
%% straightforward: get a list of different versions of nodes and join them into a cluster, making sure that
%% each time our data has been replicated:
%% @doc
confirm() ->
NTestItems = 1000, %% How many test items to write/verify?
NTestNodes = 2, %% How many nodes to spin up for tests?
run_test(NTestItems, NTestNodes),
AsyncConfig = create_config(riak_kv_eleveldb_backend),
AsyncCluster = run_test(AsyncConfig, true),
rt:clean_cluster(AsyncCluster),
SyncConfig = create_config(riak_kv_memory_backend),
_SyncCluster = run_test(SyncConfig, false),
lager:info("Test verify_handoff passed."),
pass.
run_test(NTestItems, NTestNodes) ->
lager:info("Testing handoff (items ~p, encoding: default)", [NTestItems]),
lager:info("Spinning up test nodes"),
[RootNode | TestNodes] = deploy_test_nodes(NTestNodes),
rt:wait_for_service(RootNode, riak_kv),
create_config(Backend) ->
[{riak_core, [
{default_bucket_props, [{n_val, 1}]},
{ring_creation_size, 8},
{handoff_acksync_threshold, 20},
{handoff_concurrency, 4},
{handoff_receive_timeout, 2000},
{vnode_management_timer, 100}]},
{riak_kv, [
{storage_backend, Backend}]}
].
run_test(Config, AsyncWrites) ->
%%
%% Deploy 2 nodes based on config. Wait for K/V to start on each node.
%%
lager:info("Deploying 2 nodes..."),
Cluster = [RootNode, NewNode] = deploy_test_nodes(2, Config),
[rt:wait_for_service(Node, riak_kv) || Node <- [RootNode, NewNode]],
%%
%% Set up the intercepts
%%
lager:info("Setting up intercepts..."),
make_intercepts_tab(RootNode),
% This intercept will tell the background process (below) to send an event for each
% vnode that is being handed off (there will be 4 such vnodes in this test case)
rt_intercept:add(
RootNode, {riak_kv_worker, [{{handle_work, 3}, handle_work_intercept}]}
),
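%% The handle_work_intercept registered above is defined elsewhere in the
%% intercepts suite, not in this diff. A plausible shape for it, following
%% the same _orig delegation convention the vnode intercepts use (the event
%% name and target process here are assumptions):
%%
%%   handle_work_intercept(Work, Sender, State) ->
%%       catch background_writer ! handoff_work_started,
%%       riak_kv_worker_orig:handle_work_orig(Work, Sender, State).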
%% Count every time riak_kv_vnode:handle_handoff_command/3 is called with a write_once message
rt_intercept:add(
RootNode, {riak_kv_vnode, [{{handle_handoff_command, 3}, count_handoff_w1c_puts}]}
RootNode, {riak_kv_vnode, [
%% Count every time riak_kv_vnode:handle_handoff_command/3 is called with a write_once message
{{handle_handoff_command, 3}, count_handoff_w1c_puts},
%% Count every time riak_kv_vnode:handle_command/3 is called with a write_once message
{{handle_command, 3}, count_w1c_handle_command}
]}
),
lager:info("Populating root node."),
rt:create_and_activate_bucket_type(RootNode, ?BUCKET_TYPE, [{write_once, true}]),
rt:systest_write(RootNode, 1, NTestItems, ?BUCKET, 1),
lager:info("Testing handoff for cluster."),
lists:foreach(fun(TestNode) -> test_handoff(RootNode, TestNode, NTestItems) end, TestNodes).
%% See if we get the same data back from our new nodes as we put into the root node:
test_handoff(RootNode, NewNode, NTestItems) ->
lager:info("Waiting for service on new node."),
rt:wait_for_service(NewNode, riak_kv),
%% Reset the intercept counters to 0
true = rpc:call(RootNode, ets, insert, [intercepts_tab, {w1c_async_replies, 0}]),
true = rpc:call(RootNode, ets, insert, [intercepts_tab, {w1c_sync_replies, 0}]),
true = rpc:call(RootNode, ets, insert, [intercepts_tab, {w1c_put_counter, 0}]),
lager:info("Joining new node with cluster."),
%%
%% Seed the root node with some data
%%
lager:info("Populating root node..."),
rt:create_and_activate_bucket_type(RootNode, ?BUCKET_TYPE, [{write_once, true}, {n_val, 1}]),
NTestItems = 1000,
rt:systest_write(RootNode, 1, NTestItems, ?BUCKET, 1),
%%
%% Start an asynchronous proc which will send puts into riak during handoff
%% (start_proc/stop_proc are not shown in this diff; a sketch follows deploy_test_nodes below).
%%
lager:info("Joining new node with cluster..."),
start_proc(RootNode, NTestItems),
timer:sleep(1000),
rt:join(NewNode, RootNode),
?assertEqual(ok, rt:wait_until_nodes_ready([RootNode, NewNode])),
rt:wait_until_no_pending_changes([RootNode, NewNode]),
?assertEqual(ok, rt:wait_until_nodes_ready(Cluster)),
rt:wait_until_no_pending_changes(Cluster),
rt:wait_until_transfers_complete(Cluster),
TotalSent = stop_proc(),
%% See if we get the same data back from the joined node that we added to the root node.
%% Note: systest_read() returns /non-matching/ items, so getting nothing back is good:
%%
%% Verify the results
%%
lager:info("Validating data after handoff..."),
Results2 = rt:systest_read(NewNode, 1, TotalSent, ?BUCKET, 1),
?assertEqual([], Results2),
lager:info("Data looks good. Read ~p entries.", [TotalSent]),
lager:info("Read ~p entries.", [TotalSent]),
[{_, Count}] = rpc:call(RootNode, ets, lookup, [intercepts_tab, w1c_put_counter]),
?assert(Count > 0),
lager:info("Looking Good. We handled ~p write_once puts during handoff.", [Count]).
lager:info("We handled ~p write_once puts during handoff.", [Count]),
[{_, W1CAsyncReplies}] = rpc:call(RootNode, ets, lookup, [intercepts_tab, w1c_async_replies]),
[{_, W1CSyncReplies}] = rpc:call(RootNode, ets, lookup, [intercepts_tab, w1c_sync_replies]),
case AsyncWrites of
true ->
%?assertEqual(1008, W1CAsyncReplies),
?assertEqual(0, W1CSyncReplies);
false ->
?assertEqual(0, W1CAsyncReplies)
%?assertEqual(1008, W1CSyncReplies)
end,
%%
Cluster.
deploy_test_nodes(N) ->
Config = [{riak_core, [{default_bucket_props, [{n_val, 1}]},
{ring_creation_size, 8},
{handoff_acksync_threshold, 20},
{handoff_concurrency, 4},
{handoff_receive_timeout, 2000},
{vnode_management_timer, 1000}]}],
deploy_test_nodes(N, Config) ->
rt:deploy_nodes(N, Config).
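%% start_proc/1 and stop_proc/0 are referenced above but not shown in this
%% diff. A minimal sketch of a background writer with that interface, under
%% assumed semantics (keep writing sequential keys past NTestItems until
%% stopped, then report the highest key written):
start_proc(Node, NTestItems) ->
    Self = self(),
    Pid = spawn_link(fun() -> write_loop(Self, Node, NTestItems + 1) end),
    register(background_writer, Pid).

write_loop(Parent, Node, K) ->
    receive
        stop ->
            Parent ! {total_sent, K - 1};
        _HandoffEvent ->
            %% e.g. an event from the handle_work intercept; keep writing
            write_loop(Parent, Node, K)
    after 0 ->
        rt:systest_write(Node, K, K, ?BUCKET, 1),
        write_loop(Parent, Node, K + 1)
    end.

stop_proc() ->
    background_writer ! stop,
    receive {total_sent, Total} -> Total end.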
make_intercepts_tab(Node) ->