MINOR: Cleanup admin creation logic in integration tests (#11790)

There seemed to be a little sloppiness in the integration tests in regard to admin client creation. Not only was there duplicated logic, but it wasn't always clear which listener the admin client was targeting. This made it difficult to tell in the context of authorization tests whether we were indeed testing with the right principal. As an example, we had a method in TestUtils which was using the inter-broker listener implicitly. This meant that the test was using the broker principal which had super user privilege. This was intentional, but I think it would be clearer to make the dependence on this listener explicit. This patch attempts to clean this up a bit by consolidating some of the admin creation logic and making the reliance on the listener clearer.

Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
This commit is contained in:
Jason Gustafson 2022-02-24 07:37:28 -08:00 committed by GitHub
parent 8d88b20b27
commit 711b603ddc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
70 changed files with 442 additions and 446 deletions

View File

@ -203,8 +203,8 @@ class BrokerMetadataListener(
var index = 0
batch.records().forEach { messageAndVersion =>
if (isTraceEnabled) {
trace("Metadata batch %d: processing [%d/%d]: %s.".format(batch.lastOffset, index + 1,
batch.records().size(), messageAndVersion.message().toString()))
trace(s"Metadata batch ${batch.lastOffset}: processing [${index + 1}/${batch.records.size}]:" +
s" ${messageAndVersion.message}")
}
_highestOffset = lastCommittedOffset.getOrElse(batch.baseOffset() + index)

View File

@ -254,7 +254,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
@Override
public Admin createAdminClient(Properties configOverrides) {
return clusterReference.get().createAdminClient(configOverrides);
return clusterReference.get().createAdminClient(clientListener(), configOverrides);
}
@Override

View File

@ -49,11 +49,11 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
def checkBrokerApiVersionCommandOutput(): Unit = {
val byteArrayOutputStream = new ByteArrayOutputStream
val printStream = new PrintStream(byteArrayOutputStream, false, StandardCharsets.UTF_8.name())
BrokerApiVersionsCommand.execute(Array("--bootstrap-server", brokerList), printStream)
BrokerApiVersionsCommand.execute(Array("--bootstrap-server", bootstrapServers()), printStream)
val content = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
val lineIter = content.split("\n").iterator
assertTrue(lineIter.hasNext)
assertEquals(s"$brokerList (id: 0 rack: null) -> (", lineIter.next())
assertEquals(s"${bootstrapServers()} (id: 0 rack: null) -> (", lineIter.next())
val nodeApiVersions = NodeApiVersions.create
val enabledApis = ApiKeys.zkBrokerApis.asScala
for (apiKey <- enabledApis) {

View File

@ -41,7 +41,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
createTopic(topicName, 1, 1.toShort)
produceMessages()
adminClient = Admin.create(Map[String, Object](
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
).asJava)
}

View File

@ -28,8 +28,6 @@ import kafka.server.QuorumTestHarness
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, ConfigEntry, DescribeLogDirsResult, NewTopic}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
@ -620,8 +618,7 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
def createTopics(): Unit = {
TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
brokerList = TestUtils.bootstrapServers(servers,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
brokerList = TestUtils.plaintextBootstrapServers(servers)
adminClient = Admin.create(Map[String, Object](
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList
).asJava)

View File

@ -56,7 +56,7 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
}
def createConfig: util.Map[String, Object] =
Map[String, Object](AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList).asJava
Map[String, Object](AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()).asJava
override def generateConfigs = {
val configs = TestUtils.createBrokerConfigs(brokerCount, zkConnect)

View File

@ -26,7 +26,7 @@ import kafka.security.authorizer.AclEntry.WildcardHost
import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.TestUtils
import kafka.utils.TestUtils.waitUntilTrue
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp}
import org.apache.kafka.clients.admin.{Admin, AlterConfigOp}
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.clients.producer._
@ -70,7 +70,6 @@ import java.util.Collections.singletonList
import scala.annotation.nowarn
import scala.collection.mutable
import scala.collection.mutable.Buffer
import scala.jdk.CollectionConverters._
object AuthorizerIntegrationTest {
@ -144,7 +143,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val transactionalIdDescribeAcl = Map(transactionalIdResource -> Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)))
val numRecords = 1
val adminClients = Buffer[Admin]()
producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "1")
producerConfig.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false")
@ -342,16 +340,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
// Allow inter-broker communication
addAndVerifyAcls(Set(new AccessControlEntry(brokerPrincipal.toString, WildcardHost, CLUSTER_ACTION, ALLOW)), clusterResource)
if (isKRaftTest()) {
TestUtils.createOffsetsTopicWithAdmin(brokers)
} else {
TestUtils.createOffsetsTopic(zkClient, servers)
}
createOffsetsTopic(listenerName = interBrokerListenerName)
}
@AfterEach
override def tearDown(): Unit = {
adminClients.foreach(_.close())
removeAllClientAcls()
super.tearDown()
}
@ -840,7 +833,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@CsvSource(value = Array("zk,false", "zk,true", "kraft,false", "kraft,true"))
def testTopicIdAuthorization(quorum: String, withTopicExisting: Boolean): Unit = {
val topicId = if (withTopicExisting) {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
getTopicIds()(topic)
} else {
Uuid.randomUuid()
@ -917,7 +910,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testFetchFollowerRequest(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
val request = createFetchFollowerRequest
@ -937,7 +930,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testIncrementalAlterConfigsRequestRequiresClusterPermissionForBrokerLogger(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
val data = new IncrementalAlterConfigsRequestData
val alterableConfig = new AlterableConfig().setName("kafka.controller.KafkaController").
@ -961,7 +954,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testOffsetsForLeaderEpochClusterPermission(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
val request = offsetsForLeaderEpochRequest
@ -980,7 +973,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testProduceWithNoTopicAccess(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
val producer = createProducer()
assertThrows(classOf[TopicAuthorizationException], () => sendRecords(producer, numRecords, tp))
}
@ -988,7 +981,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testProduceWithTopicDescribe(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource)
val producer = createProducer()
assertThrows(classOf[TopicAuthorizationException], () => sendRecords(producer, numRecords, tp))
@ -997,7 +990,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testProduceWithTopicRead(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource)
val producer = createProducer()
assertThrows(classOf[TopicAuthorizationException], () => sendRecords(producer, numRecords, tp))
@ -1006,7 +999,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testProduceWithTopicWrite(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource)
val producer = createProducer()
sendRecords(producer, numRecords, tp)
@ -1040,7 +1033,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testConsumeUsingAssignWithNoAccess(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource)
val producer = createProducer()
@ -1055,7 +1048,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testSimpleConsumeWithOffsetLookupAndNoGroupAccess(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource)
val producer = createProducer()
@ -1075,7 +1068,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testSimpleConsumeWithExplicitSeekAndNoGroupAccess(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource)
val producer = createProducer()
@ -1095,7 +1088,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testConsumeWithoutTopicDescribeAccess(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource)
val producer = createProducer()
@ -1113,7 +1106,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testConsumeWithTopicDescribe(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource)
val producer = createProducer()
@ -1132,7 +1125,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testConsumeWithTopicWrite(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource)
val producer = createProducer()
@ -1151,7 +1144,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testConsumeWithTopicAndGroupRead(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource)
val producer = createProducer()
@ -1170,7 +1163,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testPatternSubscriptionWithNoTopicAccess(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource)
val producer = createProducer()
@ -1188,7 +1181,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testPatternSubscriptionWithTopicDescribeOnlyAndGroupRead(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource)
val producer = createProducer()
@ -1207,7 +1200,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testPatternSubscriptionWithTopicAndGroupRead(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource)
val producer = createProducer()
@ -1215,7 +1208,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
// create an unmatched topic
val unmatchedTopic = "unmatched"
createTopic(unmatchedTopic)
createTopicWithBrokerPrincipal(unmatchedTopic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), new ResourcePattern(TOPIC, unmatchedTopic, LITERAL))
sendRecords(producer, 1, new TopicPartition(unmatchedTopic, part))
removeAllClientAcls()
@ -1240,7 +1233,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testPatternSubscriptionMatchingInternalTopic(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource)
val producer = createProducer()
@ -1270,7 +1263,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testPatternSubscriptionMatchingInternalTopicWithDescribeOnlyPermission(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource)
val producer = createProducer()
@ -1296,7 +1289,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testPatternSubscriptionNotMatchingInternalTopic(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource)
val producer = createProducer()
@ -1393,7 +1386,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testCommitWithTopicWrite(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource)
@ -1404,7 +1397,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testCommitWithTopicDescribe(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource)
@ -1423,7 +1416,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testCommitWithTopicAndGroupRead(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource)
val consumer = createConsumer()
@ -1441,7 +1434,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testOffsetFetchWithNoGroupAccess(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource)
val consumer = createConsumer()
consumer.assign(List(tp).asJava)
@ -1460,7 +1453,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testOffsetFetchAllTopicPartitionsAuthorization(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
val offset = 15L
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource)
@ -1518,9 +1511,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
groupToPartitionMap.put(groups(3), null)
groupToPartitionMap.put(groups(4), null)
createTopic(topics(0))
createTopic(topics(1), numPartitions = 2)
createTopic(topics(2), numPartitions = 3)
createTopicWithBrokerPrincipal(topics(0))
createTopicWithBrokerPrincipal(topics(1), numPartitions = 2)
createTopicWithBrokerPrincipal(topics(2), numPartitions = 3)
groupResources.foreach(r => {
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), r)
})
@ -1648,7 +1641,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testOffsetFetchTopicDescribe(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), groupResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource)
val consumer = createConsumer()
@ -1659,7 +1652,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testOffsetFetchWithTopicAndGroupRead(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource)
val consumer = createConsumer()
@ -1677,7 +1670,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testMetadataWithTopicDescribe(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource)
val consumer = createConsumer()
consumer.partitionsFor(topic)
@ -1693,7 +1686,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testListOffsetsWithTopicDescribe(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource)
val consumer = createConsumer()
consumer.endOffsets(Set(tp).asJava)
@ -1710,7 +1703,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeGroupApiWithGroupDescribe(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), groupResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource)
createAdminClient().describeConsumerGroups(Seq(group).asJava).describedGroups().get(group).get()
@ -1719,11 +1712,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeGroupCliWithGroupDescribe(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), groupResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupService = new ConsumerGroupService(opts)
consumerGroupService.describeGroups()
@ -1733,7 +1726,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testListGroupApiWithAndWithoutListGroupAcls(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
// write some record to the topic
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource)
@ -1782,7 +1775,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testDeleteGroupApiWithDeleteGroupAcl(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource)
@ -1796,7 +1789,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testDeleteGroupApiWithNoDeleteGroupAcl(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource)
@ -1817,7 +1810,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testDeleteGroupOffsetsWithAcl(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DELETE, ALLOW)), groupResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource)
@ -1833,7 +1826,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testDeleteGroupOffsetsWithoutDeleteAcl(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource)
@ -1848,7 +1841,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testDeleteGroupOffsetsWithDeleteAclWithoutTopicAcl(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
// Create the consumer group
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource)
@ -1883,7 +1876,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testUnauthorizedDeleteTopicsWithDescribe(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource)
val deleteResponse = connectAndReceive[DeleteTopicsResponse](deleteTopicsRequest)
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, deleteResponse.data.responses.find(topic).errorCode)
@ -1892,7 +1885,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testDeleteTopicsWithWildCardAuth(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DELETE, ALLOW)), new ResourcePattern(TOPIC, "*", LITERAL))
val deleteResponse = connectAndReceive[DeleteTopicsResponse](deleteTopicsRequest)
assertEquals(Errors.NONE.code, deleteResponse.data.responses.find(topic).errorCode)
@ -1909,7 +1902,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testUnauthorizedDeleteRecordsWithDescribe(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource)
val deleteRecordsResponse = connectAndReceive[DeleteRecordsResponse](deleteRecordsRequest)
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, deleteRecordsResponse.data.topics.asScala.head.
@ -1919,7 +1912,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testDeleteRecordsWithWildCardAuth(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DELETE, ALLOW)), new ResourcePattern(TOPIC, "*", LITERAL))
val deleteRecordsResponse = connectAndReceive[DeleteRecordsResponse](deleteRecordsRequest)
assertEquals(Errors.NONE.code, deleteRecordsResponse.data.topics.asScala.head.
@ -1936,7 +1929,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testCreatePartitionsWithWildCardAuth(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, ALTER, ALLOW)), new ResourcePattern(TOPIC, "*", LITERAL))
val createPartitionsResponse = connectAndReceive[CreatePartitionsResponse](createPartitionsRequest)
assertEquals(Errors.NONE.code, createPartitionsResponse.data.results.asScala.head.errorCode)
@ -1960,7 +1953,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testSendOffsetsWithNoConsumerGroupDescribeAccess(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, CLUSTER_ACTION, ALLOW)), clusterResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource)
@ -1976,7 +1969,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testSendOffsetsWithNoConsumerGroupWriteAccess(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), groupResource)
@ -1991,7 +1984,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testIdempotentProducerNoIdempotentWriteAclInInitProducerId(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource)
assertIdempotentSendAuthorizationFailure()
}
@ -2030,7 +2023,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testIdempotentProducerNoIdempotentWriteAclInProduce(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, IDEMPOTENT_WRITE, ALLOW)), clusterResource)
idempotentProducerShouldFailInProduce(() => removeAllClientAcls())
@ -2067,7 +2060,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testTransactionalProducerTopicAuthorizationExceptionInSendCallback(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource)
// add describe access so that we can fetch metadata
@ -2084,7 +2077,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testTransactionalProducerTopicAuthorizationExceptionInCommit(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource)
// add describe access so that we can fetch metadata
@ -2102,7 +2095,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessDuringSend(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource)
val producer = buildTransactionalProducer()
@ -2117,7 +2110,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnEndTransaction(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource)
@ -2132,7 +2125,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testListTransactionsAuthorization(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource)
@ -2166,7 +2159,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def shouldNotIncludeUnauthorizedTopicsInDescribeTransactionsResponse(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource)
@ -2189,7 +2182,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def shouldSuccessfullyAbortTransactionAfterTopicAuthorizationException(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)),
@ -2225,7 +2218,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def shouldSendSuccessfullyWhenIdempotentAndHasCorrectACL(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, IDEMPOTENT_WRITE, ALLOW)), clusterResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource)
val producer = buildIdempotentProducer()
@ -2245,7 +2238,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testAuthorizeByResourceTypeMultipleAddAndRemove(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
for (_ <- 1 to 3) {
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource)
@ -2263,9 +2256,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testAuthorizeByResourceTypeIsolationUnrelatedDenyWontDominateAllow(quorum: String): Unit = {
createTopic(topic)
createTopic("topic-2")
createTopic("to")
createTopicWithBrokerPrincipal(topic)
createTopicWithBrokerPrincipal("topic-2")
createTopicWithBrokerPrincipal("to")
val unrelatedPrincipalString = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "unrelated").toString
val unrelatedTopicResource = new ResourcePattern(TOPIC, "topic-2", LITERAL)
@ -2286,7 +2279,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testAuthorizeByResourceTypeDenyTakesPrecedence(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
val allowWriteAce = new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)
addAndVerifyAcls(Set(allowWriteAce), topicResource)
assertIdempotentSendSuccess()
@ -2299,7 +2292,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testAuthorizeByResourceTypeWildcardResourceDenyDominate(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
val wildcard = new ResourcePattern(TOPIC, ResourcePattern.WILDCARD_RESOURCE, LITERAL)
val prefixed = new ResourcePattern(TOPIC, "t", PREFIXED)
val literal = new ResourcePattern(TOPIC, topic, LITERAL)
@ -2317,7 +2310,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testAuthorizeByResourceTypePrefixedResourceDenyDominate(quorum: String): Unit = {
createTopic(topic)
createTopicWithBrokerPrincipal(topic)
val prefixed = new ResourcePattern(TOPIC, topic.substring(0, 1), PREFIXED)
val literal = new ResourcePattern(TOPIC, topic, LITERAL)
val allowWriteAce = new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)
@ -2528,10 +2521,18 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
private def createAdminClient(): Admin = {
val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
val adminClient = Admin.create(props)
adminClients += adminClient
adminClient
createAdminClient(listenerName)
}
private def createTopicWithBrokerPrincipal(
topic: String,
numPartitions: Int = 1
): Unit = {
createTopic(
topic,
numPartitions = numPartitions,
listenerName = interBrokerListenerName
)
}
}

View File

@ -203,7 +203,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg
def createConfig: util.Map[String, Object] = {
val config = new util.HashMap[String, Object]
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000")
val securityProps: util.Map[Object, Object] =
adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)

View File

@ -29,6 +29,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.{KafkaException, TopicPartition}
@ -58,7 +59,10 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
consumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers), securityProtocol = SecurityProtocol.PLAINTEXT)
consumer = TestUtils.createConsumer(
bootstrapServers(listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)),
securityProtocol = SecurityProtocol.PLAINTEXT
)
}
@AfterEach
@ -70,14 +74,14 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
super.tearDown()
}
protected def createProducer(brokerList: String,
lingerMs: Int = 0,
protected def createProducer(lingerMs: Int = 0,
deliveryTimeoutMs: Int = 2 * 60 * 1000,
batchSize: Int = 16384,
compressionType: String = "none",
maxBlockMs: Long = 60 * 1000L,
bufferSize: Long = 1024L * 1024L): KafkaProducer[Array[Byte],Array[Byte]] = {
val producer = TestUtils.createProducer(brokerList,
val producer = TestUtils.createProducer(
bootstrapServers(),
compressionType = compressionType,
securityProtocol = securityProtocol,
trustStoreFile = trustStoreFile,
@ -103,7 +107,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
*/
@Test
def testSendOffset(): Unit = {
val producer = createProducer(brokerList)
val producer = createProducer()
val partition = 0
object callback extends Callback {
@ -164,7 +168,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
@Test
def testSendCompressedMessageWithCreateTime(): Unit = {
val producer = createProducer(brokerList = brokerList,
val producer = createProducer(
compressionType = "gzip",
lingerMs = Int.MaxValue,
deliveryTimeoutMs = Int.MaxValue)
@ -173,7 +177,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
@Test
def testSendNonCompressedMessageWithCreateTime(): Unit = {
val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
val producer = createProducer(lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
}
@ -265,7 +269,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
*/
@Test
def testClose(): Unit = {
val producer = createProducer(brokerList)
val producer = createProducer()
try {
// create topic
@ -298,7 +302,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
*/
@Test
def testSendToPartition(): Unit = {
val producer = createProducer(brokerList)
val producer = createProducer()
try {
createTopic(topic, 2, 2)
@ -343,7 +347,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
*/
@Test
def testSendBeforeAndAfterPartitionExpansion(): Unit = {
val producer = createProducer(brokerList, maxBlockMs = 5 * 1000L)
val producer = createProducer(maxBlockMs = 5 * 1000L)
// create topic
createTopic(topic, 1, 2)
@ -402,7 +406,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
*/
@Test
def testFlush(): Unit = {
val producer = createProducer(brokerList, lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
val producer = createProducer(lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
try {
createTopic(topic, 2, 2)
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic,
@ -431,7 +435,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
// Test closing from caller thread.
for (_ <- 0 until 50) {
val producer = createProducer(brokerList, lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
val producer = createProducer(lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
val responses = (0 until numRecords) map (_ => producer.send(record0))
assertTrue(responses.forall(!_.isDone()), "No request is complete.")
producer.close(Duration.ZERO)
@ -467,7 +471,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
}
}
for (i <- 0 until 50) {
val producer = createProducer(brokerList, lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
val producer = createProducer(lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
try {
// send message to partition 0
// Only send the records in the first callback since we close the producer in the callback and no records

View File

@ -68,7 +68,6 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
classOf[GroupedUserPrincipalBuilder].getName)
this.serverConfig.setProperty(KafkaConfig.DeleteTopicEnableProp, "true")
super.setUp(testInfo)
brokerList = TestUtils.bootstrapServers(servers, listenerName)
producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG,
ScramLoginModule(JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword).toString)

View File

@ -102,7 +102,7 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both))
super.setUp(testInfo)
privilegedAdminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
privilegedAdminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
}
private def createDelegationTokens(): (DelegationToken, DelegationToken) = {

View File

@ -123,7 +123,7 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS
private def createConfig(): Properties = {
val adminClientConfig = new Properties()
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
adminClientConfig.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000")
val securityProps: util.Map[Object, Object] =
TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)

View File

@ -560,7 +560,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
}
protected def createScramAdminClient(scramMechanism: String, user: String, password: String): Admin = {
createAdminClient(brokerList, securityProtocol, trustStoreFile, clientSaslProperties,
createAdminClient(bootstrapServers(), securityProtocol, trustStoreFile, clientSaslProperties,
scramMechanism, user, password)
}

View File

@ -123,7 +123,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
isValidClusterId(MockBrokerMetricsReporter.CLUSTER_META.get.clusterId)
val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, classOf[MockProducerInterceptor].getName)
producerProps.put("mock.interceptor.append", appendStr)
producerProps.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, classOf[MockProducerMetricsReporter].getName)
@ -144,7 +144,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
assertNotNull(MockProducerMetricsReporter.CLUSTER_META)
isValidClusterId(MockProducerMetricsReporter.CLUSTER_META.get.clusterId)
this.consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
this.consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, classOf[MockConsumerInterceptor].getName)
this.consumerConfig.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, classOf[MockConsumerMetricsReporter].getName)
val testConsumer = new KafkaConsumer(this.consumerConfig, new MockDeserializer, new MockDeserializer)

View File

@ -39,7 +39,7 @@ class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
@Test
def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(): Unit = {
val consumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers))
val consumer = TestUtils.createConsumer(bootstrapServers())
val offsetMap = Map(
new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) -> new OffsetAndMetadata(10, "")
).asJava

View File

@ -111,25 +111,21 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
super.setUp(testInfo)
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
producerConfig.putIfAbsent(ProducerConfig.ACKS_CONFIG, "-1")
producerConfig.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
producerConfig.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
consumerConfig.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
consumerConfig.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group")
consumerConfig.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
consumerConfig.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
if (createOffsetsTopic) {
if (isKRaftTest()) {
TestUtils.createOffsetsTopicWithAdmin(brokers, adminClientConfig)
} else {
TestUtils.createOffsetsTopic(zkClient, servers)
}
super.createOffsetsTopic(listenerName, adminClientConfig)
}
}
@ -162,13 +158,16 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
consumer
}
def createAdminClient(configOverrides: Properties = new Properties): Admin = {
def createAdminClient(
listenerName: ListenerName = listenerName,
configOverrides: Properties = new Properties
): Admin = {
val props = new Properties
props ++= adminClientConfig
props ++= configOverrides
val adminClient = Admin.create(props)
adminClients += adminClient
adminClient
val admin = TestUtils.createAdminClient(brokers, listenerName, props)
adminClients += admin
admin
}
@AfterEach

View File

@ -121,7 +121,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
saslProps.put(SaslConfigs.SASL_MECHANISM, kafkaClientSaslMechanism)
saslProps.put(SaslConfigs.SASL_JAAS_CONFIG, TestJaasConfig.jaasConfigProperty(kafkaClientSaslMechanism, "badUser", "badPass"))
// Use acks=0 to verify error metric when connection is closed without a response
val producer = TestUtils.createProducer(brokerList,
val producer = TestUtils.createProducer(bootstrapServers(),
acks = 0,
requestTimeoutMs = 1000,
maxBlockMs = 1000,

View File

@ -93,7 +93,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
@Test
def testListNodes(): Unit = {
client = Admin.create(createConfig)
val brokerStrs = brokerList.split(",").toList.sorted
val brokerStrs = bootstrapServers().split(",").toList.sorted
var nodeStrs: List[String] = null
do {
val nodes = client.describeCluster().nodes().get().asScala
@ -209,7 +209,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val controller = result.controller().get()
assertEquals(servers.head.dataPlaneRequestProcessor.metadataCache.getControllerId.
getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id())
val brokers = brokerList.split(",")
val brokers = bootstrapServers().split(",")
assertEquals(brokers.size, nodes.size)
for (node <- nodes.asScala) {
val hostStr = s"${node.host}:${node.port}"
@ -308,7 +308,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
import scala.concurrent.ExecutionContext.Implicits._
val producerFuture = Future {
val producer = TestUtils.createProducer(
TestUtils.getBrokerListStrFromServers(servers, protocol = securityProtocol),
bootstrapServers(),
securityProtocol = securityProtocol,
trustStoreFile = trustStoreFile,
retries = 0, // Producer should not have to retry when broker is moving replica between log directories.
@ -674,7 +674,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
restartDeadBrokers()
client.close()
brokerList = TestUtils.bootstrapServers(servers, listenerName)
client = Admin.create(createConfig)
TestUtils.waitUntilTrue(() => {

View File

@ -1130,7 +1130,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val appendStr = "mock"
// create producer with interceptor that has different key and value types from the producer
val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockProducerInterceptor")
producerProps.put("mock.interceptor.append", appendStr)
val testProducer = createProducer()

View File

@ -36,7 +36,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
@Test
def testWrongSerializer(): Unit = {
val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
val producer = registerProducer(new KafkaProducer(producerProps))
@ -46,7 +46,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
@Test
def testBatchSizeZero(): Unit = {
val producer = createProducer(brokerList = brokerList,
val producer = createProducer(
lingerMs = Int.MaxValue,
deliveryTimeoutMs = Int.MaxValue,
batchSize = 0)
@ -55,7 +55,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
@Test
def testSendCompressedMessageWithLogAppendTime(): Unit = {
val producer = createProducer(brokerList = brokerList,
val producer = createProducer(
compressionType = "gzip",
lingerMs = Int.MaxValue,
deliveryTimeoutMs = Int.MaxValue)
@ -64,7 +64,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
@Test
def testSendNonCompressedMessageWithLogAppendTime(): Unit = {
val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
val producer = createProducer(lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
}
@ -75,7 +75,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
*/
@Test
def testAutoCreateTopic(): Unit = {
val producer = createProducer(brokerList)
val producer = createProducer()
try {
// Send a message to auto-create the topic
val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
@ -95,7 +95,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
topicProps.setProperty(LogConfig.MessageTimestampDifferenceMaxMsProp, "1000")
createTopic(topic, 1, 2, topicProps)
val producer = createProducer(brokerList = brokerList)
val producer = createProducer()
try {
val e = assertThrows(classOf[ExecutionException],
() => producer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()).getCause
@ -105,7 +105,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
}
// Test compressed messages.
val compressedProducer = createProducer(brokerList = brokerList, compressionType = "gzip")
val compressedProducer = createProducer(compressionType = "gzip")
try {
val e = assertThrows(classOf[ExecutionException],
() => compressedProducer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()).getCause
@ -158,7 +158,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
}
// Topic metadata not available, send should fail without blocking
val producer = createProducer(brokerList = brokerList, maxBlockMs = 0)
val producer = createProducer(maxBlockMs = 0)
verifyMetadataNotAvailable(send(producer))
// Test that send starts succeeding once metadata is available
@ -166,7 +166,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
verifySendSuccess(future)
// Verify that send fails immediately without blocking when there is no space left in the buffer
val producer2 = createProducer(brokerList = brokerList, maxBlockMs = 0,
val producer2 = createProducer(maxBlockMs = 0,
lingerMs = 15000, batchSize = 1100, bufferSize = 1500)
val future2 = sendUntilQueued(producer2) // wait until metadata is available and one record is queued
verifyBufferExhausted(send(producer2)) // should fail send since buffer is full
@ -176,7 +176,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
@Test
def testSendRecordBatchWithMaxRequestSizeAndHigher(): Unit = {
val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
val producer = registerProducer(new KafkaProducer(producerProps, new ByteArraySerializer, new ByteArraySerializer))
val keyLengthSize = 1

View File

@ -62,7 +62,7 @@ class ProducerCompressionTest extends QuorumTestHarness {
def testCompression(compression: String): Unit = {
val producerProps = new Properties()
val bootstrapServers = TestUtils.getBrokerListStrFromServers(Seq(server))
val bootstrapServers = TestUtils.plaintextBootstrapServers(Seq(server))
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")

View File

@ -63,11 +63,11 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
producer1 = TestUtils.createProducer(brokerList, acks = 0, retries = 0, requestTimeoutMs = 30000, maxBlockMs = 10000L,
producer1 = TestUtils.createProducer(bootstrapServers(), acks = 0, retries = 0, requestTimeoutMs = 30000, maxBlockMs = 10000L,
bufferSize = producerBufferSize)
producer2 = TestUtils.createProducer(brokerList, acks = 1, retries = 0, requestTimeoutMs = 30000, maxBlockMs = 10000L,
producer2 = TestUtils.createProducer(bootstrapServers(), acks = 1, retries = 0, requestTimeoutMs = 30000, maxBlockMs = 10000L,
bufferSize = producerBufferSize)
producer3 = TestUtils.createProducer(brokerList, acks = -1, retries = 0, requestTimeoutMs = 30000, maxBlockMs = 10000L,
producer3 = TestUtils.createProducer(bootstrapServers(), acks = -1, retries = 0, requestTimeoutMs = 30000, maxBlockMs = 10000L,
bufferSize = producerBufferSize)
}

View File

@ -51,7 +51,7 @@ class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
val topic = "topic"
// Create topic with leader as 0 for the 2 partitions.
createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
createTopicWithAssignment(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
val reassignment = Map(
new TopicPartition(topic, 0) -> Seq(1, 0),

View File

@ -44,7 +44,7 @@ class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwa
@Test
def testAutoCreateTopic(): Unit = {
val producer = TestUtils.createProducer(brokerList)
val producer = TestUtils.createProducer(bootstrapServers())
try {
// Send a message to auto-create the topic
val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)

View File

@ -59,7 +59,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
}
override def createPrivilegedAdminClient() = {
createAdminClient(brokerList, securityProtocol, trustStoreFile, clientSaslProperties,
createAdminClient(bootstrapServers(), securityProtocol, trustStoreFile, clientSaslProperties,
kafkaClientSaslMechanism, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword)
}
@ -142,7 +142,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
@Test
def testKafkaAdminClientWithAuthenticationFailure(): Unit = {
val props = TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
val adminClient = Admin.create(props)
def describeTopic(): Unit = {
@ -203,7 +203,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
}
finally propsStream.close()
val cgcArgs = Array("--bootstrap-server", brokerList,
val cgcArgs = Array("--bootstrap-server", bootstrapServers(),
"--describe",
"--group", "test.group",
"--command-config", propsFile.getAbsolutePath)

View File

@ -52,7 +52,7 @@ class TransactionsExpirationTest extends KafkaServerTestHarness {
super.setUp(testInfo)
producer = TestUtils.createTransactionalProducer("transactionalProducer", brokers)
consumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(brokers),
consumer = TestUtils.createConsumer(bootstrapServers(),
enableAutoCommit = false,
readCommitted = true)

View File

@ -752,7 +752,7 @@ class TransactionsTest extends KafkaServerTestHarness {
private def createReadCommittedConsumer(group: String = "group",
maxPollRecords: Int = 500,
props: Properties = new Properties) = {
val consumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers),
val consumer = TestUtils.createConsumer(bootstrapServers(),
groupId = group,
enableAutoCommit = false,
readCommitted = true,
@ -762,7 +762,7 @@ class TransactionsTest extends KafkaServerTestHarness {
}
private def createReadUncommittedConsumer(group: String) = {
val consumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers),
val consumer = TestUtils.createConsumer(bootstrapServers(),
groupId = group,
enableAutoCommit = false)
nonTransactionalConsumers += consumer

View File

@ -115,7 +115,7 @@ class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness {
}
private def createReadCommittedConsumer(group: String) = {
val consumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers),
val consumer = TestUtils.createConsumer(bootstrapServers(),
groupId = group,
enableAutoCommit = false,
readCommitted = true)

View File

@ -61,7 +61,7 @@ class FetchRequestBetweenDifferentIbpTest extends BaseRequestTest {
ensureControllerWithIBP(version)
assertEquals(controllerBroker, controllerSocketServer.config.brokerId)
val partitionLeaders = createTopic(topic, Map(0 -> Seq(1, 0, 2), 1 -> Seq(0, 2, 1)))
val partitionLeaders = createTopicWithAssignment(topic, Map(0 -> Seq(1, 0, 2), 1 -> Seq(0, 2, 1)))
TestUtils.waitForAllPartitionsMetadata(servers, topic, 2)
assertEquals(1, partitionLeaders(0))
@ -97,7 +97,7 @@ class FetchRequestBetweenDifferentIbpTest extends BaseRequestTest {
// Ensure controller version = version1
ensureControllerWithIBP(version1)
assertEquals(broker1, controllerSocketServer.config.brokerId)
val partitionLeaders = createTopic(topic, Map(0 -> Seq(1, 0, 2), 1 -> Seq(0, 2, 1)))
val partitionLeaders = createTopicWithAssignment(topic, Map(0 -> Seq(1, 0, 2), 1 -> Seq(0, 2, 1)))
TestUtils.waitForAllPartitionsMetadata(servers, topic, 2)
assertEquals(1, partitionLeaders(0))
assertEquals(0, partitionLeaders(1))
@ -116,7 +116,7 @@ class FetchRequestBetweenDifferentIbpTest extends BaseRequestTest {
ensureControllerWithIBP(version2)
assertEquals(broker2, controllerSocketServer.config.brokerId)
// Create a new topic
createTopic(topic2, Map(0 -> Seq(1, 0, 2)))
createTopicWithAssignment(topic2, Map(0 -> Seq(1, 0, 2)))
TestUtils.waitForAllPartitionsMetadata(servers, topic2, 1)
TestUtils.waitForAllPartitionsMetadata(servers, topic, 2)

View File

@ -50,7 +50,7 @@ class FetchRequestTestDowngrade extends BaseRequestTest {
ensureControllerIn(Seq(0))
assertEquals(0, controllerSocketServer.config.brokerId)
val partitionLeaders = createTopic(tp.topic, Map(tp.partition -> Seq(1, 0)))
val partitionLeaders = createTopicWithAssignment(tp.topic, Map(tp.partition -> Seq(1, 0)))
TestUtils.waitForAllPartitionsMetadata(servers, tp.topic, 1)
ensureControllerIn(Seq(1))
assertEquals(1, controllerSocketServer.config.brokerId)

View File

@ -47,7 +47,7 @@ class MetadataRequestBetweenDifferentIbpTest extends BaseRequestTest {
// Kill controller and restart until broker with latest ibp become controller
ensureControllerIn(Seq(1, 2))
createTopic(topic, Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1)))
createTopicWithAssignment(topic, Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1)))
val resp1 = sendMetadataRequest(new MetadataRequest(requestData(topic, Uuid.ZERO_UUID), 12.toShort), controllerSocketServer)
val topicId = resp1.topicMetadata.iterator().next().topicId()

View File

@ -62,7 +62,7 @@ class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
val consumerProps = new Properties
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
consumerProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "1")
val consumer = new KafkaConsumer(consumerProps, new ByteArrayDeserializer, new ByteArrayDeserializer)
val mirrorMakerConsumer = new ConsumerWrapper(consumer, None, includeOpt = Some("any"))
@ -75,7 +75,7 @@ class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
val consumerProps = new Properties
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
consumerProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "2000")
val consumer = new KafkaConsumer(consumerProps, new ByteArrayDeserializer, new ByteArrayDeserializer)
val mirrorMakerConsumer = new ConsumerWrapper(consumer, None, includeOpt = Some("any"))
@ -89,10 +89,9 @@ class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
def testCommaSeparatedRegex(): Unit = {
val topic = "new-topic"
val msg = "a test message"
val brokerList = TestUtils.getBrokerListStrFromServers(servers)
val producerProps = new Properties
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer])
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer])
val producer = new MirrorMakerProducer(true, producerProps)
@ -103,7 +102,7 @@ class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
val consumerProps = new Properties
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
val consumer = new KafkaConsumer(consumerProps, new ByteArrayDeserializer, new ByteArrayDeserializer)
val mirrorMakerConsumer = new ConsumerWrapper(consumer, None, includeOpt = Some("another_topic,new.*,foo"))

View File

@ -43,7 +43,7 @@ class GetOffsetShellTest extends KafkaServerTestHarness with Logging {
Range(1, topicCount + 1).foreach(i => createTopic(topicName(i), i))
val props = new Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList)
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
@ -201,7 +201,7 @@ class GetOffsetShellTest extends KafkaServerTestHarness with Logging {
}
private def addBootstrapServer(args: Array[String]): Array[String] = {
args ++ Array("--bootstrap-server", brokerList)
args ++ Array("--bootstrap-server", bootstrapServers())
}
}

View File

@ -30,8 +30,6 @@ import kafka.zk.ReassignPartitionsZNode
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
import org.jfree.chart.plot.PlotOrientation
import org.jfree.chart.{ChartFactory, ChartFrame, JFreeChart}
@ -111,8 +109,7 @@ object ReplicationQuotasTestRig {
.map(c => createServer(KafkaConfig.fromProps(c)))
TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
val brokerList = TestUtils.bootstrapServers(servers,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
val brokerList = TestUtils.plaintextBootstrapServers(servers)
adminClient = Admin.create(Map[String, Object](
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList
).asJava)
@ -140,7 +137,7 @@ object ReplicationQuotasTestRig {
createTopic(zkClient, topicName, replicas, servers)
println("Writing Data")
val producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers), acks = 0)
val producer = TestUtils.createProducer(TestUtils.plaintextBootstrapServers(servers), acks = 0)
(0 until config.msgsPerPartition).foreach { x =>
(0 until config.partitions).foreach { partition =>
producer.send(new ProducerRecord(topicName, partition, null, new Array[Byte](config.msgSize)))

View File

@ -52,10 +52,10 @@ class AddPartitionsTest extends BaseRequestTest {
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
createTopic(topic1, partitionReplicaAssignment = topic1Assignment.map { case (k, v) => k -> v.replicas })
createTopic(topic2, partitionReplicaAssignment = topic2Assignment.map { case (k, v) => k -> v.replicas })
createTopic(topic3, partitionReplicaAssignment = topic3Assignment.map { case (k, v) => k -> v.replicas })
createTopic(topic4, partitionReplicaAssignment = topic4Assignment.map { case (k, v) => k -> v.replicas })
createTopicWithAssignment(topic1, partitionReplicaAssignment = topic1Assignment.map { case (k, v) => k -> v.replicas })
createTopicWithAssignment(topic2, partitionReplicaAssignment = topic2Assignment.map { case (k, v) => k -> v.replicas })
createTopicWithAssignment(topic3, partitionReplicaAssignment = topic3Assignment.map { case (k, v) => k -> v.replicas })
createTopicWithAssignment(topic4, partitionReplicaAssignment = topic4Assignment.map { case (k, v) => k -> v.replicas })
}
@Test

View File

@ -77,7 +77,7 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness {
def createNoAutoCommitConsumer(group: String): KafkaConsumer[String, String] = {
val props = new Properties
props.put("bootstrap.servers", brokerList)
props.put("bootstrap.servers", bootstrapServers())
props.put("group.id", group)
props.put("enable.auto.commit", "false")
new KafkaConsumer(props, new StringDeserializer, new StringDeserializer)
@ -96,14 +96,14 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness {
strategy: String = classOf[RangeAssignor].getName,
customPropsOpt: Option[Properties] = None,
syncCommit: Boolean = false): ConsumerGroupExecutor = {
val executor = new ConsumerGroupExecutor(brokerList, numConsumers, group, topic, strategy, customPropsOpt, syncCommit)
val executor = new ConsumerGroupExecutor(bootstrapServers(), numConsumers, group, topic, strategy, customPropsOpt, syncCommit)
addExecutor(executor)
executor
}
def addSimpleGroupExecutor(partitions: Iterable[TopicPartition] = Seq(new TopicPartition(topic, 0)),
group: String = group): SimpleConsumerGroupExecutor = {
val executor = new SimpleConsumerGroupExecutor(brokerList, group, partitions)
val executor = new SimpleConsumerGroupExecutor(bootstrapServers(), group, partitions)
addExecutor(executor)
executor
}

View File

@ -57,7 +57,7 @@ class DelegationTokenCommandTest extends BaseRequestTest with SaslSetup {
private def createAdminConfig: util.Map[String, Object] = {
val config = new util.HashMap[String, Object]
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
val securityProps: util.Map[Object, Object] =
TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
securityProps.forEach { (key, value) => config.put(key.asInstanceOf[String], value) }
@ -110,27 +110,27 @@ class DelegationTokenCommandTest extends BaseRequestTest with SaslSetup {
}
private def getCreateOpts(renewers: List[String]): DelegationTokenCommandOptions = {
val opts = ListBuffer("--bootstrap-server", brokerList, "--max-life-time-period", "-1",
val opts = ListBuffer("--bootstrap-server", bootstrapServers(), "--max-life-time-period", "-1",
"--command-config", "testfile", "--create")
renewers.foreach(renewer => opts ++= ListBuffer("--renewer-principal", renewer))
new DelegationTokenCommandOptions(opts.toArray)
}
private def getDescribeOpts(owners: List[String]): DelegationTokenCommandOptions = {
val opts = ListBuffer("--bootstrap-server", brokerList, "--command-config", "testfile", "--describe")
val opts = ListBuffer("--bootstrap-server", bootstrapServers(), "--command-config", "testfile", "--describe")
owners.foreach(owner => opts ++= ListBuffer("--owner-principal", owner))
new DelegationTokenCommandOptions(opts.toArray)
}
private def getRenewOpts(hmac: String): DelegationTokenCommandOptions = {
val opts = Array("--bootstrap-server", brokerList, "--command-config", "testfile", "--renew",
val opts = Array("--bootstrap-server", bootstrapServers(), "--command-config", "testfile", "--renew",
"--renew-time-period", "-1",
"--hmac", hmac)
new DelegationTokenCommandOptions(opts)
}
private def getExpireOpts(hmac: String): DelegationTokenCommandOptions = {
val opts = Array("--bootstrap-server", brokerList, "--command-config", "testfile", "--expire",
val opts = Array("--bootstrap-server", bootstrapServers(), "--command-config", "testfile", "--expire",
"--expiry-time-period", "-1",
"--hmac", hmac)
new DelegationTokenCommandOptions(opts)

View File

@ -28,7 +28,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
@Test
def testDeleteWithTopicOption(): Unit = {
TestUtils.createOffsetsTopic(zkClient, servers)
val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group, "--topic")
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--delete", "--group", group, "--topic")
assertThrows(classOf[OptionException], () => getConsumerGroupService(cgcArgs))
}
@ -37,7 +37,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
TestUtils.createOffsetsTopic(zkClient, servers)
val missingGroup = "missing.group"
val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", missingGroup)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--delete", "--group", missingGroup)
val service = getConsumerGroupService(cgcArgs)
val output = TestUtils.grabConsoleOutput(service.deleteGroups())
@ -51,7 +51,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
val missingGroup = "missing.group"
// note the group to be deleted is a different (non-existing) group
val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", missingGroup)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--delete", "--group", missingGroup)
val service = getConsumerGroupService(cgcArgs)
val result = service.deleteGroups()
@ -65,7 +65,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
// run one consumer in the group
addConsumerGroupExecutor(numConsumers = 1)
val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--delete", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -83,7 +83,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
// run one consumer in the group
addConsumerGroupExecutor(numConsumers = 1)
val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--delete", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -103,7 +103,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
// run one consumer in the group
val executor = addConsumerGroupExecutor(numConsumers = 1)
val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--delete", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -133,7 +133,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
group -> executor
}).toMap
val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--all-groups")
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--delete", "--all-groups")
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -164,7 +164,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
// run one consumer in the group
val executor = addConsumerGroupExecutor(numConsumers = 1)
val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--delete", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -189,7 +189,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
// run one consumer in the group
val executor = addConsumerGroupExecutor(numConsumers = 1)
val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--delete", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -215,7 +215,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
// run one consumer in the group
val executor = addConsumerGroupExecutor(numConsumers = 1)
val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--delete", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -240,7 +240,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
@Test
def testDeleteWithUnrecognizedNewConsumerOption(): Unit = {
val cgcArgs = Array("--new-consumer", "--bootstrap-server", brokerList, "--delete", "--group", group)
val cgcArgs = Array("--new-consumer", "--bootstrap-server", bootstrapServers(), "--delete", "--group", group)
assertThrows(classOf[OptionException], () => getConsumerGroupService(cgcArgs))
}
}

View File

@ -38,7 +38,7 @@ class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupComm
def getArgs(group: String, topic: String): Array[String] = {
Array(
"--bootstrap-server", brokerList,
"--bootstrap-server", bootstrapServers(),
"--delete-offsets",
"--group", group,
"--topic", topic
@ -173,7 +173,7 @@ class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupComm
}
private def createProducer(config: Properties = new Properties()): KafkaProducer[Array[Byte], Array[Byte]] = {
config.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
config.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
config.putIfAbsent(ProducerConfig.ACKS_CONFIG, "-1")
config.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
config.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
@ -182,7 +182,7 @@ class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupComm
}
private def createConsumer(config: Properties = new Properties()): KafkaConsumer[Array[Byte], Array[Byte]] = {
config.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
config.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
config.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
config.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, group)
config.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)

View File

@ -131,7 +131,7 @@ class DeleteTopicTest extends QuorumTestHarness {
adminZkClient.deleteTopic(topic)
// verify that a partition from the topic cannot be reassigned
val props = new Properties()
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(servers))
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.plaintextBootstrapServers(servers))
val adminClient = Admin.create(props)
try {
waitUntilTopicGone(adminClient, "test")
@ -222,7 +222,7 @@ class DeleteTopicTest extends QuorumTestHarness {
// increase the partition count for topic
val props = new Properties()
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(servers))
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.plaintextBootstrapServers(servers))
val adminClient = Admin.create(props)
try {
adminClient.createPartitions(Map(topic -> NewPartitions.increaseTo(2)).asJava).all().get()

View File

@ -42,7 +42,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
for (describeType <- describeTypes) {
// note the group to be queried is a different (non-existing) group
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", missingGroup) ++ describeType
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", missingGroup) ++ describeType
val service = getConsumerGroupService(cgcArgs)
val output = TestUtils.grabConsoleOutput(service.describeGroups())
@ -60,7 +60,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
exitMessage = err
throw new RuntimeException
}
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group, "--members", "--state")
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group, "--members", "--state")
try {
ConsumerGroupCommand.main(cgcArgs)
} catch {
@ -81,7 +81,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
exitMessage = err
throw new RuntimeException
}
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--all-groups", "--state", "Stable")
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--all-groups", "--state", "Stable")
try {
ConsumerGroupCommand.main(cgcArgs)
} catch {
@ -101,7 +101,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1)
// note the group to be queried is a different (non-existing) group
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
val (state, assignments) = service.collectGroupOffsets(group)
@ -117,7 +117,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1)
// note the group to be queried is a different (non-existing) group
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
val (state, assignments) = service.collectGroupMembers(group, false)
@ -137,7 +137,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1)
// note the group to be queried is a different (non-existing) group
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
val state = service.collectGroupState(group)
@ -155,7 +155,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val group = this.group + describeType.mkString("")
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1, group = group)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group) ++ describeType
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) ++ describeType
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -179,7 +179,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val expectedNumLines = describeTypes.length * 2
for (describeType <- describeTypes) {
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe") ++ groups ++ describeType
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe") ++ groups ++ describeType
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -203,7 +203,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val expectedNumLines = describeTypes.length * 2
for (describeType <- describeTypes) {
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--all-groups") ++ describeType
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--all-groups") ++ describeType
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -221,7 +221,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -241,7 +241,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -274,7 +274,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -293,7 +293,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1, strategy = classOf[RoundRobinAssignor].getName)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -314,7 +314,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val group = this.group + describeType.mkString("")
// run one consumer in the group consuming from a single-partition topic
val executor = addConsumerGroupExecutor(numConsumers = 1, group = group)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group) ++ describeType
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) ++ describeType
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -337,7 +337,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
// run one consumer in the group consuming from a single-partition topic
val executor = addConsumerGroupExecutor(numConsumers = 1)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -369,7 +369,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
// run one consumer in the group consuming from a single-partition topic
val executor = addConsumerGroupExecutor(numConsumers = 1)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -393,7 +393,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
// run one consumer in the group consuming from a single-partition topic
val executor = addConsumerGroupExecutor(numConsumers = 1)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -421,7 +421,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val group = this.group + describeType.mkString("")
// run two consumers in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 2, group = group)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group) ++ describeType
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) ++ describeType
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -439,7 +439,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
// run two consumers in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 2)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -458,7 +458,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
// run two consumers in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 2)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -483,7 +483,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
// run two consumers in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 2)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -502,7 +502,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val group = this.group + describeType.mkString("")
// run two consumers in the group consuming from a two-partition topic
addConsumerGroupExecutor(2, topic2, group = group)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group) ++ describeType
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) ++ describeType
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -522,7 +522,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
// run two consumers in the group consuming from a two-partition topic
addConsumerGroupExecutor(numConsumers = 2, topic2)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -544,7 +544,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
// run two consumers in the group consuming from a two-partition topic
addConsumerGroupExecutor(numConsumers = 2, topic2)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -570,7 +570,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
// run two consumers in the group consuming from a two-partition topic
addConsumerGroupExecutor(numConsumers = 2, topic2)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -588,7 +588,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
createTopic(topic2, 2, 1)
addSimpleGroupExecutor(Seq(new TopicPartition(topic2, 0), new TopicPartition(topic2, 1)))
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
@ -607,7 +607,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1)
// set the group initialization timeout too low for the group to stabilize
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--timeout", "1", "--group", group) ++ describeType
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--timeout", "1", "--group", group) ++ describeType
val service = getConsumerGroupService(cgcArgs)
val e = assertThrows(classOf[ExecutionException], () => TestUtils.grabConsoleOutputAndError(service.describeGroups()))
@ -623,7 +623,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
addConsumerGroupExecutor(numConsumers = 1)
// set the group initialization timeout too low for the group to stabilize
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group, "--timeout", "1")
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group, "--timeout", "1")
val service = getConsumerGroupService(cgcArgs)
val e = assertThrows(classOf[ExecutionException], () => service.collectGroupOffsets(group))
@ -639,7 +639,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
addConsumerGroupExecutor(numConsumers = 1)
// set the group initialization timeout too low for the group to stabilize
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group, "--timeout", "1")
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group, "--timeout", "1")
val service = getConsumerGroupService(cgcArgs)
var e = assertThrows(classOf[ExecutionException], () => service.collectGroupMembers(group, false))
@ -657,7 +657,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
addConsumerGroupExecutor(numConsumers = 1)
// set the group initialization timeout too low for the group to stabilize
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group, "--timeout", "1")
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group, "--timeout", "1")
val service = getConsumerGroupService(cgcArgs)
val e = assertThrows(classOf[ExecutionException], () => service.collectGroupState(group))
@ -666,7 +666,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
@Test
def testDescribeWithUnrecognizedNewConsumerOption(): Unit = {
val cgcArgs = Array("--new-consumer", "--bootstrap-server", brokerList, "--describe", "--group", group)
val cgcArgs = Array("--new-consumer", "--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
assertThrows(classOf[joptsimple.OptionException], () => getConsumerGroupService(cgcArgs))
}
@ -680,7 +680,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1, customPropsOpt = Some(customProps))
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {

View File

@ -75,7 +75,7 @@ class FeatureCommandTest extends BaseRequestTest {
@Test
def testDescribeFeaturesSuccess(): Unit = {
updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures)
val featureApis = new FeatureApis(new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--describe")))
val featureApis = new FeatureApis(new FeatureCommandOptions(Array("--bootstrap-server", bootstrapServers(), "--describe")))
featureApis.setSupportedFeatures(defaultSupportedFeatures)
try {
val initialDescribeOutput = TestUtils.grabConsoleOutput(featureApis.describeFeatures())
@ -99,7 +99,7 @@ class FeatureCommandTest extends BaseRequestTest {
*/
@Test
def testUpgradeAllFeaturesSuccess(): Unit = {
val upgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--upgrade-all"))
val upgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", bootstrapServers(), "--upgrade-all"))
val featureApis = new FeatureApis(upgradeOpts)
try {
// Step (1):
@ -146,8 +146,8 @@ class FeatureCommandTest extends BaseRequestTest {
*/
@Test
def testDowngradeFeaturesSuccess(): Unit = {
val downgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--downgrade-all"))
val upgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--upgrade-all"))
val downgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", bootstrapServers(), "--downgrade-all"))
val upgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", bootstrapServers(), "--upgrade-all"))
val featureApis = new FeatureApis(upgradeOpts)
try {
// Step (1):
@ -197,7 +197,7 @@ class FeatureCommandTest extends BaseRequestTest {
*/
@Test
def testUpgradeFeaturesFailure(): Unit = {
val upgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--upgrade-all"))
val upgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", bootstrapServers(), "--upgrade-all"))
val featureApis = new FeatureApis(upgradeOpts)
try {
// Step (1): Update the supported features across all brokers.

View File

@ -30,7 +30,6 @@ import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import org.apache.kafka.common.network.ListenerName
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{BeforeEach, Tag}
@ -258,7 +257,7 @@ object LeaderElectionCommandTest {
}
def bootstrapServers(servers: Seq[KafkaServer]): String = {
TestUtils.bootstrapServers(servers, new ListenerName("PLAINTEXT"))
TestUtils.plaintextBootstrapServers(servers)
}
def tempTopicPartitionFile(partitions: Set[TopicPartition]): Path = {

View File

@ -32,7 +32,7 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
addSimpleGroupExecutor(group = simpleGroup)
addConsumerGroupExecutor(numConsumers = 1)
val cgcArgs = Array("--bootstrap-server", brokerList, "--list")
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list")
val service = getConsumerGroupService(cgcArgs)
val expectedGroups = Set(group, simpleGroup)
@ -45,7 +45,7 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
@Test
def testListWithUnrecognizedNewConsumerOption(): Unit = {
val cgcArgs = Array("--new-consumer", "--bootstrap-server", brokerList, "--list")
val cgcArgs = Array("--new-consumer", "--bootstrap-server", bootstrapServers(), "--list")
assertThrows(classOf[OptionException], () => getConsumerGroupService(cgcArgs))
}
@ -55,7 +55,7 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
addSimpleGroupExecutor(group = simpleGroup)
addConsumerGroupExecutor(numConsumers = 1)
val cgcArgs = Array("--bootstrap-server", brokerList, "--list", "--state")
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--state")
val service = getConsumerGroupService(cgcArgs)
val expectedListing = Set(
@ -105,19 +105,19 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
addConsumerGroupExecutor(numConsumers = 1)
var out = ""
var cgcArgs = Array("--bootstrap-server", brokerList, "--list")
var cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list")
TestUtils.waitUntilTrue(() => {
out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
!out.contains("STATE") && out.contains(simpleGroup) && out.contains(group)
}, s"Expected to find $simpleGroup, $group and no header, but found $out")
cgcArgs = Array("--bootstrap-server", brokerList, "--list", "--state")
cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--state")
TestUtils.waitUntilTrue(() => {
out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
out.contains("STATE") && out.contains(simpleGroup) && out.contains(group)
}, s"Expected to find $simpleGroup, $group and the header, but found $out")
cgcArgs = Array("--bootstrap-server", brokerList, "--list", "--state", "Stable")
cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--state", "Stable")
TestUtils.waitUntilTrue(() => {
out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
out.contains("STATE") && out.contains(group) && out.contains("Stable")

View File

@ -39,7 +39,7 @@ class LogDirsCommandTest extends KafkaServerTestHarness {
val byteArrayOutputStream = new ByteArrayOutputStream
val printStream = new PrintStream(byteArrayOutputStream, false, StandardCharsets.UTF_8.name())
//input exist brokerList
LogDirsCommand.describe(Array("--bootstrap-server", brokerList, "--broker-list", "0", "--describe"), printStream)
LogDirsCommand.describe(Array("--bootstrap-server", bootstrapServers(), "--broker-list", "0", "--describe"), printStream)
val existingBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
val existingBrokersLineIter = existingBrokersContent.split("\n").iterator
@ -48,7 +48,7 @@ class LogDirsCommandTest extends KafkaServerTestHarness {
//input nonexistent brokerList
byteArrayOutputStream.reset()
LogDirsCommand.describe(Array("--bootstrap-server", brokerList, "--broker-list", "0,1,2", "--describe"), printStream)
LogDirsCommand.describe(Array("--bootstrap-server", bootstrapServers(), "--broker-list", "0,1,2", "--describe"), printStream)
val nonExistingBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
val nonExistingBrokersLineIter = nonExistingBrokersContent.split("\n").iterator
@ -57,7 +57,7 @@ class LogDirsCommandTest extends KafkaServerTestHarness {
//input duplicate ids
byteArrayOutputStream.reset()
LogDirsCommand.describe(Array("--bootstrap-server", brokerList, "--broker-list", "0,0,1,2,2", "--describe"), printStream)
LogDirsCommand.describe(Array("--bootstrap-server", bootstrapServers(), "--broker-list", "0,0,1,2,2", "--describe"), printStream)
val duplicateBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
val duplicateBrokersLineIter = duplicateBrokersContent.split("\n").iterator
@ -66,7 +66,7 @@ class LogDirsCommandTest extends KafkaServerTestHarness {
//use all brokerList for current cluster
byteArrayOutputStream.reset()
LogDirsCommand.describe(Array("--bootstrap-server", brokerList, "--describe"), printStream)
LogDirsCommand.describe(Array("--bootstrap-server", bootstrapServers(), "--describe"), printStream)
val allBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
val allBrokersLineIter = allBrokersContent.split("\n").iterator

View File

@ -54,7 +54,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
private def basicArgs: Array[String] = {
Array("--reset-offsets",
"--bootstrap-server", brokerList,
"--bootstrap-server", bootstrapServers(),
"--timeout", test.TestUtils.DEFAULT_MAX_WAIT_MS.toString)
}
@ -425,7 +425,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
@Test
def testResetWithUnrecognizedNewConsumerOption(): Unit = {
val cgcArgs = Array("--new-consumer", "--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
val cgcArgs = Array("--new-consumer", "--bootstrap-server", bootstrapServers(), "--reset-offsets", "--group", group, "--all-topics",
"--to-offset", "2", "--export")
assertThrows(classOf[OptionException], () => getConsumerGroupService(cgcArgs))
}

View File

@ -85,7 +85,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
// create adminClient
val props = new Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList)
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
adminClient = Admin.create(props)
topicService = TopicService(adminClient)
testTopicName = s"${info.getTestMethod.get().getName}-${Random.alphanumeric.take(10).mkString}"

View File

@ -40,7 +40,7 @@ class UserScramCredentialsCommandTest extends BaseRequestTest {
exitStatus = Some(status)
throw new RuntimeException
}
val commandArgs = Array("--bootstrap-server", brokerList) ++ args
val commandArgs = Array("--bootstrap-server", bootstrapServers()) ++ args
try {
Console.withOut(printStream) {
ConfigCommand.main(commandArgs)

View File

@ -31,6 +31,7 @@ import scala.collection.{Seq, mutable}
import scala.jdk.CollectionConverters._
import java.util.Properties
import kafka.utils.TestUtils.{createAdminClient, resource}
import org.apache.kafka.common.{KafkaException, Uuid}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.scram.ScramCredential
@ -58,7 +59,6 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
_brokers.asInstanceOf[mutable.Buffer[KafkaServer]]
}
var brokerList: String = null
var alive: Array[Boolean] = null
/**
@ -96,6 +96,10 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
def boundPort(server: KafkaServer): Int = server.boundPort(listenerName)
def bootstrapServers(listenerName: ListenerName = listenerName): String = {
TestUtils.bootstrapServers(_brokers, listenerName)
}
protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
protected def listenerName: ListenerName = ListenerName.forSecurityProtocol(securityProtocol)
protected def trustStoreFile: Option[File] = None
@ -141,25 +145,51 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
createBrokers(startup)
}
def createOffsetsTopic(
listenerName: ListenerName = listenerName,
adminClientConfig: Properties = new Properties
): Unit = {
if (isKRaftTest()) {
resource(createAdminClient(brokers, listenerName, adminClientConfig)) { admin =>
TestUtils.createOffsetsTopicWithAdmin(admin, brokers)
}
} else {
TestUtils.createOffsetsTopic(zkClient, servers)
}
}
/**
* Create a topic.
* Wait until the leader is elected and the metadata is propagated to all brokers.
* Return the leader for each partition.
*/
def createTopic(topic: String,
numPartitions: Int = 1,
replicationFactor: Int = 1,
topicConfig: Properties = new Properties,
adminClientConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] = {
def createTopic(
topic: String,
numPartitions: Int = 1,
replicationFactor: Int = 1,
topicConfig: Properties = new Properties,
listenerName: ListenerName = listenerName
): scala.collection.immutable.Map[Int, Int] = {
if (isKRaftTest()) {
TestUtils.createTopicWithAdmin(topic = topic,
brokers = brokers,
resource(createAdminClient(brokers, listenerName)) { admin =>
TestUtils.createTopicWithAdmin(
admin = admin,
topic = topic,
brokers = brokers,
numPartitions = numPartitions,
replicationFactor = replicationFactor,
topicConfig = topicConfig
)
}
} else {
TestUtils.createTopic(
zkClient = zkClient,
topic = topic,
numPartitions = numPartitions,
replicationFactor = replicationFactor,
topicConfig = topicConfig,
adminConfig = adminClientConfig)
} else {
TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, servers, topicConfig)
servers = servers,
topicConfig = topicConfig
)
}
}
@ -168,18 +198,40 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
* Wait until the leader is elected and the metadata is propagated to all brokers.
* Return the leader for each partition.
*/
def createTopic(topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]]): scala.collection.immutable.Map[Int, Int] =
def createTopicWithAssignment(
topic: String,
partitionReplicaAssignment: collection.Map[Int, Seq[Int]],
listenerName: ListenerName = listenerName
): scala.collection.immutable.Map[Int, Int] =
if (isKRaftTest()) {
TestUtils.createTopicWithAdmin(topic = topic,
replicaAssignment = partitionReplicaAssignment,
brokers = brokers)
resource(createAdminClient(brokers, listenerName)) { admin =>
TestUtils.createTopicWithAdmin(
admin = admin,
topic = topic,
replicaAssignment = partitionReplicaAssignment,
brokers = brokers
)
}
} else {
TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment, servers)
TestUtils.createTopic(
zkClient,
topic,
partitionReplicaAssignment,
servers
)
}
def deleteTopic(topic: String): Unit = {
def deleteTopic(
topic: String,
listenerName: ListenerName = listenerName
): Unit = {
if (isKRaftTest()) {
TestUtils.deleteTopicWithAdmin(topic, brokers)
resource(createAdminClient(brokers, listenerName)) { admin =>
TestUtils.deleteTopicWithAdmin(
admin = admin,
topic = topic,
brokers = brokers)
}
} else {
adminZkClient.deleteTopic(topic)
}
@ -219,7 +271,6 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
_brokers(i).startup()
alive(i) = true
}
brokerList = TestUtils.bootstrapServers(_brokers, listenerName)
}
def waitForUserScramCredentialToAppearOnAllBrokers(clientPrincipal: String, mechanismName: String): Unit = {
@ -286,7 +337,6 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
alive(_brokers.length - 1) = true
}
}
brokerList = if (startup) TestUtils.bootstrapServers(_brokers, listenerName) else null
}
private def createBrokerFromConfig(config: KafkaConfig): KafkaBroker = {

View File

@ -33,8 +33,6 @@ import kafka.utils.TestUtils._
import kafka.server.QuorumTestHarness
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigsResult, Config, ConfigEntry}
import org.junit.jupiter.api.Assertions._
@ -270,7 +268,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
}
private def consumeAllMessages(topic: String, numMessages: Int): Seq[String] = {
val brokerList = TestUtils.bootstrapServers(servers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
val brokerList = TestUtils.plaintextBootstrapServers(servers)
// Don't rely on coordinator as it may be down when this method is called
val consumer = TestUtils.createConsumer(brokerList,
groupId = "group" + random.nextLong(),
@ -351,7 +349,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
private def createAdminClient(): Admin = {
val config = new Properties
val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName("PLAINTEXT"))
val bootstrapServers = TestUtils.plaintextBootstrapServers(servers)
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
config.put(AdminClientConfig.METADATA_MAX_AGE_CONFIG, "10")
Admin.create(config)

View File

@ -69,7 +69,7 @@ class BaseFetchRequestTest extends BaseRequestTest {
}
protected def initProducer(): Unit = {
producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
producer = TestUtils.createProducer(bootstrapServers(),
keySerializer = new StringSerializer, valueSerializer = new StringSerializer)
}

View File

@ -38,7 +38,7 @@ class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest {
def createAdminConfig: util.Map[String, Object] = {
val config = new util.HashMap[String, Object]
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
val securityProps: util.Map[Object, Object] =
TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
securityProps.forEach { (key, value) => config.put(key.asInstanceOf[String], value) }

View File

@ -56,7 +56,7 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
private def createAdminConfig: util.Map[String, Object] = {
val config = new util.HashMap[String, Object]
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
val securityProps: util.Map[Object, Object] =
TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
securityProps.forEach { (key, value) => config.put(key.asInstanceOf[String], value) }

View File

@ -45,7 +45,7 @@ class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest
def createAdminConfig: util.Map[String, Object] = {
val config = new util.HashMap[String, Object]
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
val securityProps: util.Map[Object, Object] =
TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
securityProps.forEach { (key, value) => config.put(key.asInstanceOf[String], value) }

View File

@ -449,7 +449,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
private def createAdminClient(): Admin = {
val props = new Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList)
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
Admin.create(props)
}
}

View File

@ -53,7 +53,7 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest {
}
private def initProducer(): Unit = {
producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
producer = TestUtils.createProducer(bootstrapServers(),
keySerializer = new StringSerializer, valueSerializer = new StringSerializer)
}

View File

@ -60,7 +60,7 @@ class FetchRequestMaxBytesTest extends BaseRequestTest {
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers))
producer = TestUtils.createProducer(bootstrapServers())
}
@AfterEach

View File

@ -365,7 +365,8 @@ class FetchRequestTest extends BaseFetchRequestTest {
val msgValueLen = 100 * 1000
val batchSize = 4 * msgValueLen
val producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
val producer = TestUtils.createProducer(
bootstrapServers(),
lingerMs = Int.MaxValue,
deliveryTimeoutMs = Int.MaxValue,
batchSize = batchSize,
@ -426,7 +427,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
@Test
def testDownConversionFromBatchedToUnbatchedRespectsOffset(): Unit = {
// Increase linger so that we have control over the batches created
producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
producer = TestUtils.createProducer(bootstrapServers(),
retries = 5,
keySerializer = new StringSerializer,
valueSerializer = new StringSerializer,
@ -518,7 +519,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
val foo1 = new TopicPartition("foo", 1)
// topicNames can be empty because we are using old requests
val topicNames = Map[Uuid, String]().asJava
createTopic("foo", Map(0 -> List(0, 1), 1 -> List(0, 2)))
createTopicWithAssignment("foo", Map(0 -> List(0, 1), 1 -> List(0, 2)))
val bar0 = new TopicPartition("bar", 0)
val req1 = createFetchRequest(List(foo0, foo1, bar0), JFetchMetadata.INITIAL, Nil)
val resp1 = sendFetchRequest(0, req1)
@ -542,7 +543,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
assertFalse(responseData2.containsKey(foo1))
assertTrue(responseData2.containsKey(bar0))
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, responseData2.get(bar0).errorCode)
createTopic("bar", Map(0 -> List(0, 1)))
createTopicWithAssignment("bar", Map(0 -> List(0, 1)))
val req3 = createFetchRequest(Nil, new JFetchMetadata(resp1.sessionId(), 2), Nil)
val resp3 = sendFetchRequest(0, req3)
assertEquals(Errors.NONE, resp3.error())
@ -576,7 +577,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
val foo0 = new TopicPartition("foo", 0)
val foo1 = new TopicPartition("foo", 1)
createTopic("foo", Map(0 -> List(0, 1), 1 -> List(0, 2)))
createTopicWithAssignment("foo", Map(0 -> List(0, 1), 1 -> List(0, 2)))
val topicIds = getTopicIds()
val topicIdsWithUnknown = topicIds ++ Map("bar" -> Uuid.randomUuid())
val bar0 = new TopicPartition("bar", 0)
@ -614,7 +615,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
val topicNames = topicIds.asScala.map(_.swap).asJava
// Produce messages (v2)
producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
producer = TestUtils.createProducer(bootstrapServers(),
keySerializer = new StringSerializer,
valueSerializer = new StringSerializer)
producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition,
@ -661,7 +662,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
val topicNames = topicIds.asScala.map(_.swap).asJava
// Produce GZIP compressed messages (v2)
val producer1 = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
val producer1 = TestUtils.createProducer(bootstrapServers(),
compressionType = GZIPCompressionCodec.name,
keySerializer = new StringSerializer,
valueSerializer = new StringSerializer)
@ -669,7 +670,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
"key1", "value1")).get
producer1.close()
// Produce ZSTD compressed messages (v2)
val producer2 = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
val producer2 = TestUtils.createProducer(bootstrapServers(),
compressionType = ZStdCompressionCodec.name,
keySerializer = new StringSerializer,
valueSerializer = new StringSerializer)

View File

@ -67,8 +67,8 @@ class LogRecoveryTest extends QuorumTestHarness {
def updateProducer() = {
if (producer != null)
producer.close()
producer = TestUtils.createProducer(
TestUtils.getBrokerListStrFromServers(servers),
producer = createProducer(
plaintextBootstrapServers(servers),
keySerializer = new IntegerSerializer,
valueSerializer = new StringSerializer
)

View File

@ -236,8 +236,8 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
val replicaAssignment = Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1))
val topic1 = "topic1"
val topic2 = "topic2"
createTopic(topic1, replicaAssignment)
createTopic(topic2, replicaAssignment)
createTopicWithAssignment(topic1, replicaAssignment)
createTopicWithAssignment(topic2, replicaAssignment)
// if version < 9, return ZERO_UUID in MetadataResponse
val resp1 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, topic2).asJava, true, 0, 9).build(), Some(anySocketServer))
@ -264,7 +264,7 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
@ValueSource(strings = Array("zk", "kraft"))
def testPreferredReplica(quorum: String): Unit = {
val replicaAssignment = Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1))
createTopic("t1", replicaAssignment)
createTopicWithAssignment("t1", replicaAssignment)
// Test metadata on two different brokers to ensure that metadata propagation works correctly
val responses = Seq(0, 1).map(index =>
sendMetadataRequest(new MetadataRequest.Builder(Seq("t1").asJava, true).build(),

View File

@ -57,7 +57,7 @@ class ReplicaFetchTest extends QuorumTestHarness {
}
// send test messages to leader
val producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(brokers),
val producer = TestUtils.createProducer(TestUtils.plaintextBootstrapServers(brokers),
keySerializer = new StringSerializer,
valueSerializer = new StringSerializer)
val records = testMessageList1.map(m => new ProducerRecord(topic1, m, m)) ++

View File

@ -114,7 +114,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
adminZkClient.changeTopicConfig(topic, propsWith(FollowerReplicationThrottledReplicasProp, "0:106,1:106,2:106,3:107,4:107,5:107"))
//Add data equally to each partition
producer = createProducer(getBrokerListStrFromServers(brokers), acks = 1)
producer = createProducer(plaintextBootstrapServers(brokers), acks = 1)
(0 until msgCount).foreach { _ =>
(0 to 7).foreach { partition =>
producer.send(new ProducerRecord(topic, partition, null, msg))
@ -210,7 +210,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
}
def addData(msgCount: Int, msg: Array[Byte]): Unit = {
producer = createProducer(getBrokerListStrFromServers(brokers), acks = 0)
producer = createProducer(plaintextBootstrapServers(brokers), acks = 0)
(0 until msgCount).map(_ => producer.send(new ProducerRecord(topic, msg))).foreach(_.get)
waitForOffsetsToMatch(msgCount, 0, 100)
}

View File

@ -86,22 +86,22 @@ class ServerShutdownTest extends KafkaServerTestHarness {
@ValueSource(strings = Array("zk", "kraft"))
def testCleanShutdown(quorum: String): Unit = {
def createProducer(broker: KafkaBroker): KafkaProducer[Integer, String] =
def createProducer(): KafkaProducer[Integer, String] =
TestUtils.createProducer(
TestUtils.getBrokerListStrFromServers(Seq(broker)),
bootstrapServers(),
keySerializer = new IntegerSerializer,
valueSerializer = new StringSerializer
)
def createConsumer(broker: KafkaBroker): KafkaConsumer[Integer, String] =
def createConsumer(): KafkaConsumer[Integer, String] =
TestUtils.createConsumer(
TestUtils.getBrokerListStrFromServers(Seq(broker)),
bootstrapServers(),
securityProtocol = SecurityProtocol.PLAINTEXT,
keyDeserializer = new IntegerDeserializer,
valueDeserializer = new StringDeserializer
)
var producer = createProducer(broker)
var producer = createProducer()
// create topic
createTopic(topic)
@ -124,8 +124,8 @@ class ServerShutdownTest extends KafkaServerTestHarness {
// wait for the broker to receive the update metadata request after startup
TestUtils.waitForPartitionMetadata(Seq(broker), topic, 0)
producer = createProducer(broker)
val consumer = createConsumer(broker)
producer = createProducer()
val consumer = createConsumer()
consumer.subscribe(Seq(topic).asJava)
val consumerRecords = TestUtils.consumeRecords(consumer, sent1.size)

View File

@ -51,7 +51,7 @@ class TopicIdWithOldInterBrokerProtocolTest extends BaseRequestTest {
def testMetadataTopicIdsWithOldIBP(): Unit = {
val replicaAssignment = Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1))
val topic1 = "topic1"
createTopic(topic1, replicaAssignment)
createTopicWithAssignment(topic1, replicaAssignment)
val resp = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, topic1).asJava, true, 10, 10).build(), Some(notControllerSocketServer))
assertEquals(1, resp.topicMetadata.size)
@ -73,7 +73,7 @@ class TopicIdWithOldInterBrokerProtocolTest extends BaseRequestTest {
val topicNames = topicIds.map(_.swap)
val tidp0 = new TopicIdPartition(topicIds(topic1), tp0)
val leadersMap = createTopic(topic1, replicaAssignment)
val leadersMap = createTopicWithAssignment(topic1, replicaAssignment)
val req = createFetchRequest(maxResponseBytes, maxPartitionBytes, Seq(tidp0), Map.empty, ApiKeys.FETCH.latestVersion())
val resp = sendFetchRequest(leadersMap(0), req)
@ -94,7 +94,7 @@ class TopicIdWithOldInterBrokerProtocolTest extends BaseRequestTest {
val topicNames = topicIds.map(_.swap)
val tidp0 = new TopicIdPartition(topicIds(topic1), tp0)
val leadersMap = createTopic(topic1, replicaAssignment)
val leadersMap = createTopicWithAssignment(topic1, replicaAssignment)
val req = createFetchRequest(maxResponseBytes, maxPartitionBytes, Seq(tidp0), Map.empty, 12)
val resp = sendFetchRequest(leadersMap(0), req)

View File

@ -304,7 +304,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness wit
TestUtils.createTopic(zkClient, topic, Map(0 -> Seq(100, 101)), brokers,
CoreUtils.propsWith((KafkaConfig.MinInSyncReplicasProp, "1")))
producer = TestUtils.createProducer(getBrokerListStrFromServers(brokers), acks = 1)
producer = TestUtils.createProducer(plaintextBootstrapServers(brokers), acks = 1)
// Write one message while both brokers are up
(0 until 1).foreach { i =>
@ -327,7 +327,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness wit
//Bounce the producer (this is required, probably because the broker port changes on restart?)
producer.close()
producer = TestUtils.createProducer(getBrokerListStrFromServers(brokers), acks = 1)
producer = TestUtils.createProducer(plaintextBootstrapServers(brokers), acks = 1)
//Write 3 messages
(0 until 3).foreach { i =>
@ -339,7 +339,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness wit
//Bounce the producer (this is required, probably because the broker port changes on restart?)
producer.close()
producer = TestUtils.createProducer(getBrokerListStrFromServers(brokers), acks = 1)
producer = TestUtils.createProducer(plaintextBootstrapServers(brokers), acks = 1)
//Write 1 message
(0 until 1).foreach { i =>
@ -351,7 +351,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness wit
//Bounce the producer (this is required, probably because the broker port changes on restart?)
producer.close()
producer = TestUtils.createProducer(getBrokerListStrFromServers(brokers), acks = 1)
producer = TestUtils.createProducer(plaintextBootstrapServers(brokers), acks = 1)
//Write 2 messages
(0 until 2).foreach { i =>
@ -393,7 +393,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness wit
private def startConsumer(): KafkaConsumer[Array[Byte], Array[Byte]] = {
val consumerConfig = new Properties()
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerListStrFromServers(brokers))
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, plaintextBootstrapServers(brokers))
consumerConfig.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, String.valueOf(getLogFile(brokers(1), 0).length() * 2))
consumerConfig.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, String.valueOf(getLogFile(brokers(1), 0).length() * 2))
consumer = new KafkaConsumer(consumerConfig, new ByteArrayDeserializer, new ByteArrayDeserializer)
@ -410,7 +410,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness wit
}
private def createBufferingProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
TestUtils.createProducer(getBrokerListStrFromServers(brokers),
TestUtils.createProducer(plaintextBootstrapServers(brokers),
acks = -1,
lingerMs = 10000,
batchSize = msg.length * 1000,
@ -448,7 +448,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness wit
}
private def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
TestUtils.createProducer(getBrokerListStrFromServers(brokers), acks = -1)
TestUtils.createProducer(plaintextBootstrapServers(brokers), acks = -1)
}
private def leader: KafkaServer = {

View File

@ -108,7 +108,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
TestUtils.createTopic(zkClient, topic2, assignment2, brokers)
//Send messages equally to the two partitions, then half as many to a third
producer = createProducer(getBrokerListStrFromServers(brokers), acks = -1)
producer = createProducer(plaintextBootstrapServers(brokers), acks = -1)
(0 until 10).foreach { _ =>
producer.send(new ProducerRecord(topic1, 0, null, "IHeartLogs".getBytes))
}
@ -151,7 +151,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
def leo() = brokers(1).replicaManager.localLog(tp).get.logEndOffset
TestUtils.createTopic(zkClient, tp.topic, Map(tp.partition -> Seq(101)), brokers)
producer = createProducer(getBrokerListStrFromServers(brokers), acks = -1)
producer = createProducer(plaintextBootstrapServers(brokers), acks = -1)
//1. Given a single message
producer.send(new ProducerRecord(tp.topic, tp.partition, null, "IHeartLogs".getBytes)).get
@ -264,7 +264,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
private def sendFourMessagesToEachTopic() = {
val testMessageList1 = List("test1", "test2", "test3", "test4")
val testMessageList2 = List("test5", "test6", "test7", "test8")
val producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(brokers),
val producer = TestUtils.createProducer(plaintextBootstrapServers(brokers),
keySerializer = new StringSerializer, valueSerializer = new StringSerializer)
val records =
testMessageList1.map(m => new ProducerRecord(topic1, m, m)) ++

View File

@ -220,17 +220,16 @@ object TestUtils extends Logging {
}
}
def getBrokerListStrFromServers[B <: KafkaBroker](
brokers: Seq[B],
protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): String = {
brokers.map { s =>
val listener = s.config.effectiveAdvertisedListeners.find(_.securityProtocol == protocol).getOrElse(
sys.error(s"Could not find listener with security protocol $protocol"))
formatAddress(listener.host, boundPort(s, protocol))
}.mkString(",")
def plaintextBootstrapServers[B <: KafkaBroker](
brokers: Seq[B]
): String = {
bootstrapServers(brokers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
}
def bootstrapServers[B <: KafkaBroker](brokers: Seq[B], listenerName: ListenerName): String = {
def bootstrapServers[B <: KafkaBroker](
brokers: Seq[B],
listenerName: ListenerName
): String = {
brokers.map { s =>
val listener = s.config.effectiveAdvertisedListeners.find(_.listenerName == listenerName).getOrElse(
sys.error(s"Could not find listener with name ${listenerName.value}"))
@ -369,60 +368,60 @@ object TestUtils extends Logging {
config.setProperty(KafkaConfig.LogMessageFormatVersionProp, version.version)
}
def createAdminClient[B <: KafkaBroker](
brokers: Seq[B],
adminConfig: Properties): Admin = {
val adminClientProperties = new Properties(adminConfig)
def createAdminClient[B <: KafkaBroker](
brokers: Seq[B],
listenerName: ListenerName,
adminConfig: Properties = new Properties
): Admin = {
val adminClientProperties = new Properties()
adminClientProperties.putAll(adminConfig)
if (!adminClientProperties.containsKey(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG)) {
adminClientProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers(brokers, brokers.head.config.interBrokerListenerName))
adminClientProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers(brokers, listenerName))
}
Admin.create(adminClientProperties)
}
def createTopicWithAdmin[B <: KafkaBroker](
topic: String,
brokers: Seq[B],
numPartitions: Int = 1,
replicationFactor: Int = 1,
replicaAssignment: collection.Map[Int, Seq[Int]] = Map.empty,
topicConfig: Properties = new Properties,
adminConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] = {
admin: Admin,
topic: String,
brokers: Seq[B],
numPartitions: Int = 1,
replicationFactor: Int = 1,
replicaAssignment: collection.Map[Int, Seq[Int]] = Map.empty,
topicConfig: Properties = new Properties,
): scala.collection.immutable.Map[Int, Int] = {
val effectiveNumPartitions = if (replicaAssignment.isEmpty) {
numPartitions
} else {
replicaAssignment.size
}
val adminClient = createAdminClient(brokers, adminConfig)
val configsMap = new util.HashMap[String, String]()
topicConfig.forEach((k, v) => configsMap.put(k.toString, v.toString))
try {
val configsMap = new util.HashMap[String, String]()
topicConfig.forEach((k, v) => configsMap.put(k.toString, v.toString))
try {
val result = if (replicaAssignment.isEmpty) {
adminClient.createTopics(Collections.singletonList(new NewTopic(
topic, numPartitions, replicationFactor.toShort).configs(configsMap)))
} else {
val assignment = new util.HashMap[Integer, util.List[Integer]]()
replicaAssignment.forKeyValue { case (k, v) =>
val replicas = new util.ArrayList[Integer]
v.foreach(r => replicas.add(r.asInstanceOf[Integer]))
assignment.put(k.asInstanceOf[Integer], replicas)
}
adminClient.createTopics(Collections.singletonList(new NewTopic(
topic, assignment).configs(configsMap)))
}
result.all().get()
} catch {
case e: ExecutionException => if (!(e.getCause != null &&
e.getCause.isInstanceOf[TopicExistsException] &&
topicHasSameNumPartitionsAndReplicationFactor(adminClient, topic,
effectiveNumPartitions, replicationFactor))) {
throw e
val result = if (replicaAssignment.isEmpty) {
admin.createTopics(Collections.singletonList(new NewTopic(
topic, numPartitions, replicationFactor.toShort).configs(configsMap)))
} else {
val assignment = new util.HashMap[Integer, util.List[Integer]]()
replicaAssignment.forKeyValue { case (k, v) =>
val replicas = new util.ArrayList[Integer]
v.foreach(r => replicas.add(r.asInstanceOf[Integer]))
assignment.put(k.asInstanceOf[Integer], replicas)
}
admin.createTopics(Collections.singletonList(new NewTopic(
topic, assignment).configs(configsMap)))
}
result.all().get()
} catch {
case e: ExecutionException => if (!(e.getCause != null &&
e.getCause.isInstanceOf[TopicExistsException] &&
topicHasSameNumPartitionsAndReplicationFactor(admin, topic,
effectiveNumPartitions, replicationFactor))) {
throw e
}
} finally {
adminClient.close()
}
// wait until we've propagated all partitions metadata to all brokers
val allPartitionsMetadata = waitForAllPartitionsMetadata(brokers, topic, effectiveNumPartitions)
@ -445,33 +444,31 @@ object TestUtils extends Logging {
}
def createOffsetsTopicWithAdmin[B <: KafkaBroker](
brokers: Seq[B],
adminConfig: Properties = new Properties) = {
admin: Admin,
brokers: Seq[B]
): Map[Int, Int] = {
val broker = brokers.head
createTopicWithAdmin(topic = Topic.GROUP_METADATA_TOPIC_NAME,
createTopicWithAdmin(
admin = admin,
topic = Topic.GROUP_METADATA_TOPIC_NAME,
numPartitions = broker.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp),
replicationFactor = broker.config.getShort(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
brokers = brokers,
topicConfig = broker.groupCoordinator.offsetsTopicConfigs,
adminConfig = adminConfig)
)
}
def deleteTopicWithAdmin[B <: KafkaBroker](
topic: String,
brokers: Seq[B],
adminConfig: Properties = new Properties): Unit = {
val adminClient = createAdminClient(brokers, adminConfig)
admin: Admin,
topic: String,
brokers: Seq[B],
): Unit = {
try {
adminClient.deleteTopics(Collections.singletonList(topic)).all().get()
admin.deleteTopics(Collections.singletonList(topic)).all().get()
} catch {
case e: ExecutionException => if (e.getCause != null &&
e.getCause.isInstanceOf[UnknownTopicOrPartitionException]) {
case e: ExecutionException if e.getCause != null &&
e.getCause.isInstanceOf[UnknownTopicOrPartitionException] =>
// ignore
} else {
throw e
}
} finally {
adminClient.close()
}
waitForAllPartitionsMetadata(brokers, topic, 0)
}
@ -1326,7 +1323,7 @@ object TestUtils extends Logging {
brokers: Seq[B],
records: Seq[ProducerRecord[Array[Byte], Array[Byte]]],
acks: Int = -1): Unit = {
val producer = createProducer(TestUtils.getBrokerListStrFromServers(brokers), acks = acks)
val producer = createProducer(plaintextBootstrapServers(brokers), acks = acks)
try {
val futures = records.map(producer.send)
futures.foreach(_.get)
@ -1359,7 +1356,7 @@ object TestUtils extends Logging {
timestamp: java.lang.Long = null,
deliveryTimeoutMs: Int = 30 * 1000,
requestTimeoutMs: Int = 20 * 1000): Unit = {
val producer = createProducer(TestUtils.getBrokerListStrFromServers(brokers),
val producer = createProducer(plaintextBootstrapServers(brokers),
deliveryTimeoutMs = deliveryTimeoutMs, requestTimeoutMs = requestTimeoutMs)
try {
producer.send(new ProducerRecord(topic, null, timestamp, topic.getBytes, message.getBytes)).get
@ -1612,7 +1609,7 @@ object TestUtils extends Logging {
securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT,
trustStoreFile: Option[File] = None,
waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
val consumer = createConsumer(TestUtils.getBrokerListStrFromServers(brokers, securityProtocol),
val consumer = createConsumer(bootstrapServers(brokers, ListenerName.forSecurityProtocol(securityProtocol)),
groupId = groupId,
securityProtocol = securityProtocol,
trustStoreFile = trustStoreFile)
@ -1672,7 +1669,7 @@ object TestUtils extends Logging {
requestTimeoutMs: Int = 30000,
maxInFlight: Int = 5): KafkaProducer[Array[Byte], Array[Byte]] = {
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerListStrFromServers(brokers))
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, plaintextBootstrapServers(brokers))
props.put(ProducerConfig.ACKS_CONFIG, "all")
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize.toString)
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
@ -1692,7 +1689,7 @@ object TestUtils extends Logging {
brokers: Seq[B]): Unit = {
val props = new Properties()
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerListStrFromServers(brokers))
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, plaintextBootstrapServers(brokers))
val producer = new KafkaProducer[Array[Byte], Array[Byte]](props, new ByteArraySerializer, new ByteArraySerializer)
try {
for (i <- 0 until numRecords) {
@ -1760,24 +1757,6 @@ object TestUtils extends Logging {
}
}
def fetchEntityConfigWithAdmin[B <: KafkaBroker](
configResource: ConfigResource,
brokers: Seq[B],
adminConfig: Properties = new Properties): Properties = {
val properties = new Properties()
val adminClient = createAdminClient(brokers, adminConfig)
try {
val result = adminClient.describeConfigs(Collections.singletonList(configResource)).all().get()
val config = result.get(configResource)
if (config != null) {
config.entries().forEach(e => properties.setProperty(e.name(), e.value()))
}
} finally {
adminClient.close()
}
properties
}
def incrementalAlterConfigs[B <: KafkaBroker](
servers: Seq[B],
adminClient: Admin,
@ -1821,40 +1800,6 @@ object TestUtils extends Logging {
}, s"Timed out waiting for brokerId $brokerId to come online")
}
/**
* Get the replica assignment for some topics. Topics which don't exist will be ignored.
*/
def getReplicaAssignmentForTopics[B <: KafkaBroker](
topicNames: Seq[String],
brokers: Seq[B],
adminConfig: Properties = new Properties): Map[TopicPartition, Seq[Int]] = {
val adminClient = createAdminClient(brokers, adminConfig)
val results = new mutable.HashMap[TopicPartition, Seq[Int]]
try {
adminClient.describeTopics(topicNames.toList.asJava).topicNameValues().forEach {
case (topicName, future) =>
try {
val description = future.get()
description.partitions().forEach {
case partition =>
val topicPartition = new TopicPartition(topicName, partition.partition())
results.put(topicPartition, partition.replicas().asScala.map(_.id))
}
} catch {
case e: ExecutionException => if (e.getCause != null &&
e.getCause.isInstanceOf[UnknownTopicOrPartitionException]) {
// ignore
} else {
throw e
}
}
}
} finally {
adminClient.close()
}
results
}
def waitForLeaderToBecome(
client: Admin,
topicPartition: TopicPartition,

View File

@ -91,7 +91,7 @@ public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHa
// Initialize TopicBasedRemoteLogMetadataManager.
Map<String, Object> configs = new HashMap<>();
configs.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList());
configs.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers(listenerName()));
configs.put(BROKER_ID, 0);
configs.put(LOG_DIR, logDir);
configs.put(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, METADATA_TOPIC_PARTITIONS_COUNT);

View File

@ -98,7 +98,9 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
leaderTopicReplicas.add(1);
leaderTopicReplicas.add(2);
assignedLeaderTopicReplicas.put(0, JavaConverters.asScalaBuffer(leaderTopicReplicas));
remoteLogMetadataManagerHarness.createTopic(leaderTopic, JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas));
remoteLogMetadataManagerHarness.createTopicWithAssignment(
leaderTopic, JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas),
remoteLogMetadataManagerHarness.listenerName());
String followerTopic = "new-follower";
HashMap<Object, Seq<Object>> assignedFollowerTopicReplicas = new HashMap<>();
@ -108,7 +110,9 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
followerTopicReplicas.add(2);
followerTopicReplicas.add(0);
assignedFollowerTopicReplicas.put(0, JavaConverters.asScalaBuffer(followerTopicReplicas));
remoteLogMetadataManagerHarness.createTopic(followerTopic, JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas));
remoteLogMetadataManagerHarness.createTopicWithAssignment(followerTopic,
JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas),
remoteLogMetadataManagerHarness.listenerName());
final TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0));
final TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0));

View File

@ -83,7 +83,9 @@ public class TopicBasedRemoteLogMetadataManagerTest {
leaderTopicReplicas.add(1);
leaderTopicReplicas.add(2);
assignedLeaderTopicReplicas.put(0, JavaConverters.asScalaBuffer(leaderTopicReplicas));
remoteLogMetadataManagerHarness.createTopic(leaderTopic, JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas));
remoteLogMetadataManagerHarness.createTopicWithAssignment(
leaderTopic, JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas),
remoteLogMetadataManagerHarness.listenerName());
String followerTopic = "new-follower";
HashMap<Object, Seq<Object>> assignedFollowerTopicReplicas = new HashMap<>();
@ -93,7 +95,9 @@ public class TopicBasedRemoteLogMetadataManagerTest {
followerTopicReplicas.add(2);
followerTopicReplicas.add(0);
assignedFollowerTopicReplicas.put(0, JavaConverters.asScalaBuffer(followerTopicReplicas));
remoteLogMetadataManagerHarness.createTopic(followerTopic, JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas));
remoteLogMetadataManagerHarness.createTopicWithAssignment(
followerTopic, JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas),
remoteLogMetadataManagerHarness.listenerName());
final TopicIdPartition newLeaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0));
final TopicIdPartition newFollowerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0));