Merge branch 'jdb-upgrade-changes'

This commit is contained in:
Joseph Blomstedt 2012-08-08 18:50:08 -07:00
commit 1e1656735e
8 changed files with 67 additions and 40 deletions

View File

@ -19,6 +19,7 @@
get_os_env/2, get_os_env/2,
get_ring/1, get_ring/1,
admin/2, admin/2,
upgrade/2,
wait_until_pingable/1, wait_until_pingable/1,
wait_until_unpingable/1, wait_until_unpingable/1,
wait_until_ready/1, wait_until_ready/1,
@ -106,6 +107,25 @@ async_start(Node) ->
stop(Node) -> stop(Node) ->
?HARNESS:stop(Node). ?HARNESS:stop(Node).
%% @doc Upgrade a Riak node to a specific version.
%%
%% `Node' is the node to upgrade and `NewVersion' is the version it should
%% be moved to. This function is a thin wrapper: the call is forwarded
%% unchanged to `upgrade/2' in the module named by the `?HARNESS' macro, and
%% whatever that call returns is returned to the caller.
%% NOTE(review): `?HARNESS' is defined outside this view -- confirm which
%% harness module it resolves to and what its `upgrade/2' returns.
upgrade(Node, NewVersion) ->
?HARNESS:upgrade(Node, NewVersion).
%% @doc Upgrade a Riak node to a specific version using the alternate
%% leave/upgrade/rejoin approach.
%%
%% Sequence, in order:
%%   1. `Node' is asked to leave via `leave/1', and the call asserts that
%%      `rt:wait_until_unpingable/1' returns `ok' for it.
%%   2. `Node' is upgraded to `NewVersion' via `upgrade/2'.
%%   3. `Node' is joined to the first element of `Nodes' that is not `Node'
%%      itself.
%%   4. The call asserts that `wait_until_nodes_ready/1' and
%%      `wait_until_no_pending_changes/1' both return `ok' for `Nodes'.
%%
%% Returns `ok'. Any failed assertion raises the usual EUnit assertion
%% error. If `Nodes' contains no node other than `Node', `hd/1' fails with
%% `badarg' at the rejoin step.
slow_upgrade(Node, NewVersion, Nodes) ->
    lager:info("Perform leave/upgrade/join upgrade on ~p", [Node]),

    %% Phase 1: take the node out of the cluster.
    lager:info("Leaving ~p", [Node]),
    leave(Node),
    ?assertEqual(ok, rt:wait_until_unpingable(Node)),

    %% Phase 2: upgrade the node.
    upgrade(Node, NewVersion),

    %% Phase 3: rejoin through any other member of the node list. The join
    %% target is computed only now, after the upgrade, as in the original.
    lager:info("Rejoin ~p", [Node]),
    JoinTarget = hd(Nodes -- [Node]),
    join(Node, JoinTarget),

    %% Phase 4: do not return until the helpers report the cluster settled.
    lager:info("Wait until all nodes are ready and there are no pending changes"),
    ?assertEqual(ok, wait_until_nodes_ready(Nodes)),
    ?assertEqual(ok, wait_until_no_pending_changes(Nodes)),
    ok.
%% @doc Have `Node' send a join request to `PNode' %% @doc Have `Node' send a join request to `PNode'
join(Node, PNode) -> join(Node, PNode) ->
R = try_join(Node, PNode), R = try_join(Node, PNode),
@ -522,3 +542,17 @@ systest_read(Node, Start, End, Bucket, R) ->
end end
end, end,
lists:foldl(F, [], lists:seq(Start, End)). lists:foldl(F, [], lists:seq(Start, End)).
%% @doc Utility function: parallel map.
%%
%% Applies `F' to every element of `L', each in its own process, and returns
%% the list of results in the same order as `L'. An empty `L' yields `[]'.
%%
%% Each worker is monitored. If a worker terminates without delivering a
%% result (for example because `F' crashed), the call raises
%% `error({pmap_worker_failed, Reason})' instead of blocking forever on a
%% reply that will never arrive.
%%
%% Replies carry a reference created for this call, so unrelated
%% `{pmap, _, _}' messages already in the caller's mailbox are never
%% mistaken for results.
pmap(F, L) ->
    Parent = self(),
    Ref = make_ref(),
    Workers = [spawn_monitor(fun() -> Parent ! {pmap, Ref, self(), F(X)} end)
               || X <- L],
    %% Collect in spawn order: `Ref', `Pid' and `MRef' are already bound, so
    %% each receive selects exactly one worker's messages. That keeps the
    %% results ordered without tagging them with an index and sorting.
    [receive
         {pmap, Ref, Pid, Result} ->
             %% The worker's result precedes its 'DOWN' message; drop the
             %% monitor and flush so no 'DOWN' is left in the mailbox.
             erlang:demonitor(MRef, [flush]),
             Result;
         {'DOWN', MRef, process, Pid, Reason} ->
             erlang:error({pmap_worker_failed, Reason})
     end || {Pid, MRef} <- Workers].

View File

@ -116,11 +116,11 @@ deploy_nodes(NodeConfig) ->
%% Stop nodes if already running %% Stop nodes if already running
%% [run_riak(N, relpath(node_version(N)), "stop") || N <- Nodes], %% [run_riak(N, relpath(node_version(N)), "stop") || N <- Nodes],
pmap(fun(Node) -> rt:pmap(fun(Node) ->
N = node_id(Node), N = node_id(Node),
run_riak(N, relpath(node_version(N)), "stop"), run_riak(N, relpath(node_version(N)), "stop"),
rt:wait_until_unpingable(Node) rt:wait_until_unpingable(Node)
end, Nodes), end, Nodes),
%% ?debugFmt("Shutdown~n", []), %% ?debugFmt("Shutdown~n", []),
%% Reset nodes to base state %% Reset nodes to base state
@ -134,16 +134,16 @@ deploy_nodes(NodeConfig) ->
create_dirs(Nodes), create_dirs(Nodes),
%% Set initial config %% Set initial config
pmap(fun({_, default}) -> rt:pmap(fun({_, default}) ->
ok; ok;
({Node, Config}) -> ({Node, Config}) ->
update_app_config(Node, Config) update_app_config(Node, Config)
end, end,
lists:zip(Nodes, Configs)), lists:zip(Nodes, Configs)),
%% Start nodes %% Start nodes
%%[run_riak(N, relpath(node_version(N)), "start") || N <- Nodes], %%[run_riak(N, relpath(node_version(N)), "start") || N <- Nodes],
pmap(fun(N) -> run_riak(N, relpath(node_version(N)), "start") end, NodesN), rt:pmap(fun(N) -> run_riak(N, relpath(node_version(N)), "start") end, NodesN),
%% Ensure nodes started %% Ensure nodes started
[ok = rt:wait_until_pingable(N) || N <- Nodes], [ok = rt:wait_until_pingable(N) || N <- Nodes],
@ -219,16 +219,3 @@ get_cmd_result(Port, Acc) ->
after 0 -> after 0 ->
timeout timeout
end. end.
%% @doc Parallel map.
%%
%% Applies `F' to every element of `L', each in its own process, and returns
%% the list of results in the same order as `L'. An empty `L' yields `[]'.
%%
%% Each worker is monitored. If a worker terminates without delivering a
%% result (for example because `F' crashed), the call raises
%% `error({pmap_worker_failed, Reason})' instead of blocking forever on a
%% reply that will never arrive.
%%
%% Replies carry a reference created for this call, so unrelated
%% `{pmap, _, _}' messages already in the caller's mailbox are never
%% mistaken for results.
pmap(F, L) ->
    Parent = self(),
    Ref = make_ref(),
    Workers = [spawn_monitor(fun() -> Parent ! {pmap, Ref, self(), F(X)} end)
               || X <- L],
    %% Collect in spawn order: `Ref', `Pid' and `MRef' are already bound, so
    %% each receive selects exactly one worker's messages. That keeps the
    %% results ordered without tagging them with an index and sorting.
    [receive
         {pmap, Ref, Pid, Result} ->
             %% The worker's result precedes its 'DOWN' message; drop the
             %% monitor and flush so no 'DOWN' is left in the mailbox.
             erlang:demonitor(MRef, [flush]),
             Result;
         {'DOWN', MRef, process, Pid, Reason} ->
             erlang:error({pmap_worker_failed, Reason})
     end || {Pid, MRef} <- Workers].

View File

@ -5,12 +5,10 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
gh_riak_core_154() -> gh_riak_core_154() ->
Nodes = rt:build_cluster(2), %% Increase handoff concurrency on nodes
[Node1, Node2] = Nodes,
lager:info("Increase handoff concurrency on nodes"),
NewConfig = [{riak_core, [{handoff_concurrency, 1024}]}], NewConfig = [{riak_core, [{handoff_concurrency, 1024}]}],
[rt:update_app_config(Node, NewConfig) || Node <- Nodes], Nodes = rt:build_cluster(2, NewConfig),
[Node1, Node2] = Nodes,
lager:info("Write data while ~p is offline", [Node2]), lager:info("Write data while ~p is offline", [Node2]),
rt:stop(Node2), rt:stop(Node2),

View File

@ -13,6 +13,13 @@ loaded_upgrade() ->
verify_upgrade(OldVsn) -> verify_upgrade(OldVsn) ->
Config = [{riak_search, [{enabled, true}]}], Config = [{riak_search, [{enabled, true}]}],
%% Uncomment to use settings more prone to cause races
%% Config = [{riak_core, [{handoff_concurrency, 1024},
%% {vnode_inactivity_timeout, 1000},
%% {vnode_rolling_start, 128},
%% {vnode_management_timer, 1000},
%% {gossip_limit, {10000, 1000}}]},
%% {riak_search, [{enabled, true}]}],
NumNodes = 4, NumNodes = 4,
Vsns = [{OldVsn, Config} || _ <- lists:seq(2,NumNodes)], Vsns = [{OldVsn, Config} || _ <- lists:seq(2,NumNodes)],
Nodes = rt:build_cluster([{current, Config} | Vsns]), Nodes = rt:build_cluster([{current, Config} | Vsns]),
@ -33,7 +40,8 @@ verify_upgrade(OldVsn) ->
MR2 = spawn_mapred_tester(MR1), MR2 = spawn_mapred_tester(MR1),
Search2 = spawn_search_tester(Search1), Search2 = spawn_search_tester(Search1),
lager:info("Upgrading ~p", [Node]), lager:info("Upgrading ~p", [Node]),
rtdev:upgrade(Node, current), rt:upgrade(Node, current),
%% rt:slow_upgrade(Node, current, Nodes),
_KV3 = check_kv_tester(KV2), _KV3 = check_kv_tester(KV2),
_MR3 = check_mapred_tester(MR2), _MR3 = check_mapred_tester(MR2),
_Search3 = check_search_tester(Search2, false), _Search3 = check_search_tester(Search2, false),

View File

@ -25,7 +25,7 @@ rolling_capabilities() ->
lager:info("Deploying Riak ~p cluster", [OldVsn]), lager:info("Deploying Riak ~p cluster", [OldVsn]),
Nodes = rt:build_cluster([OldVsn || _ <- lists:seq(1,Count)]), Nodes = rt:build_cluster([OldVsn || _ <- lists:seq(1,Count)]),
lists:foldl(fun(Node, Upgraded) -> lists:foldl(fun(Node, Upgraded) ->
rtdev:upgrade(Node, current), rt:upgrade(Node, current),
Upgraded2 = Upgraded ++ [Node], Upgraded2 = Upgraded ++ [Node],
lager:info("Verifying rolling/old capabilities"), lager:info("Verifying rolling/old capabilities"),
(Upgraded2 == Nodes) (Upgraded2 == Nodes)

View File

@ -3,13 +3,13 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
upgrade() -> upgrade() ->
Nodes = rt:build_cluster(["1.0.3", "1.0.3", "1.1.2", current]), Nodes = rt:build_cluster(["1.0.3", "1.0.3", "1.1.4", current]),
[Node1, Node2, Node3, _Node4] = Nodes, [Node1, Node2, Node3, _Node4] = Nodes,
lager:info("Writing 100 keys"), lager:info("Writing 100 keys"),
rt:systest_write(Node1, 100, 3), rt:systest_write(Node1, 100, 3),
?assertEqual([], rt:systest_read(Node1, 100, 1)), ?assertEqual([], rt:systest_read(Node1, 100, 1)),
rtdev:upgrade(Node1, "1.1.2"), rt:upgrade(Node1, current),
lager:info("Ensuring keys still exist"), lager:info("Ensuring keys still exist"),
rt:stop(Node2), rt:stop(Node2),
rt:stop(Node3), rt:stop(Node3),

View File

@ -3,7 +3,7 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
verify_basic_upgrade() -> verify_basic_upgrade() ->
OldVsns = ["1.0.3", "1.1.2"], OldVsns = ["1.0.3", "1.1.4"],
[build_cluster(OldVsn, current) || OldVsn <- OldVsns], [build_cluster(OldVsn, current) || OldVsn <- OldVsns],
[build_cluster(current, OldVsn) || OldVsn <- OldVsns], [build_cluster(current, OldVsn) || OldVsn <- OldVsns],
lager:info("Test ~p passed", [?MODULE]), lager:info("Test ~p passed", [?MODULE]),
@ -26,8 +26,8 @@ build_cluster(Vsn1, Vsn2) ->
?assertEqual(ok, rt:wait_until_no_pending_changes(Nodes)), ?assertEqual(ok, rt:wait_until_no_pending_changes(Nodes)),
?assertEqual([], rt:systest_read(Node1, 100, 1)), ?assertEqual([], rt:systest_read(Node1, 100, 1)),
(Vsn1 /= current) andalso rtdev:upgrade(Node1, current), (Vsn1 /= current) andalso rt:upgrade(Node1, current),
(Vsn2 /= current) andalso rtdev:upgrade(Node2, current), (Vsn2 /= current) andalso rt:upgrade(Node2, current),
timer:sleep(1000), timer:sleep(1000),
lager:info("Ensuring keys still exist"), lager:info("Ensuring keys still exist"),

View File

@ -41,7 +41,7 @@ verify_capabilities() ->
?assertEqual(legacy, rt:capability(Node1, {riak_core, vnode_routing})), ?assertEqual(legacy, rt:capability(Node1, {riak_core, vnode_routing})),
lager:info("Upgrade 0.14.2 node"), lager:info("Upgrade 0.14.2 node"),
rtdev:upgrade(Node2, current), rt:upgrade(Node2, current),
lager:info("Verifying vnode_routing == proxy"), lager:info("Verifying vnode_routing == proxy"),
?assertEqual(ok, rt:wait_until_capability(Node1, {riak_core, vnode_routing}, proxy)), ?assertEqual(ok, rt:wait_until_capability(Node1, {riak_core, vnode_routing}, proxy)),
@ -54,13 +54,13 @@ verify_capabilities() ->
?assertEqual(legacy, rt:capability(Node1, {riak_core, vnode_routing})), ?assertEqual(legacy, rt:capability(Node1, {riak_core, vnode_routing})),
lager:info("Upgrading 1.0.3 node"), lager:info("Upgrading 1.0.3 node"),
rtdev:upgrade(Node4, current), rt:upgrade(Node4, current),
lager:info("Verifying vnode_routing changes to proxy"), lager:info("Verifying vnode_routing changes to proxy"),
?assertEqual(ok, rt:wait_until_capability(Node1, {riak_core, vnode_routing}, proxy)), ?assertEqual(ok, rt:wait_until_capability(Node1, {riak_core, vnode_routing}, proxy)),
lager:info("Upgrade 1.1.4 node"), lager:info("Upgrade 1.1.4 node"),
rtdev:upgrade(Node3, current), rt:upgrade(Node3, current),
%% All nodes are now current version. Test override behavior. %% All nodes are now current version. Test override behavior.
Override = fun(undefined, Prefer) -> Override = fun(undefined, Prefer) ->