Add verification of handoff heartbeat to verify_handoff test

Add testing of the handoff heartbeat change from the following pull
request: https://github.com/basho/riak_core/pull/560. Add an intercept
module for the riak_core_handoff_sender module to introduce artificial
delay on item visitation during a handoff fold. This delay, along with
the changes to the verify_handoff test, causes the test to fail when run
without the heartbeat change. The handoff_receive_timeout is exceeded,
handoff stalls, and the test eventually fails due to timeout. The test
succeeds when run with the heartbeat change.
This commit is contained in:
Kelly McLaughlin 2014-09-11 15:05:26 -06:00
parent 43ee07c579
commit 7cd2645564
2 changed files with 59 additions and 41 deletions

View File

@ -0,0 +1,8 @@
-module(riak_core_handoff_sender_intercepts).
-compile(export_all).
-include("intercept.hrl").
-define(M, riak_core_handoff_sender_orig).
%% Intercept for visit_item/3: pause for 100 ms, then hand the same
%% arguments to the original implementation (?M:visit_item_orig/3) and
%% return whatever it returns. The pause adds an artificial per-item delay.
delayed_visit_item_3(Key, Value, FoldAcc) ->
    DelayMs = 100,
    timer:sleep(DelayMs),
    ?M:visit_item_orig(Key, Value, FoldAcc).

View File

@ -26,19 +26,19 @@
%% straightforward: get a list of different versions of nodes and join them into a cluster, making sure that %% straightforward: get a list of different versions of nodes and join them into a cluster, making sure that
%% each time our data has been replicated: %% each time our data has been replicated:
confirm() -> confirm() ->
NTestItems = 10, %% How many test items to write/verify? NTestItems = 1000, %% How many test items to write/verify?
NTestNodes = 3, %% How many nodes to spin up for tests? NTestNodes = 3, %% How many nodes to spin up for tests?
TestMode = false, %% Set to false for "production tests", true if too slow. TestMode = false, %% Set to false for "production tests", true if too slow.
EncodingTypes = [default, encode_raw, encode_zlib], %% Usually, you won't want to fiddle with these. EncodingTypes = [default, encode_raw, encode_zlib], %% Usually, you won't want to fiddle with these.
lists:foreach(fun(EncodingType) -> run_test(TestMode, NTestItems, NTestNodes, EncodingType) end, EncodingTypes), [run_test(TestMode, NTestItems, NTestNodes, EncodingType) ||
EncodingType <- EncodingTypes],
lager:info("Test verify_handoff passed."), lager:info("Test verify_handoff passed."),
pass. pass.
run_test(TestMode, NTestItems, NTestNodes, HandoffEncoding) -> run_test(TestMode, NTestItems, NTestNodes, Encoding) ->
lager:info("Testing handoff (items ~p, encoding: ~p)", [NTestItems, Encoding]),
lager:info("Testing handoff (items ~p, encoding: ~p)", [NTestItems, HandoffEncoding]),
%% This resets nodes, cleans up stale directories, etc.: %% This resets nodes, cleans up stale directories, etc.:
lager:info("Cleaning up..."), lager:info("Cleaning up..."),
@ -49,37 +49,13 @@ run_test(TestMode, NTestItems, NTestNodes, HandoffEncoding) ->
rt:wait_for_service(RootNode, riak_kv), rt:wait_for_service(RootNode, riak_kv),
case HandoffEncoding of set_handoff_encoding(Encoding, Nodes),
default -> lager:info("Using default encoding type."), true;
_ -> lager:info("Forcing encoding type to ~p.", [HandoffEncoding]), %% Insert delay into handoff folding to test the efficacy of the
OverrideData = %% handoff heartbeat addition
[ [rt_intercept:add(N, {riak_core_handoff_sender,
{ riak_core, [{{visit_item, 3}, delayed_visit_item_3}]})
[ || N <- Nodes],
{ override_capability,
[
{ handoff_data_encoding,
[
{ use, HandoffEncoding},
{ prefer, HandoffEncoding}
]
}
]
}
]
}
],
rt:update_app_config(RootNode, OverrideData),
%% Update all nodes (capabilities are not re-negotiated):
lists:foreach(fun(TestNode) ->
rt:update_app_config(TestNode, OverrideData),
assert_using(RootNode, { riak_kv, handoff_data_encoding }, HandoffEncoding)
end,
Nodes)
end,
lager:info("Populating root node."), lager:info("Populating root node."),
rt:systest_write(RootNode, NTestItems), rt:systest_write(RootNode, NTestItems),
@ -100,6 +76,34 @@ run_test(TestMode, NTestItems, NTestNodes, HandoffEncoding) ->
lager:info("Stopping root node."), lager:info("Stopping root node."),
rt:brutal_kill(RootNode). rt:brutal_kill(RootNode).
%% Select the handoff data encoding used by the test run.
%%
%% For `default' nothing is changed and `true' is returned. For any other
%% Encoding, each node in Nodes gets the app-config override built by
%% override_data/1, followed by an assert_using/3 call for the
%% {riak_kv, handoff_data_encoding} capability. The result is the list of
%% values returned by assert_using/3, one per node; nothing in this
%% function asserts on those values.
set_handoff_encoding(default, _Nodes) ->
    lager:info("Using default encoding type."),
    true;
