riak_test/tests/replication_object_reformat.erl
Doug Rohrer 6665465d62 Updated tests to explicitly set allow_mult and dvv_enabled, as
overriding default_bucket_props in advanced_config without
explicitly setting these returns different values with the fix for
allow_mult turning to true with an app.config file present.

Cherry-picked from 619b24e
2016-03-19 14:55:12 -04:00

245 lines
8.2 KiB
Erlang

-module(replication_object_reformat).
-behavior(riak_test).
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
-define(TEST_BUCKET, <<"object-reformat">>).
-define(NUM_KEYS, 1000).
-define(N, 3).
-define(CONF(Retries), [
{riak_core,
[
{ring_creation_size, 8},
{default_bucket_props,
[
{n_val, ?N},
{allow_mult, true},
{dvv_enabled, true}
]}
]
},
{riak_kv,
[
{anti_entropy, {on, []}},
{anti_entropy_build_limit, {100, 1000}},
{anti_entropy_concurrency, 100}
]
},
{riak_repl,
[
{fullsync_strategy, aae},
{fullsync_on_connect, false},
{fullsync_interval, disabled},
{max_fssource_retries, Retries}
]}
]).
confirm() ->
lager:info("Verifying v0 source to v1 sink, realtime enabled."),
verify_replication(v0, v1, 1, ?NUM_KEYS, true),
lager:info("Verifying v0 source to v1 sink, realtime disabled."),
verify_replication(v0, v1, 1, ?NUM_KEYS, false),
lager:info("Verifying v1 source to v0 sink, realtime enabled."),
verify_replication(v1, v0, 1, ?NUM_KEYS, true),
lager:info("Verifying v1 source to v0 sink, realtime disabled."),
verify_replication(v1, v0, 1, ?NUM_KEYS, false),
pass.
%% @doc Verify replication works between two different versions of the
%% Riak object format.
verify_replication(AVersion, BVersion, Start, End, Realtime) ->
[ANodes, BNodes] = configure_clusters(AVersion, BVersion, Realtime),
Nodes = [ANodes, BNodes],
AFirst = hd(ANodes),
BFirst = hd(BNodes),
lager:info("Get leader of A cluster."),
LeaderA = repl_util:get_leader(AFirst),
%% Before starting writes, initiate a rolling downgrade.
Me = self(),
case Realtime of
true ->
spawn(fun() ->
lager:info("Running kv_reformat to downgrade to v0 on ~p",
[BFirst]),
{_, _, Error1} = rpc:call(BFirst,
riak_kv_reformat,
run,
[v0, [{kill_handoffs, false}]]),
?assertEqual(0, Error1),
lager:info("Waiting for all nodes to see the v0 capability."),
[rt:wait_until_capability(N, {riak_kv, object_format}, v0, v0)
|| N <- BNodes],
lager:info("Allowing downgrade and writes to occurr concurrently."),
Me ! continue,
lager:info("Downgrading node ~p to previous.",
[BFirst]),
rt:upgrade(BFirst, previous),
lager:info("Waiting for riak_kv to start on node ~p.",
[BFirst]),
rt:wait_for_service(BFirst, [riak_kv])
end),
ok;
_ ->
ok
end,
%% Pause and wait for rolling upgrade to begin, if it takes too
%% long, proceed anyway and the test will fail when it attempts
%% to read the keys.
receive
continue ->
ok
after 60000 ->
ok
end,
lager:info("Write keys, assert they are not available yet."),
repl_util:write_to_cluster(AFirst, Start, End, ?TEST_BUCKET, ?N),
case Realtime of
false ->
lager:info("Verify we can not read the keys on the sink."),
repl_util:read_from_cluster(
BFirst, Start, End, ?TEST_BUCKET, ?NUM_KEYS, ?N);
_ ->
ok
end,
lager:info("Verify we can read the keys on the source."),
repl_util:read_from_cluster(AFirst, Start, End, ?TEST_BUCKET, 0, ?N),
%% Wait until the sink cluster is in a steady state before
%% starting fullsync
rt:wait_until_nodes_ready(BNodes),
rt:wait_until_no_pending_changes(BNodes),
rt:wait_until_registered(BFirst, riak_repl2_fs_node_reserver),
repl_util:validate_completed_fullsync(
LeaderA, BFirst, "B", Start, End, ?TEST_BUCKET),
lager:info("Verify we can read the keys on the sink."),
repl_util:read_from_cluster(BFirst, Start, End, ?TEST_BUCKET, 0, ?N),
%% Verify if we downgrade sink, after replication has complete, we
%% can still read the objects.
%%
case {Realtime, BVersion} of
{false, v1} ->
lager:info("Running kv_reformat to downgrade to v0 on ~p",
[BFirst]),
{_, _, Error} = rpc:call(BFirst,
riak_kv_reformat,
run,
[v0, [{kill_handoffs, false}]]),
?assertEqual(0, Error),
lager:info("Waiting for all nodes to see the v0 capability."),
[rt:wait_until_capability(N, {riak_kv, object_format}, v0, v0)
|| N <- BNodes],
lager:info("Downgrading node ~p to previous.",
[BFirst]),
rt:upgrade(BFirst, previous),
lager:info("Waiting for riak_kv to start on node ~p.",
[BFirst]),
rt:wait_for_service(BFirst, riak_kv),
lager:info("Verify we can read from node ~p after downgrade.",
[BFirst]),
repl_util:read_from_cluster(
BFirst, Start, End, ?TEST_BUCKET, 0, ?N),
ok;
_ ->
ok
end,
rt:clean_cluster(lists:flatten(Nodes)).
%% @doc Configure two clusters and set up replication between them,
%% return the node list of each cluster.
configure_clusters(AVersion, BVersion, Realtime) ->
rt:set_advanced_conf(all, ?CONF(infinity)),
Nodes = [ANodes, BNodes] = rt:build_clusters([3, 3]),
rt:wait_for_cluster_service(ANodes, riak_repl),
rt:wait_for_cluster_service(BNodes, riak_repl),
lager:info("ANodes: ~p", [ANodes]),
lager:info("BNodes: ~p", [BNodes]),
lager:info("Updating app config to force ~p on source cluster.",
[AVersion]),
[rt:update_app_config(N, [{riak_kv,
[{object_format, AVersion}]}])
|| N <- ANodes],
lager:info("Updating app config to force ~p on sink cluster.",
[BVersion]),
[rt:update_app_config(N, [{riak_kv,
[{object_format, BVersion}]}])
|| N <- BNodes],
AFirst = hd(ANodes),
BFirst = hd(BNodes),
lager:info("Naming clusters."),
repl_util:name_cluster(AFirst, "A"),
repl_util:name_cluster(BFirst, "B"),
lager:info("Waiting for convergence."),
rt:wait_until_ring_converged(ANodes),
rt:wait_until_ring_converged(BNodes),
lager:info("Waiting for transfers to complete."),
rt:wait_until_transfers_complete(ANodes),
rt:wait_until_transfers_complete(BNodes),
lager:info("Get leaders."),
LeaderA = repl_util:get_leader(AFirst),
lager:info("Connecting cluster A to B"),
{ok, {BIP, BPort}} = rpc:call(BFirst, application, get_env, [riak_core, cluster_mgr]),
repl_util:connect_cluster(LeaderA, BIP, BPort),
?assertEqual(ok, repl_util:wait_for_connection(LeaderA, "B")),
lager:info("Enabling fullsync from A to B"),
repl_util:enable_fullsync(LeaderA, "B"),
rt:wait_until_ring_converged(ANodes),
rt:wait_until_ring_converged(BNodes),
case Realtime of
true ->
lager:info("Enabling realtime from A to B"),
repl_util:enable_realtime(LeaderA, "B"),
rt:wait_until_ring_converged(ANodes),
rt:wait_until_ring_converged(BNodes);
_ ->
ok
end,
lager:info("Wait for capability on source cluster."),
[rt:wait_until_capability(N, {riak_kv, object_format}, AVersion, v0)
|| N <- ANodes],
lager:info("Wait for capability on sink cluster."),
[rt:wait_until_capability(N, {riak_kv, object_format}, BVersion, v0)
|| N <- BNodes],
lager:info("Ensuring connection from cluster A to B"),
repl_util:connect_cluster_by_name(LeaderA, BPort, "B"),
Nodes.