KAFKA-18619: New consumer topic metadata events should set requireMetadata flag (#18668)

Reviewers: Lianet Magrans <lmagrans@confluent.io>
This commit is contained in:
TengYao Chi 2025-01-29 21:36:05 +08:00 committed by GitHub
parent f960e20647
commit 97a228070e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 31 additions and 27 deletions

View File

@ -26,4 +26,9 @@ public abstract class AbstractTopicMetadataEvent extends CompletableApplicationE
protected AbstractTopicMetadataEvent(final Type type, final long deadlineMs) { protected AbstractTopicMetadataEvent(final Type type, final long deadlineMs) {
super(type, deadlineMs); super(type, deadlineMs);
} }
@Override
public boolean requireSubscriptionMetadata() {
return true;
}
} }

View File

@ -108,12 +108,12 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
createProducer(configOverrides = prop) createProducer(configOverrides = prop)
else else
producer producer
verifyWithRetry(sendOneRecord(producer2)) verifyWithRetry(sendOneRecord(producer2))()
} }
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @ParameterizedTest
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) @ValueSource(strings = Array("kraft"))
def testTransactionalProducerWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = { def testTransactionalProducerWithAuthenticationFailure(quorum: String): Unit = {
val txProducer = createTransactionalProducer() val txProducer = createTransactionalProducer()
verifyAuthenticationException(txProducer.initTransactions()) verifyAuthenticationException(txProducer.initTransactions())
@ -122,7 +122,7 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
} }
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testConsumerWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = { def testConsumerWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = {
val consumer = createConsumer() val consumer = createConsumer()
consumer.subscribe(List(topic).asJava) consumer.subscribe(List(topic).asJava)
@ -130,7 +130,7 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
} }
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testManualAssignmentConsumerWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = { def testManualAssignmentConsumerWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = {
val consumer = createConsumer() val consumer = createConsumer()
consumer.assign(List(tp).asJava) consumer.assign(List(tp).asJava)
@ -138,7 +138,7 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
} }
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testManualAssignmentConsumerWithAutoCommitDisabledWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = { def testManualAssignmentConsumerWithAutoCommitDisabledWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false.toString) this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false.toString)
val consumer = createConsumer() val consumer = createConsumer()
@ -153,13 +153,13 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
createClientCredential() createClientCredential()
val producer = createProducer() val producer = createProducer()
verifyWithRetry(sendOneRecord(producer)) verifyWithRetry(sendOneRecord(producer))()
verifyWithRetry(assertEquals(1, consumer.poll(Duration.ofMillis(1000)).count)) verifyWithRetry(consumer.poll(Duration.ofMillis(1000)))(_.count == 1)
} }
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @ParameterizedTest
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) @ValueSource(strings = Array("kraft"))
def testKafkaAdminClientWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = { def testKafkaAdminClientWithAuthenticationFailure(quorum: String): Unit = {
val props = JaasTestUtils.adminClientSecurityConfigs(securityProtocol, OptionConverters.toJava(trustStoreFile), OptionConverters.toJava(clientSaslProperties)) val props = JaasTestUtils.adminClientSecurityConfigs(securityProtocol, OptionConverters.toJava(trustStoreFile), OptionConverters.toJava(clientSaslProperties))
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
val adminClient = Admin.create(props) val adminClient = Admin.create(props)
@ -180,7 +180,7 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
verifyAuthenticationException(describeTopic()) verifyAuthenticationException(describeTopic())
createClientCredential() createClientCredential()
verifyWithRetry(describeTopic()) verifyWithRetry(describeTopic())()
} finally { } finally {
adminClient.close() adminClient.close()
} }
@ -209,13 +209,12 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
assertTrue(elapsedMs <= 5000, s"Poll took too long, elapsed=$elapsedMs") assertTrue(elapsedMs <= 5000, s"Poll took too long, elapsed=$elapsedMs")
} }
private def verifyWithRetry(action: => Unit): Unit = { private def verifyWithRetry[T](operation: => T)(predicate: T => Boolean = (_: T) => true): Unit = {
var attempts = 0 var attempts = 0
TestUtils.waitUntilTrue(() => { TestUtils.waitUntilTrue(() => {
try { try {
attempts += 1 attempts += 1
action predicate(operation)
true
} catch { } catch {
case _: SaslAuthenticationException => false case _: SaslAuthenticationException => false
} }

View File

@ -136,24 +136,26 @@ public class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
// NOTE: Not able to refer TestInfoUtils#TestWithParameterizedQuorumName() in the ParameterizedTest name. // NOTE: Not able to refer TestInfoUtils#TestWithParameterizedQuorumName() in the ParameterizedTest name.
@ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}") @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
@MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly") @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
public void testConsumerGroupServiceWithAuthenticationFailure(String quorum, String groupProtocol) throws Exception { public void testConsumerGroupServiceWithAuthenticationFailure(String quorum, String groupProtocol) throws Exception {
ConsumerGroupCommand.ConsumerGroupService consumerGroupService = prepareConsumerGroupService(); try (
try (Consumer<byte[], byte[]> consumer = createConsumer()) { ConsumerGroupCommand.ConsumerGroupService consumerGroupService = prepareConsumerGroupService();
Consumer<byte[], byte[]> consumer = createConsumer()
) {
consumer.subscribe(Collections.singletonList(TOPIC)); consumer.subscribe(Collections.singletonList(TOPIC));
verifyAuthenticationException(consumerGroupService::listGroups); verifyAuthenticationException(consumerGroupService::listGroups);
} finally {
consumerGroupService.close();
} }
} }
// NOTE: Not able to refer TestInfoUtils#TestWithParameterizedQuorumName() in the ParameterizedTest name.
@ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}") @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
@MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly") @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
public void testConsumerGroupServiceWithAuthenticationSuccess(String quorum, String groupProtocol) throws Exception { public void testConsumerGroupServiceWithAuthenticationSuccess(String quorum, String groupProtocol) throws Exception {
createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KAFKA_SCRAM_USER_2, JaasTestUtils.KAFKA_SCRAM_PASSWORD_2); createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KAFKA_SCRAM_USER_2, JaasTestUtils.KAFKA_SCRAM_PASSWORD_2);
ConsumerGroupCommand.ConsumerGroupService consumerGroupService = prepareConsumerGroupService(); try (
try (Consumer<byte[], byte[]> consumer = createConsumer()) { ConsumerGroupCommand.ConsumerGroupService consumerGroupService = prepareConsumerGroupService();
Consumer<byte[], byte[]> consumer = createConsumer()
) {
consumer.subscribe(Collections.singletonList(TOPIC)); consumer.subscribe(Collections.singletonList(TOPIC));
TestUtils.waitForCondition(() -> { TestUtils.waitForCondition(() -> {
@ -165,8 +167,6 @@ public class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
} }
}, "failed to poll data with authentication"); }, "failed to poll data with authentication");
assertEquals(1, consumerGroupService.listConsumerGroups().size()); assertEquals(1, consumerGroupService.listConsumerGroups().size());
} finally {
consumerGroupService.close();
} }
} }