mirror of https://github.com/apache/kafka.git
KAFKA-17623: Flaky testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback (#18515)
Reviewers: Lianet Magrans <lmagrans@confluent.io>
This commit is contained in:
parent
07c860b730
commit
4f0a91393c
|
@ -35,67 +35,67 @@ class PlaintextConsumerCallbackTest extends AbstractConsumerTest {
|
|||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
|
||||
def testConsumerRebalanceListenerAssignOnPartitionsAssigned(quorum: String, groupProtocol: String): Unit = {
|
||||
val tp = new TopicPartition(topic, 0)
|
||||
triggerOnPartitionsAssigned { (consumer, _) =>
|
||||
triggerOnPartitionsAssigned(tp, { (consumer, _) =>
|
||||
val e: Exception = assertThrows(classOf[IllegalStateException], () => consumer.assign(Collections.singletonList(tp)))
|
||||
assertEquals(e.getMessage, "Subscription to topics, partitions and pattern are mutually exclusive")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
|
||||
def testConsumerRebalanceListenerAssignmentOnPartitionsAssigned(quorum: String, groupProtocol: String): Unit = {
|
||||
val tp = new TopicPartition(topic, 0)
|
||||
triggerOnPartitionsAssigned { (consumer, _) =>
|
||||
triggerOnPartitionsAssigned(tp, { (consumer, _) =>
|
||||
assertTrue(consumer.assignment().contains(tp));
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
|
||||
def testConsumerRebalanceListenerBeginningOffsetsOnPartitionsAssigned(quorum: String, groupProtocol: String): Unit = {
|
||||
val tp = new TopicPartition(topic, 0)
|
||||
triggerOnPartitionsAssigned { (consumer, _) =>
|
||||
triggerOnPartitionsAssigned(tp, { (consumer, _) =>
|
||||
val map = consumer.beginningOffsets(Collections.singletonList(tp))
|
||||
assertTrue(map.containsKey(tp))
|
||||
assertEquals(0, map.get(tp))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
|
||||
def testConsumerRebalanceListenerAssignOnPartitionsRevoked(quorum: String, groupProtocol: String): Unit = {
|
||||
val tp = new TopicPartition(topic, 0)
|
||||
triggerOnPartitionsRevoked { (consumer, _) =>
|
||||
triggerOnPartitionsRevoked(tp, { (consumer, _) =>
|
||||
val e: Exception = assertThrows(classOf[IllegalStateException], () => consumer.assign(Collections.singletonList(tp)))
|
||||
assertEquals(e.getMessage, "Subscription to topics, partitions and pattern are mutually exclusive")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
|
||||
def testConsumerRebalanceListenerAssignmentOnPartitionsRevoked(quorum: String, groupProtocol: String): Unit = {
|
||||
val tp = new TopicPartition(topic, 0)
|
||||
triggerOnPartitionsRevoked { (consumer, _) =>
|
||||
triggerOnPartitionsRevoked(tp, { (consumer, _) =>
|
||||
assertTrue(consumer.assignment().contains(tp))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
|
||||
def testConsumerRebalanceListenerBeginningOffsetsOnPartitionsRevoked(quorum: String, groupProtocol: String): Unit = {
|
||||
val tp = new TopicPartition(topic, 0)
|
||||
triggerOnPartitionsRevoked { (consumer, _) =>
|
||||
triggerOnPartitionsRevoked(tp, { (consumer, _) =>
|
||||
val map = consumer.beginningOffsets(Collections.singletonList(tp))
|
||||
assertTrue(map.containsKey(tp))
|
||||
assertEquals(0, map.get(tp))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
|
||||
def testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback(quorum: String, groupProtocol: String): Unit = {
|
||||
val tp = new TopicPartition(topic, 0)
|
||||
triggerOnPartitionsAssigned { (consumer, _) => assertDoesNotThrow(() => consumer.position(tp)) }
|
||||
triggerOnPartitionsAssigned(tp, { (consumer, _) => assertDoesNotThrow(() => consumer.position(tp)) })
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
|
@ -110,7 +110,7 @@ class PlaintextConsumerCallbackTest extends AbstractConsumerTest {
|
|||
val startingTimestamp = 0
|
||||
sendRecords(producer, totalRecords.toInt, tp, startingTimestamp)
|
||||
|
||||
triggerOnPartitionsAssigned(consumer, { (consumer, _) =>
|
||||
triggerOnPartitionsAssigned(tp, consumer, { (consumer, _) =>
|
||||
consumer.seek(tp, startingOffset)
|
||||
consumer.pause(asList(tp))
|
||||
})
|
||||
|
@ -122,16 +122,23 @@ class PlaintextConsumerCallbackTest extends AbstractConsumerTest {
|
|||
startingTimestamp = startingOffset)
|
||||
}
|
||||
|
||||
private def triggerOnPartitionsAssigned(execute: (Consumer[Array[Byte], Array[Byte]], util.Collection[TopicPartition]) => Unit): Unit = {
|
||||
private def triggerOnPartitionsAssigned(tp: TopicPartition,
|
||||
execute: (Consumer[Array[Byte], Array[Byte]], util.Collection[TopicPartition]) => Unit): Unit = {
|
||||
val consumer = createConsumer()
|
||||
triggerOnPartitionsAssigned(consumer, execute)
|
||||
triggerOnPartitionsAssigned(tp, consumer, execute)
|
||||
}
|
||||
private def triggerOnPartitionsAssigned(consumer: Consumer[Array[Byte], Array[Byte]], execute: (Consumer[Array[Byte], Array[Byte]], util.Collection[TopicPartition]) => Unit): Unit = {
|
||||
|
||||
private def triggerOnPartitionsAssigned(tp: TopicPartition,
|
||||
consumer: Consumer[Array[Byte], Array[Byte]],
|
||||
execute: (Consumer[Array[Byte], Array[Byte]], util.Collection[TopicPartition]) => Unit): Unit = {
|
||||
val partitionsAssigned = new AtomicBoolean(false)
|
||||
consumer.subscribe(asList(topic), new ConsumerRebalanceListener {
|
||||
override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
|
||||
execute(consumer, partitions)
|
||||
partitionsAssigned.set(true)
|
||||
// Make sure the partition used in the test is actually assigned before continuing.
|
||||
if (partitions.contains(tp)) {
|
||||
execute(consumer, partitions)
|
||||
partitionsAssigned.set(true)
|
||||
}
|
||||
}
|
||||
|
||||
override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
|
||||
|
@ -141,18 +148,25 @@ class PlaintextConsumerCallbackTest extends AbstractConsumerTest {
|
|||
TestUtils.pollUntilTrue(consumer, () => partitionsAssigned.get(), "Timed out before expected rebalance completed")
|
||||
}
|
||||
|
||||
private def triggerOnPartitionsRevoked(execute: (Consumer[Array[Byte], Array[Byte]], util.Collection[TopicPartition]) => Unit): Unit = {
|
||||
private def triggerOnPartitionsRevoked(tp: TopicPartition,
|
||||
execute: (Consumer[Array[Byte], Array[Byte]], util.Collection[TopicPartition]) => Unit): Unit = {
|
||||
val consumer = createConsumer()
|
||||
val partitionsAssigned = new AtomicBoolean(false)
|
||||
val partitionsRevoked = new AtomicBoolean(false)
|
||||
consumer.subscribe(asList(topic), new ConsumerRebalanceListener {
|
||||
override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
|
||||
partitionsAssigned.set(true)
|
||||
// Make sure the partition used in the test is actually assigned before continuing.
|
||||
if (partitions.contains(tp)) {
|
||||
partitionsAssigned.set(true)
|
||||
}
|
||||
}
|
||||
|
||||
override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
|
||||
execute(consumer, partitions)
|
||||
partitionsRevoked.set(true)
|
||||
// Make sure the partition used in the test is actually revoked before continuing.
|
||||
if (partitions.contains(tp)) {
|
||||
execute(consumer, partitions)
|
||||
partitionsRevoked.set(true)
|
||||
}
|
||||
}
|
||||
})
|
||||
TestUtils.pollUntilTrue(consumer, () => partitionsAssigned.get(), "Timed out before expected rebalance completed")
|
||||
|
|
Loading…
Reference in New Issue