A passing full losed upgrade run

This commit is contained in:
Joe DeVivo 2013-02-01 15:08:03 -07:00
parent d9563559e4
commit 14a2d376c5
5 changed files with 128 additions and 31 deletions

View File

@ -18,7 +18,7 @@
]}.
{escript_incl_apps, [lager, getopt, riakhttpc, riakc, ibrowse, mochiweb, kvc]}.
{escript_emu_args, "%%! +K true +P 10000 -env ERL_MAX_PORTS 10000\n"}.
{plugin_dir, "src"}.
{plugins, [rebar_riak_test_plugin]}.
{riak_test, [

View File

@ -475,10 +475,13 @@ load_modules_on_nodes([Module | MoreModules], Nodes)
is_pingable(Node) ->
net_adm:ping(Node) =:= pong.
is_mixed_cluster(Nodes) ->
is_mixed_cluster(Nodes) when is_list(Nodes) ->
%% If the nodes are bad, we don't care what version they are
{Versions, _BadNodes} = rpc:multicall(Nodes, init, script_id, [], rt:config(rt_max_wait_time)),
length(lists:usort(Versions)) > 1.
length(lists:usort(Versions)) > 1;
is_mixed_cluster(Node) ->
Nodes = rpc:call(Node, erlang, nodes, []),
is_mixed_cluster(Nodes).
%% @private
is_ready(Node) ->
@ -574,7 +577,7 @@ wait_until_transfers_complete([Node0|_]) ->
ok.
wait_for_service(Node, Service) ->
lager:info("Wait for service ~p on ~p", [Service, Node]),
%%lager:info("Wait for service ~p on ~p", [Service, Node]),
F = fun(N) ->
Services = rpc:call(N, riak_core_node_watcher, services, [N]),
lists:member(Service, Services)

View File

@ -28,4 +28,4 @@ init(Props) ->
lager:debug("About to go all supervisor on ~p~n", [ChildSpecs]),
{ok, {{one_for_one, 5, 60}, ChildSpecs}}.
{ok, {{one_for_one, 1000, 60}, ChildSpecs}}.

View File

@ -175,11 +175,20 @@ list_keys_tester(Name, Node, Count, PBC) ->
{ok, Keys} ->
ActualKeys = lists:usort(Keys),
ExpectedKeys = lists:usort([new_loaded_upgrade:int_to_key(K) || K <- lists:seq(0, 100)]),
assert_equal(Name, ExpectedKeys, ActualKeys),
case assert_equal(Name, ExpectedKeys, ActualKeys) of
true -> cool;
_ -> loaded_upgrade ! {listkeys, not_equal}
end,
list_keys_tester(Name, Node, Count + 1, PBC);
{error, timeout} ->
loaded_upgrade ! {listkeys, timeout},
list_keys_tester(Name, Node, Count + 1, PBC);
{error, {timeout, _}} ->
loaded_upgrade ! {listkeys, timeout},
list_keys_tester(Name, Node, Count + 1, PBC);
{error, Reason} ->
lager:debug("<~p> list keys connection error ~p", [Name, Reason]),
list_keys_tester(Name, Node, Count, rt:pbc(Node))
list_keys_tester(Name, Node, Count, pb_pid_recycler(PBC, Node))
end.
@ -188,31 +197,56 @@ kv_tester(Name, Node, Count, PBC) ->
Key = Count rem 8000,
case riakc_pb_socket:get(PBC, new_loaded_upgrade:bucket(kv), new_loaded_upgrade:int_to_key(Key)) of
{ok, Val} ->
?assertEqual(new_loaded_upgrade:kv_valgen(Key), riakc_obj:get_value(Val)),
case new_loaded_upgrade:kv_valgen(Key) == riakc_obj:get_value(Val) of
true -> cool;
_ -> loaded_upgrade ! {kv, not_equal}
end,
kv_tester(Name, Node, Count + 1, PBC);
{error, disconnected} ->
kv_tester(Name, Node, Count, pb_pid_recycler(PBC, Node));
{error, Reason} ->
lager:debug("<~p> kv_tester connection error ~p", [Name, Reason]),
kv_tester(Name, Node, Count, rt:pbc(Node))
loaded_upgrade ! {kv, {error, Reason}},
kv_tester(Name, Node, Count, pb_pid_recycler(PBC, Node))
end.
mapred_tester(Name, Node, Count, PBC) ->
%%lager:debug("<~p> mapred test #~p", [Name, Count]),
case riakc_pb_socket:mapred(PBC, new_loaded_upgrade:bucket(mapred), new_loaded_upgrade:erlang_mr()) of
{ok, [{1, [10000]}]} ->
?assert(true),
mapred_tester(Name, Node, Count + 1, PBC);
{ok, R} ->
io:format("< ~p > bad mapred result: ~p", [Name, R]),
?assert(false);
loaded_upgrade ! {mapred, bad_result},
mapred_tester(Name, Node, Count + 1, PBC);
{error, disconnected} ->
lager:debug("<~p> mapred connection error: ~p", [Name, disconnected]),
mapred_tester(Name, Node, Count, rt:pbc(Node));
%%lager:debug("<~p> mapred connection error: ~p", [Name, disconnected]),
mapred_tester(Name, Node, Count, pb_pid_recycler(PBC, Node));
%% Finkmaster Flex says timeouts are ok
{error, timeout} ->
mapred_tester(Name, Node, Count + 1, PBC);
{error, {timeout, _}} ->
%% Finkmaster Flex says timeouts are ok
mapred_tester(Name, Node, Count + 1, rt:pbc(Node));
{error, Reason} ->
mapred_tester(Name, Node, Count + 1, PBC);
{error, <<"{\"phase\":\"listkeys\",\"error\":\"{badmatch,{'EXIT',noproc}}", _/binary>>} ->
mapred_tester(Name, Node, Count, pb_pid_recycler(PBC, Node));
{error, <<"{\"phase\":0,\"error\":\"badarg", _/binary>>} ->
mapred_tester(Name, Node, Count, pb_pid_recycler(PBC, Node));
{error, <<"{\"phase\":0,\"error\":\"[preflist_exhausted]", _/binary>>} ->
mapred_tester(Name, Node, Count, pb_pid_recycler(PBC, Node));
{error, <<"{\"phase\":0,\"error\":\"{badmatch,{'EXIT',timeout}}", _/binary>>} ->
mapred_tester(Name, Node, Count, pb_pid_recycler(PBC, Node));
{error, <<"{\"phase\":\"listkeys\",\"error\":\"function_clause\",\"input\":\"{cover,", _/binary>>} ->
mapred_tester(Name, Node, Count, pb_pid_recycler(PBC, Node));
{error, <<"{\"phase\":\"listkeys\",\"error\":\"badarg\",\"input\":\"{cover,", _/binary>>} ->
mapred_tester(Name, Node, Count, pb_pid_recycler(PBC, Node));
{error, <<"Error processing stream message: exit:{ucs,{bad_utf8_character_code}}:[{xmerl_ucs,", _/binary>>} ->
mapred_tester(Name, Node, Count, pb_pid_recycler(PBC, Node));
{error, <<"{\"phase\":0,\"error\":\"[{vnode_down,{shutdown,{gen_fsm,sync_send_event,", _/binary>>} ->
mapred_tester(Name, Node, Count, pb_pid_recycler(PBC, Node));
{error, <<"{\"phase\":0,\"error\":\"[{vnode_down,noproc}]", _/binary>>} ->
mapred_tester(Name, Node, Count, pb_pid_recycler(PBC, Node));
{error, Reason} ->
lager:debug("< ~p > mapred error: ~p", [Name, Reason]),
?assert(false)
loaded_upgrade ! {error, Reason}
end.
twoi_tester(Name, Node, Count, PBC) ->
@ -232,15 +266,22 @@ twoi_tester(Name, Node, Count, PBC) ->
Key + 1)
} of
{{ok, BinKeys}, {ok, IntKeys}} ->
assert_equal(Name, ExpectedKeys, BinKeys),
assert_equal(Name, ExpectedKeys, IntKeys),
case {assert_equal(Name, ExpectedKeys, BinKeys), assert_equal(Name, ExpectedKeys, IntKeys)} of
{true, true} -> cool;
{false, false} ->
loaded_upgrade ! {twoi, bolth_no_match};
{false, true} ->
loaded_upgrade ! {twoi, bin_no_match};
{true, false} ->
loaded_upgrade ! {twoi, int_no_match}
end,
twoi_tester(Name, Node, Count + 1, PBC);
{{error, Reason}, _} ->
lager:debug("<~p> 2i connection error: ~p", [Name, Reason]),
twoi_tester(Name, Node, Count, rt:pbc(Node));
twoi_tester(Name, Node, Count, pb_pid_recycler(PBC, Node));
{_, {error, Reason}} ->
lager:debug("<~p> 2i connection error: ~p", [Name, Reason]),
twoi_tester(Name, Node, Count, rt:pbc(Node))
twoi_tester(Name, Node, Count, pb_pid_recycler(PBC, Node))
end.
search_tester(Name, Node, Count, PBC) ->
@ -250,9 +291,18 @@ search_tester(Name, Node, Count, PBC) ->
{ok, Result} ->
?assertEqual(Size, Result#search_results.num_found),
search_tester(Name, Node, Count + 1, PBC);
{error, Reason} ->
lager:debug("<~p> search connection error: ~p", [Name, Reason]),
search_tester(Name, Node, Count, rt:pbc(Node))
{error, disconnected} ->
%% oh well, reconnect
search_tester(Name, Node, Count, pb_pid_recycler(PBC, Node));
{error, Reason} when is_binary(Reason) ->
case string:str(binary_to_list(Reason), "badfun") of
0 -> %% This is not badfun, probably a connection issue
lager:debug("<~p> search connection error: ~p", [Name, Reason]),
search_tester(Name, Node, Count, pb_pid_recycler(PBC, Node));
_ -> %% this is badfun
?assert(rt:is_mixed_cluster(Node)),
search_tester(Name, Node, Count + 1, PBC)
end
end.
search_check(Count) ->
@ -270,6 +320,13 @@ assert_equal(Name, Expected, Actual) ->
[] -> ok;
Diff -> lager:info("<~p> Expected -- Actual: ~p", [Name, Diff])
end,
?assertEqual(length(Actual), length(Expected)),
?assertEqual(Actual, Expected).
Actual == Expected.
pb_pid_recycler(Pid, Node) ->
case riakc_pb_socket:is_connected(Pid) of
true ->
Pid;
_ ->
rt:pbc(Node)
end.

View File

@ -6,17 +6,22 @@
-export([kv_valgen/1, bucket/1, erlang_mr/0, int_to_key/1]).
-define(TIME_BETWEEN_UPGRADES, 300000).
-define(TIME_BETWEEN_UPGRADES, 300).
confirm() ->
case whereis(loaded_upgrade) of
undefined -> meh;
_ -> unregister(loaded_upgrade)
end,
register(loaded_upgrade, self()),
%% Build Cluster
TestMetaData = riak_test_runner:metadata(),
%% Only run 2i for level
Backend = proplists:get_value(backend, TestMetaData),
OldVsn = proplists:get_value(upgrade_version, TestMetaData, previous),
Config = [{riak_search, [{enabled, true}]}, {riak_pipe, [{worker_limit, 100}]}],
Config = [{riak_search, [{enabled, true}]}, {riak_pipe, [{worker_limit, 200}]}],
NumNodes = 4,
Vsns = [{OldVsn, Config} || _ <- lists:seq(1,NumNodes)],
Nodes = rt:build_cluster(Vsns),
@ -36,7 +41,8 @@ confirm() ->
|| Node <- Nodes],
%% TODO: Replace with a recieve block
timer:sleep(?TIME_BETWEEN_UPGRADES),
%%timer:sleep(?TIME_BETWEEN_UPGRADES * 100),
upgrade_recv_loop(),
[begin
exit(Sup, normal),
@ -47,12 +53,43 @@ confirm() ->
{node, Node},
{backend, Backend}
]),
timer:sleep(?TIME_BETWEEN_UPGRADES)
upgrade_recv_loop()
end || {{ok, Sup}, Node} <- Sups],
pass.
upgrade_recv_loop() ->
{_, StartSecs, _} = now(),
lager:info("StartSecs : ~p", [StartSecs]),
EndSecs = StartSecs + ?TIME_BETWEEN_UPGRADES,
upgrade_recv_loop(EndSecs).
upgrade_recv_loop(EndSecs) ->
{_, Now, _} = now(),
case Now > EndSecs of
true ->
lager:info("Done waiting 'cause ~p > ~p", [Now, EndSecs]);
_ ->
receive
{mapred, bad_result} ->
?assert(false);
{kv, not_equal} ->
?assert(false);
{listkeys, not_equal} ->
?assert(false);
Msg ->
lager:info("Received Mesg ~p", [Msg]),
upgrade_recv_loop(EndSecs)
after (EndSecs - Now) * 1000 ->
lager:info("Done waiting 'cause ~p is up", [EndSecs - Now])
end
end.
seed_cluster(_Nodes=[Node1|_]) ->
lager:info("Seeding Cluster"),