Clean up code, refactor into functions.

This commit is contained in:
Christopher Meiklejohn 2014-01-15 18:49:51 -05:00
parent c0e4415614
commit 12fcf22cb7

View File

@ -5,7 +5,7 @@
-import(rt, [deploy_nodes/2]).
-define(TEST_BUCKET, <<"repl-aae-fullsync-systest_a">>).
-define(TEST_BUCKET, <<"object-reformat">>).
-define(NUM_KEYS, 1000).
-define(CONF(Retries), [
@ -17,7 +17,6 @@
},
{riak_kv,
[
{object_format, v1},
{anti_entropy, {on, []}},
{anti_entropy_build_limit, {100, 1000}},
{anti_entropy_concurrency, 100}
@ -43,10 +42,6 @@ confirm() ->
lager:info("Building two clusters."),
[repl_util:make_cluster(N) || N <- [ANodes, BNodes]],
lager:info("Updating app config to force v0 on sink cluster."),
[rt:update_app_config(N, [{riak_kv, [{object_format, v0}]}])
|| N <- BNodes],
AFirst = hd(ANodes),
BFirst = hd(BNodes),
@ -62,12 +57,30 @@ confirm() ->
rt:wait_until_transfers_complete(ANodes),
rt:wait_until_transfers_complete(BNodes),
lager:info("Wait for v1 capability on source cluster."),
[rt:wait_until_capability(N, {riak_kv, object_format}, v1, v0)
verify_replication({ANodes, v0}, {BNodes, v1}, 1).
verify_replication({ANodes, AVersion}, {BNodes, BVersion}, Start) ->
AFirst = hd(ANodes),
BFirst = hd(BNodes),
lager:info("Updating app config to force ~p on source cluster.",
[AVersion]),
[rt:update_app_config(N, [{riak_kv,
[{object_format, AVersion}]}])
|| N <- ANodes],
lager:info("Wait for v0 capability on sink cluster."),
[rt:wait_until_capability(N, {riak_kv, object_format}, v0, v0)
lager:info("Updating app config to force ~p on sink cluster.",
[BVersion]),
[rt:update_app_config(N, [{riak_kv,
[{object_format, BVersion}]}])
|| N <- BNodes],
lager:info("Wait for capability on source cluster."),
[rt:wait_until_capability(N, {riak_kv, object_format}, AVersion, v0)
|| N <- ANodes],
lager:info("Wait for capability on sink cluster."),
[rt:wait_until_capability(N, {riak_kv, object_format}, BVersion, v0)
|| N <- BNodes],
lager:info("Get leaders."),
@ -81,11 +94,20 @@ confirm() ->
rt:repl_connect_cluster(LeaderA, BPort, "B"),
lager:info("Write keys, assert they are not available yet."),
rt:write_to_cluster(AFirst, 1, ?NUM_KEYS, ?TEST_BUCKET),
rt:read_from_cluster(BFirst, 1, ?NUM_KEYS, ?NUM_KEYS, ?TEST_BUCKET),
rt:write_to_cluster(AFirst, Start, ?NUM_KEYS, ?TEST_BUCKET),
rt:read_from_cluster(BFirst, Start, ?NUM_KEYS, ?TEST_BUCKET, ?NUM_KEYS),
%% Flush AAE trees to disk.
perform_sacrifice(AFirst),
lager:info("Enabling fullsync from A to B"),
repl_util:enable_fullsync(LeaderA, "B"),
rt:wait_until_ring_converged(ANodes),
rt:validate_completed_fullsync(LeaderA, BFirst, "B", 1, ?NUM_KEYS, ?TEST_BUCKET).
rt:validate_completed_fullsync(LeaderA, BFirst, "B", Start, ?NUM_KEYS, ?TEST_BUCKET).
%% @doc Required for 1.4+ Riak, write sacrificial keys to force AAE
%% trees to flush to disk.
perform_sacrifice(Node) ->
?assertEqual([], repl_util:do_write(Node, 1, 2000,
<<"sacrificial">>, 1)).