mirror of https://github.com/apache/kafka.git
MINOR : Fix possible flake in ShareConsumerTest:testConsumerCloseInGroupSequential (#19986)
CI / build (push) Waiting to run
Details
CI / build (push) Waiting to run
Details
*What* - There was a possibility of a flake occurring in one of the tests in `ShareConsumerTest` where we are not waiting for the assignment to be received and synced before we start consuming on `poll()`. - There should ideally be a `waitedPoll()` or a dedicated poll which would trigger the reconcilation and updation of the assignment. Scenario : - We are doing 3 `polls` to receive 1500 records. - On debugging, I noticed 1st poll always returned 0 records as this was triggering reconcilation, the test passed locally as 2nd and 3rd polls add up to give us 1500 records almost always (as 500 is a soft limit). The test can fail if these 2 `polls` do not add up 1500. We need a maximum of 3 polls to receive 1500 records with `max.poll.records` set to 500. - Now we have introduced a loop which would keep polling until we receive the expected records and then close. Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
parent
79d2c3c62a
commit
55297967a9
|
@ -1205,22 +1205,25 @@ public class ShareConsumerTest {
|
|||
int consumer1MessageCount = 0;
|
||||
int consumer2MessageCount = 0;
|
||||
|
||||
// Poll three times to receive records. The second poll acknowledges the records
|
||||
// from the first poll, and so on. The third poll's records are not acknowledged
|
||||
// Poll until we receive all the records. The second poll acknowledges the records
|
||||
// from the first poll, and so on.
|
||||
// The last poll's records are not acknowledged
|
||||
// because the consumer is closed, which makes the broker release the records fetched.
|
||||
ConsumerRecords<byte[], byte[]> records1 = shareConsumer1.poll(Duration.ofMillis(5000));
|
||||
consumer1MessageCount += records1.count();
|
||||
int consumer1MessageCountA = records1.count();
|
||||
records1 = shareConsumer1.poll(Duration.ofMillis(5000));
|
||||
consumer1MessageCount += records1.count();
|
||||
int consumer1MessageCountB = records1.count();
|
||||
records1 = shareConsumer1.poll(Duration.ofMillis(5000));
|
||||
int consumer1MessageCountC = records1.count();
|
||||
assertEquals(totalMessages, consumer1MessageCountA + consumer1MessageCountB + consumer1MessageCountC);
|
||||
shareConsumer1.close();
|
||||
|
||||
int maxRetries = 10;
|
||||
int retries = 0;
|
||||
int lastPollRecordCount = 0;
|
||||
while (consumer1MessageCount < totalMessages && retries < maxRetries) {
|
||||
lastPollRecordCount = shareConsumer1.poll(Duration.ofMillis(5000)).count();
|
||||
consumer1MessageCount += lastPollRecordCount;
|
||||
retries++;
|
||||
}
|
||||
assertEquals(totalMessages, consumer1MessageCount);
|
||||
shareConsumer1.close();
|
||||
|
||||
// These records are released after the first consumer closes.
|
||||
consumer1MessageCount -= lastPollRecordCount;
|
||||
|
||||
retries = 0;
|
||||
while (consumer1MessageCount + consumer2MessageCount < totalMessages && retries < maxRetries) {
|
||||
ConsumerRecords<byte[], byte[]> records2 = shareConsumer2.poll(Duration.ofMillis(5000));
|
||||
consumer2MessageCount += records2.count();
|
||||
|
|
Loading…
Reference in New Issue