set_handoff_encoding(Encoding, Nodes) ->
    lager:info("Forcing encoding type to ~p.", [Encoding]),
    Capability = {riak_kv, handoff_data_encoding},
    %% Every node is updated individually (capabilities are not
    %% re-negotiated).
    ForceEncoding =
        fun(Node) ->
                rt:update_app_config(Node, override_data(Encoding)),
                assert_using(Node, Capability, Encoding)
        end,
    lists:map(ForceEncoding, Nodes).
%% Build the riak_core app-config override that sets both the `use' and
%% `prefer' entries of the handoff_data_encoding capability to Encoding.
%% Returns a one-element list holding the {riak_core, Settings} tuple.
override_data(Encoding) ->
    EncodingChoice = [{use, Encoding}, {prefer, Encoding}],
    Capabilities = [{handoff_data_encoding, EncodingChoice}],
    [{riak_core, [{override_capability, Capabilities}]}].
%% See if we get the same data back from our new nodes as we put into the root node: %% See if we get the same data back from our new nodes as we put into the root node:
test_handoff(RootNode, NewNode, NTestItems) -> test_handoff(RootNode, NewNode, NTestItems) ->
@ -114,23 +118,29 @@ test_handoff(RootNode, NewNode, NTestItems) ->
%% See if we get the same data back from the joined node that we added to the root node. %% See if we get the same data back from the joined node that we added to the root node.
%% Note: systest_read() returns /non-matching/ items, so getting nothing back is good: %% Note: systest_read() returns /non-matching/ items, so getting nothing back is good:
lager:info("Validating data after handoff:"), lager:info("Validating data after handoff:"),
Results = rt:systest_read(NewNode, NTestItems), Results = rt:systest_read(NewNode, NTestItems),
?assertEqual(0, length(Results)), ?assertEqual(0, length(Results)),
Results2 = rt:systest_read(RootNode, 1, 2, {<<"type">>, <<"bucket">>}, 2), Results2 = rt:systest_read(RootNode, 1, 2, {<<"type">>, <<"bucket">>}, 2),
?assertEqual(0, length(Results2)), ?assertEqual(0, length(Results2)),
lager:info("Data looks ok."). lager:info("Data looks ok.").
assert_using(Node, {CapabilityCategory, CapabilityName}, ExpectedCapabilityName) -> assert_using(Node, {CapabilityCategory, CapabilityName}, ExpectedCapabilityName) ->
lager:info("assert_using ~p =:= ~p", [ExpectedCapabilityName, CapabilityName]), lager:info("assert_using ~p =:= ~p", [ExpectedCapabilityName, CapabilityName]),
ExpectedCapabilityName =:= rt:capability(Node, {CapabilityCategory, CapabilityName}). ExpectedCapabilityName =:= rt:capability(Node, {CapabilityCategory, CapabilityName}).
%% For some testing purposes, making these limits smaller is helpful: %% For some testing purposes, making these limits smaller is helpful:
deploy_test_nodes(false, N) -> deploy_test_nodes(false, N) ->
rt:deploy_nodes(N); Config = [{riak_core, [{ring_creation_size, 8},
{handoff_acksync_threshold, 20},
{handoff_receive_timeout, 2000}]}],
rt:deploy_nodes(N, Config);
deploy_test_nodes(true, N) -> deploy_test_nodes(true, N) ->
lager:info("WARNING: Using turbo settings for testing."), lager:info("WARNING: Using turbo settings for testing."),
Config = [{riak_core, [{forced_ownership_handoff, 8}, Config = [{riak_core, [{forced_ownership_handoff, 8},
{ring_creation_size, 8},
{handoff_concurrency, 8}, {handoff_concurrency, 8},
{vnode_inactivity_timeout, 1000}, {vnode_inactivity_timeout, 1000},
{handoff_acksync_threshold, 20},
{handoff_receive_timeout, 2000},
{gossip_limit, {10000000, 60000}}]}], {gossip_limit, {10000000, 60000}}]}],
rt:deploy_nodes(N, Config). rt:deploy_nodes(N, Config).