Use rt:wait_until to check unacked on source node

Acks are sent asynchronously after a successful write on the sink, so we
cannot rely on the ack having being received by the source immediately
after we read the data on the sink.
This commit is contained in:
Ian Milligan 2016-11-10 14:30:22 -08:00
parent cc6fc9f6c4
commit dc97aa421d

View File

@ -52,15 +52,17 @@ ensure_ack_tests(Nodes) ->
%% verify data is replicated to B %% verify data is replicated to B
lager:info("Reading 1 key written from ~p", [LeaderB]), lager:info("Reading 1 key written from ~p", [LeaderB]),
?assertEqual(0, repl_util:wait_for_reads(LeaderB, 1, 1, TestBucket, 2)), ?assertEqual(0, repl_util:wait_for_reads(LeaderB, 1, 1, TestBucket, 2)),
lager:info("Checking unacked on ~p", [LeaderA]),
?assertEqual(ok, rt:wait_until(fun () -> check_unacked(LeaderA) end)).
check_unacked(LeaderA) ->
RTQStatus = rpc:call(LeaderA, riak_repl2_rtq, status, []), RTQStatus = rpc:call(LeaderA, riak_repl2_rtq, status, []),
Consumers = proplists:get_value(consumers, RTQStatus), Consumers = proplists:get_value(consumers, RTQStatus),
case proplists:get_value("B", Consumers) of case proplists:get_value("B", Consumers) of
undefined -> undefined ->
[]; missing_consumer;
Consumer -> Consumer ->
Unacked = proplists:get_value(unacked, Consumer, 0), Unacked = proplists:get_value(unacked, Consumer, 0),
lager:info("unacked: ~p", [Unacked]), lager:info("unacked: ~p", [Unacked]),
?assertEqual(0, Unacked) Unacked == 0
end. end.