mirror of
https://github.com/valitydev/riak_test.git
synced 2024-11-07 00:55:21 +00:00
Merge pull request #384 from basho/feature/mw-reduced-repl
Test for reduced replication
This commit is contained in:
commit
fe2a602a6b
256
tests/repl_reduced.erl
Normal file
256
tests/repl_reduced.erl
Normal file
@ -0,0 +1,256 @@
|
||||
-module(repl_reduced).
|
||||
|
||||
-behavior(riak_test).
|
||||
|
||||
-compile([export_all]).
|
||||
|
||||
-export([confirm/0]).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
confirm() ->
|
||||
case eunit:test(?MODULE, [verbose]) of
|
||||
ok ->
|
||||
pass;
|
||||
error ->
|
||||
exit(error),
|
||||
fail
|
||||
end.
|
||||
|
||||
toggle_enabled_test_() ->
|
||||
{setup, fun() ->
|
||||
Nodes = rt:deploy_nodes(3, conf()),
|
||||
repl_util:make_cluster(Nodes),
|
||||
Nodes
|
||||
end,
|
||||
fun(Nodes) ->
|
||||
rt:clean_cluster(Nodes)
|
||||
end,
|
||||
fun(Nodes) -> [
|
||||
|
||||
{"check default setting", fun() ->
|
||||
[Head | _] = Nodes,
|
||||
Got = rpc:call(Head, riak_repl_console, full_objects, [[]]),
|
||||
?assertEqual(always, Got)
|
||||
end},
|
||||
|
||||
{"set to always have full objects", fun() ->
|
||||
[Head | _] = Nodes,
|
||||
Got = rpc:call(Head, riak_repl_console, full_objects, [["always"]]),
|
||||
?assertEqual(always, Got),
|
||||
Got2 = rpc:call(Head, riak_repl_console, full_objects, [[]]),
|
||||
?assertEqual(always, Got2)
|
||||
end},
|
||||
|
||||
{"set to never have full objects", fun() ->
|
||||
[Head | _] = Nodes,
|
||||
Got = rpc:call(Head, riak_repl_console, full_objects, [["never"]]),
|
||||
?assertEqual(never, Got),
|
||||
Got2 = rpc:call(Head, riak_repl_console, full_objects, [[]]),
|
||||
?assertEqual(never, Got2)
|
||||
end},
|
||||
|
||||
{"set to only keep M full objects", fun() ->
|
||||
[Head | _] = Nodes,
|
||||
Got = rpc:call(Head, riak_repl_console, full_objects, [["3"]]),
|
||||
?assertEqual(3, Got),
|
||||
Got2 = rpc:call(Head, riak_repl_console, full_objects, [[]]),
|
||||
?assertEqual(3, Got2)
|
||||
end}
|
||||
|
||||
] end}.
|
||||
|
||||
-record(data_push_test, {
|
||||
nodes,
|
||||
c123,
|
||||
c456
|
||||
}).
|
||||
|
||||
data_push_test_() ->
|
||||
{timeout, rt_cascading:timeout(1000000000000000), {setup, fun() ->
|
||||
Nodes = rt:deploy_nodes(6, conf()),
|
||||
{[N1 | _] = C123, [N4 | _] = C456} = lists:split(3, Nodes),
|
||||
repl_util:make_cluster(C123),
|
||||
repl_util:name_cluster(N1, "c123"),
|
||||
repl_util:make_cluster(C456),
|
||||
repl_util:name_cluster(N4, "c456"),
|
||||
Port = repl_util:get_cluster_mgr_port(N4),
|
||||
lager:info("attempting to connect ~p to c456 on port ~p", [N1, Port]),
|
||||
repl_util:connect_rt(N1, Port, "c456"),
|
||||
#data_push_test{nodes = Nodes, c123 = C123, c456 = C456}
|
||||
end,
|
||||
fun(State) ->
|
||||
rt:clean_cluster(State#data_push_test.nodes)
|
||||
end,
|
||||
fun(State) -> [
|
||||
|
||||
{"repl works", timeout, rt_cascading:timeout(1000), fun() ->
|
||||
#data_push_test{c123 = [N1 | _]} = State,
|
||||
Client123 = rt:pbc(N1),
|
||||
Bin = <<"data data data">>,
|
||||
Key = <<"derkey">>,
|
||||
Bucket = <<"kicked">>,
|
||||
Obj = riakc_obj:new(Bucket, Key, Bin),
|
||||
riakc_pb_socket:put(Client123, Obj, [{w,3}]),
|
||||
riakc_pb_socket:stop(Client123),
|
||||
Got = [maybe_eventually_exists(Node, Bucket, Key) || Node <- State#data_push_test.c456],
|
||||
?assertMatch([{ok, _Obj}, {ok, _Obj}, {ok, _Obj}], Got)
|
||||
end},
|
||||
|
||||
{"common case: 1 real with 2 reduced", timeout, rt_cascading:timeout(1000), fun() ->
|
||||
#data_push_test{c123 = [N1 | _], c456 = [N4 | _]} = State,
|
||||
lager:info("setting full objects to 1"),
|
||||
rpc:call(N4, riak_repl_console, full_objects, [["1"]]),
|
||||
WaitFun = fun(Node) ->
|
||||
case rpc:call(Node, riak_repl_console, full_objects, [[]]) of
|
||||
1 ->
|
||||
true;
|
||||
Uh ->
|
||||
lager:info("~p got ~p", [Node, Uh]),
|
||||
false
|
||||
end
|
||||
end,
|
||||
[rt:wait_until(Node, WaitFun) || Node <- State#data_push_test.c456],
|
||||
lager:info("putting an object on ~p", [N1]),
|
||||
Client123 = rt:pbc(N1),
|
||||
Bin = <<"before repl reduction, this is a binary">>,
|
||||
Key = <<"the key">>,
|
||||
Bucket = <<"objects">>,
|
||||
Obj = riakc_obj:new(Bucket, Key, Bin),
|
||||
riakc_pb_socket:put(Client123, Obj, [{w,3}]),
|
||||
riakc_pb_socket:stop(Client123),
|
||||
lager:info("Checking object on sink cluster"),
|
||||
Got = lists:map(fun(Node) ->
|
||||
lager:info("maybe eventualliy exists on ~p", [Node]),
|
||||
maybe_eventually_exists(Node, Bucket, Key)
|
||||
end, State#data_push_test.c456),
|
||||
?assertMatch([{ok, _}, {ok, _}, {ok, _}], Got),
|
||||
lists:map(fun({ok, GotObj}) ->
|
||||
Value = riakc_obj:get_value(GotObj),
|
||||
?assertEqual(Bin, Value)
|
||||
end, Got)
|
||||
end},
|
||||
|
||||
{"only carry reduced objects, no proxy provider though", timeout, rt_cascading:timeout(1000), fun() ->
|
||||
#data_push_test{c123 = [N1 | _], c456 = [N4 | _]} = State,
|
||||
lager:info("setting full objects to never"),
|
||||
rpc:call(N4, riak_repl_console, full_objects, [["never"]]),
|
||||
WaitFun = fun(Node) ->
|
||||
Got = rpc:call(Node, riak_repl_console, full_objects, [[]]),
|
||||
Got =:= never
|
||||
end,
|
||||
[rt:wait_until(Node, WaitFun) || Node <- State#data_push_test.c456],
|
||||
Client123 = rt:pbc(N1),
|
||||
Bin = <<"only carry reduced objects">>,
|
||||
Key = <<"ocro">>,
|
||||
Bucket = <<"objects">>,
|
||||
Obj = riakc_obj:new(Bucket, Key, Bin),
|
||||
riakc_pb_socket:put(Client123, Obj, [{w,3}]),
|
||||
riakc_pb_socket:stop(Client123),
|
||||
Got = lists:map(fun(Node) ->
|
||||
maybe_eventually_exists(Node, Bucket, Key)
|
||||
end, State#data_push_test.c456),
|
||||
?assertMatch([{error, notfound}, {error, notfound}, {error, notfound}], Got)
|
||||
end},
|
||||
|
||||
{"only carry reduced objects, has a proxy provider", timeout, rt_cascading:timeout(1000), fun() ->
|
||||
#data_push_test{c123 = [N1 | _], c456 = [N4 | _]} = State,
|
||||
lager:info("setting full objects to never"),
|
||||
rpc:call(N4, riak_repl_console, full_objects, [["never"]]),
|
||||
lager:info("setting proxy provider at c123 for c456"),
|
||||
rpc:call(N1, riak_repl_console, proxy_get, [["enable", "c456"]]),
|
||||
WaitFun = fun(Node) ->
|
||||
Got = rpc:call(Node, riak_repl_console, full_objects, [[]]),
|
||||
Got =:= never
|
||||
end,
|
||||
[rt:wait_until(Node, WaitFun) || Node <- State#data_push_test.c456],
|
||||
Client123 = rt:pbc(N1),
|
||||
Bin = <<"only carry reduced objects">>,
|
||||
Key = <<"ocro">>,
|
||||
Bucket = <<"objects">>,
|
||||
Obj = riakc_obj:new(Bucket, Key, Bin),
|
||||
riakc_pb_socket:put(Client123, Obj, [{w,3}]),
|
||||
riakc_pb_socket:stop(Client123),
|
||||
Got = lists:map(fun(Node) ->
|
||||
maybe_eventually_exists(Node, Bucket, Key)
|
||||
end, State#data_push_test.c456),
|
||||
?assertMatch([{ok, _}, {ok, _}, {ok, _}], Got),
|
||||
lists:map(fun({ok, GotObj}) ->
|
||||
Value = riakc_obj:get_value(GotObj),
|
||||
?assertEqual(Bin, Value)
|
||||
end, Got)
|
||||
end}
|
||||
|
||||
|
||||
|
||||
] end}}.
|
||||
|
||||
conf() ->
|
||||
[{lager, [
|
||||
{handlers, [
|
||||
{lager_console_backend,info},
|
||||
{lager_file_backend, [
|
||||
{"./log/error.log",error,10485760,"$D0",5},
|
||||
{"./log/console.log",info,10485760,"$D0",5},
|
||||
{"./log/debug.log",debug,10485760,"$D0",5}
|
||||
]}
|
||||
]},
|
||||
{crash_log,"./log/crash.log"},
|
||||
{crash_log_msg_size,65536},
|
||||
{crash_log_size,10485760},
|
||||
{crash_log_date,"$D0"},
|
||||
{crash_log_count,5},
|
||||
{error_logger_redirect,true}
|
||||
]},
|
||||
{riak_repl, [
|
||||
{fullsync_on_connect, false},
|
||||
{fullsync_interval, disabled},
|
||||
{diff_batch_size, 10}
|
||||
]}].
|
||||
|
||||
exists(Nodes, Bucket, Key) ->
|
||||
exists({error, notfound}, Nodes, Bucket, Key).
|
||||
|
||||
exists(Got, [], _Bucket, _Key) ->
|
||||
Got;
|
||||
exists({error, notfound}, [Node | Tail], Bucket, Key) ->
|
||||
Pid = rt:pbc(Node),
|
||||
Got = riakc_pb_socket:get(Pid, Bucket, Key, [{pr, 1}]),
|
||||
riakc_pb_socket:stop(Pid),
|
||||
exists(Got, Tail, Bucket, Key);
|
||||
exists(Got, _Nodes, _Bucket, _Key) ->
|
||||
Got.
|
||||
|
||||
maybe_eventually_exists(Node, Bucket, Key) ->
|
||||
Timeout = rt_cascading:timeout(10),
|
||||
WaitTime = rt_config:get(default_wait_time, 1000),
|
||||
maybe_eventually_exists(Node, Bucket, Key, Timeout, WaitTime).
|
||||
|
||||
%maybe_eventually_exists(Node, Bucket, Key, Timeout) ->
|
||||
% WaitTime = rt_config:get(default_wait_time, 1000),
|
||||
% maybe_eventually_exists(Node, Bucket, Key, Timeout, WaitTime).
|
||||
|
||||
maybe_eventually_exists(Node, Bucket, Key, Timeout, WaitMs) when is_atom(Node) ->
|
||||
maybe_eventually_exists([Node], Bucket, Key, Timeout, WaitMs);
|
||||
|
||||
maybe_eventually_exists(Nodes, Bucket, Key, Timeout, WaitMs) ->
|
||||
Got = exists(Nodes, Bucket, Key),
|
||||
maybe_eventually_exists(Got, Nodes, Bucket, Key, Timeout, WaitMs).
|
||||
|
||||
maybe_eventually_exists({error, notfound}, Nodes, Bucket, Key, Timeout, WaitMs) when Timeout > 0 ->
|
||||
?debugMsg("not found, waiting again"),
|
||||
timer:sleep(WaitMs),
|
||||
Got = exists(Nodes, Bucket, Key),
|
||||
Timeout2 = case Timeout of
|
||||
infinity ->
|
||||
infinity;
|
||||
_ ->
|
||||
Timeout - WaitMs
|
||||
end,
|
||||
maybe_eventually_exists(Got, Nodes, Bucket, Key, Timeout2, WaitMs);
|
||||
|
||||
maybe_eventually_exists({ok, _RiakObj} = Out, _Nodes, _Bucket, _Key, _Timeout, _WaitMs) ->
|
||||
Out;
|
||||
|
||||
maybe_eventually_exists(Got, _Nodes, _Bucket, _Key, _Timeout, _WaitMs) ->
|
||||
Got.
|
@ -25,7 +25,10 @@
|
||||
stop_realtime/2,
|
||||
do_write/5,
|
||||
get_fs_coord_status_item/3,
|
||||
num_partitions/1
|
||||
num_partitions/1,
|
||||
get_cluster_mgr_port/1,
|
||||
maybe_reconnect_rt/3,
|
||||
connect_rt/3
|
||||
]).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
@ -284,3 +287,21 @@ num_partitions(Node) ->
|
||||
{ok, Ring} = rpc:call(Node, riak_core_ring_manager, get_raw_ring, []),
|
||||
N = riak_core_ring:num_partitions(Ring),
|
||||
N.
|
||||
|
||||
get_cluster_mgr_port(Node) ->
|
||||
{ok, {_Ip, Port}} = rpc:call(Node, application, get_env, [riak_core, cluster_mgr]),
|
||||
Port.
|
||||
|
||||
maybe_reconnect_rt(SourceNode, SinkPort, SinkName) ->
|
||||
case repl_util:wait_for_connection(SourceNode, SinkName) of
|
||||
fail ->
|
||||
connect_rt(SourceNode, SinkPort, SinkName);
|
||||
Oot ->
|
||||
Oot
|
||||
end.
|
||||
|
||||
connect_rt(SourceNode, SinkPort, SinkName) ->
|
||||
repl_util:connect_cluster(SourceNode, "127.0.0.1", SinkPort),
|
||||
repl_util:wait_for_connection(SourceNode, SinkName),
|
||||
repl_util:enable_realtime(SourceNode, SinkName),
|
||||
repl_util:start_realtime(SourceNode, SinkName).
|
||||
|
Loading…
Reference in New Issue
Block a user