MINOR: Cleanup zk condition in TransactionsTest, QuorumTestHarness and PlaintextConsumerAssignorsTest (#18639)

Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
TengYao Chi 2025-01-23 19:53:10 +08:00 committed by GitHub
parent a783dc69b9
commit bdc92fd5a1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 12 additions and 36 deletions

View File

@ -308,8 +308,6 @@ class PlaintextConsumerAssignorsTest extends AbstractConsumerTest {
// Only the classic group protocol supports client-side assignors
@ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}.assignmentStrategy={2}")
@CsvSource(Array(
"zk, classic, org.apache.kafka.clients.consumer.CooperativeStickyAssignor",
"zk, classic, org.apache.kafka.clients.consumer.RangeAssignor",
"kraft, classic, org.apache.kafka.clients.consumer.CooperativeStickyAssignor",
"kraft, classic, org.apache.kafka.clients.consumer.RangeAssignor"
))

View File

@ -587,14 +587,9 @@ class TransactionsTest extends IntegrationTestHarness {
fail("Should not be able to send messages from a fenced producer.")
} catch {
case _: InvalidProducerEpochException =>
case e: ExecutionException => {
if (quorum == "zk") {
assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
} else {
// In kraft mode, transactionV2 is used.
assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException])
}
}
case e: ExecutionException =>
// In kraft mode, transactionV2 is used.
assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException])
case e: Exception =>
throw new AssertionError("Got an unexpected exception from a fenced producer.", e)
}
@ -622,27 +617,14 @@ class TransactionsTest extends IntegrationTestHarness {
// Wait for the expiration cycle to kick in.
Thread.sleep(600)
if (quorum == "zk") {
// In zk mode, transaction v1 is used.
try {
// Now that the transaction has expired, the second send should fail with a ProducerFencedException.
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "2", willBeCommitted = false)).get()
fail("should have raised a ProducerFencedException since the transaction has expired")
} catch {
case _: ProducerFencedException =>
case e: ExecutionException =>
assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
}
} else {
try {
// Now that the transaction has expired, the second send should fail with a InvalidProducerEpochException.
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "2", willBeCommitted = false)).get()
fail("should have raised a InvalidProducerEpochException since the transaction has expired")
} catch {
case _: InvalidProducerEpochException =>
case e: ExecutionException =>
assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException])
}
try {
// Now that the transaction has expired, the second send should fail with a InvalidProducerEpochException.
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "2", willBeCommitted = false)).get()
fail("should have raised a InvalidProducerEpochException since the transaction has expired")
} catch {
case _: InvalidProducerEpochException =>
case e: ExecutionException =>
assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException])
}
// Verify that the first message was aborted and the second one was never written at all.

View File

@ -270,7 +270,6 @@ abstract class QuorumTestHarness extends Logging {
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, s"CONTROLLER://localhost:0,$listeners")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, s"CONTROLLER,$listenerNames")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$nodeId@localhost:0")
// Setting the configuration to the same value set on the brokers via TestUtils to keep KRaft based and Zk based controller configs are consistent.
props.setProperty(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, "1000")
val config = new KafkaConfig(props)
@ -369,7 +368,7 @@ object QuorumTestHarness {
/**
* Verify that a previous test that doesn't use QuorumTestHarness hasn't left behind an unexpected thread.
* This assumes that brokers, ZooKeeper clients, producers and consumers are not created in another @BeforeClass,
* This assumes that brokers, admin clients, producers and consumers are not created in another @BeforeClass,
* which is true for core tests where this harness is used.
*/
@BeforeAll
@ -437,9 +436,6 @@ object QuorumTestHarness {
)
}
// The following is for tests that only work with the classic group protocol because of relying on Zookeeper
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit: java.util.stream.Stream[Arguments] = stream.Stream.of(Arguments.of("zk", GroupProtocol.CLASSIC.name.toLowerCase(Locale.ROOT)))
// The following parameter groups are to *temporarily* avoid bugs with the CONSUMER group protocol Consumer
// implementation that would otherwise cause tests to fail.
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly