diff --git a/tests/replication_object_reformat.erl b/tests/replication_object_reformat.erl index 363501a8..94a91d7c 100644 --- a/tests/replication_object_reformat.erl +++ b/tests/replication_object_reformat.erl @@ -5,7 +5,7 @@ -import(rt, [deploy_nodes/2]). --define(TEST_BUCKET, <<"repl-aae-fullsync-systest_a">>). +-define(TEST_BUCKET, <<"object-reformat">>). -define(NUM_KEYS, 1000). -define(CONF(Retries), [ @@ -17,7 +17,6 @@ }, {riak_kv, [ - {object_format, v1}, {anti_entropy, {on, []}}, {anti_entropy_build_limit, {100, 1000}}, {anti_entropy_concurrency, 100} @@ -43,10 +42,6 @@ confirm() -> lager:info("Building two clusters."), [repl_util:make_cluster(N) || N <- [ANodes, BNodes]], - lager:info("Updating app config to force v0 on sink cluster."), - [rt:update_app_config(N, [{riak_kv, [{object_format, v0}]}]) - || N <- BNodes], - AFirst = hd(ANodes), BFirst = hd(BNodes), @@ -62,12 +57,30 @@ confirm() -> rt:wait_until_transfers_complete(ANodes), rt:wait_until_transfers_complete(BNodes), - lager:info("Wait for v1 capability on source cluster."), - [rt:wait_until_capability(N, {riak_kv, object_format}, v1, v0) + verify_replication({ANodes, v0}, {BNodes, v1}, 1). + +verify_replication({ANodes, AVersion}, {BNodes, BVersion}, Start) -> + AFirst = hd(ANodes), + BFirst = hd(BNodes), + + lager:info("Updating app config to force ~p on source cluster.", + [AVersion]), + [rt:update_app_config(N, [{riak_kv, + [{object_format, AVersion}]}]) || N <- ANodes], - lager:info("Wait for v0 capability on sink cluster."), - [rt:wait_until_capability(N, {riak_kv, object_format}, v0, v0) + lager:info("Updating app config to force ~p on sink cluster.", + [BVersion]), + [rt:update_app_config(N, [{riak_kv, + [{object_format, BVersion}]}]) + || N <- BNodes], + + lager:info("Wait for capability on source cluster."), + [rt:wait_until_capability(N, {riak_kv, object_format}, AVersion, v0) + || N <- ANodes], + + lager:info("Wait for capability on sink cluster."), + [rt:wait_until_capability(N, {riak_kv, object_format}, BVersion, v0) || N <- BNodes], lager:info("Get leaders."), @@ -81,11 +94,20 @@ confirm() -> rt:repl_connect_cluster(LeaderA, BPort, "B"), lager:info("Write keys, assert they are not available yet."), - rt:write_to_cluster(AFirst, 1, ?NUM_KEYS, ?TEST_BUCKET), - rt:read_from_cluster(BFirst, 1, ?NUM_KEYS, ?NUM_KEYS, ?TEST_BUCKET), + rt:write_to_cluster(AFirst, Start, ?NUM_KEYS, ?TEST_BUCKET), + rt:read_from_cluster(BFirst, Start, ?NUM_KEYS, ?TEST_BUCKET, ?NUM_KEYS), + + %% Flush AAE trees to disk. + perform_sacrifice(AFirst), lager:info("Enabling fullsync from A to B"), repl_util:enable_fullsync(LeaderA, "B"), rt:wait_until_ring_converged(ANodes), - rt:validate_completed_fullsync(LeaderA, BFirst, "B", 1, ?NUM_KEYS, ?TEST_BUCKET). + rt:validate_completed_fullsync(LeaderA, BFirst, "B", Start, ?NUM_KEYS, ?TEST_BUCKET). + +%% @doc Required for 1.4+ Riak, write sacrificial keys to force AAE +%% trees to flush to disk. +perform_sacrifice(Node) -> + ?assertEqual([], repl_util:do_write(Node, 1, 2000, + <<"sacrificial">>, 1)).