mirror of https://github.com/apache/kafka.git
MINOR: Fix some typos for core (#13882)
Reviewers: Divij Vaidya <diviv@amazon.com>
parent d751c13950
commit 3d97743c67

@@ -462,7 +462,7 @@ object TopicCommand extends Logging {
   private def getReplicationFactor(tpi: TopicPartitionInfo, reassignment: Option[PartitionReassignment]): Int = {
     // It is possible for a reassignment to complete between the time we have fetched its state and the time
-    // we fetch partition metadata. In ths case, we ignore the reassignment when determining replication factor.
+    // we fetch partition metadata. In this case, we ignore the reassignment when determining replication factor.
     def isReassignmentInProgress(ra: PartitionReassignment): Boolean = {
       // Reassignment is still in progress as long as the removing and adding replicas are still present
       val allReplicaIds = tpi.replicas.asScala.map(_.id).toSet

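Note: the comment being fixed here describes a real race: TopicCommand reads the reassignment state and the partition metadata in separate admin calls, so a reassignment can finish in between. A rough sketch of the resulting rule, with simplified hypothetical types rather than the actual kafka.admin classes:

    // Sketch only: `Reassignment` is a stand-in for PartitionReassignment.
    final case class Reassignment(adding: Set[Int], removing: Set[Int])

    def replicationFactor(replicaIds: Set[Int], reassignment: Option[Reassignment]): Int = {
      // Still in progress only while the adding and removing replicas all appear in the replica set.
      def inProgress(ra: Reassignment): Boolean =
        ra.adding.subsetOf(replicaIds) && ra.removing.subsetOf(replicaIds)
      reassignment match {
        case Some(ra) if inProgress(ra) => (replicaIds -- ra.adding).size // target RF excludes adding replicas
        case _                          => replicaIds.size                // completed reassignment is ignored
      }
    }
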
@@ -361,7 +361,7 @@ class ControllerBrokerRequestBatch(
  * @param metadataProvider Provider to provide the relevant metadata to build the state needed to
  *                         send RPCs
  * @param metadataVersionProvider Provider to provide the metadata version used by the controller.
- * @param stateChangeLogger logger to log the various events while sending requests and receving
+ * @param stateChangeLogger logger to log the various events while sending requests and receiving
  *                          responses from the brokers
  * @param kraftController whether the controller is KRaft controller
  */

@@ -152,7 +152,7 @@ class KafkaRaftManager[T](
   private val dataDir = createDataDir()

   private val dataDirLock = {
-    // Aquire the log dir lock if the metadata log dir is different from the log dirs
+    // Acquire the log dir lock if the metadata log dir is different from the log dirs
     val differentMetadataLogDir = !config
       .logDirs
       .map(Paths.get(_).toAbsolutePath)

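Note: a hedged sketch of the condition guarding this lock (a hypothetical helper, not the KafkaRaftManager internals): the metadata log dir only needs its own lock when it is not one of the regular log dirs, which are already locked elsewhere.

    import java.nio.file.Paths

    // True when the metadata log dir points outside the configured log dirs.
    def needsSeparateLock(logDirs: Seq[String], metadataLogDir: String): Boolean =
      !logDirs.map(Paths.get(_).toAbsolutePath)
        .contains(Paths.get(metadataLogDir).toAbsolutePath)
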
@@ -63,7 +63,7 @@ trait ForwardingManager {
  *                         byte buffer from the envelope request, since we will be mutating
  *                         the position and limit fields. It should be a copy.
  * @param requestBody The AbstractRequest we are sending.
- * @param requestToString A callback which can be invoked to produce a human-readable decription
+ * @param requestToString A callback which can be invoked to produce a human-readable description
  *                        of the request.
  * @param responseCallback A callback which takes in an `Option[AbstractResponse]`.
  *                         We will call this function with Some(x) after the controller responds with x.

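Note: the "It should be a copy" requirement exists because serialization advances the buffer's cursor state. A small illustration with plain java.nio (not the ForwardingManager API itself):

    import java.nio.ByteBuffer
    import java.nio.charset.StandardCharsets

    val original = ByteBuffer.wrap("envelope-payload".getBytes(StandardCharsets.UTF_8))
    val copy = original.duplicate()  // shares the bytes, but has independent position/limit
    copy.position(copy.limit())      // "consume" the copy, as request serialization would
    assert(original.position() == 0) // the caller's buffer is unaffected
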
@@ -1134,7 +1134,7 @@ object KafkaConfig {
   val DelegationTokenSecretKeyDoc = "Secret key to generate and verify delegation tokens. The same key must be configured across all the brokers. " +
     " If the key is not set or set to empty string, brokers will disable the delegation token support."
   val DelegationTokenMaxLifeTimeDoc = "The token has a maximum lifetime beyond which it cannot be renewed anymore. Default value 7 days."
-  val DelegationTokenExpiryTimeMsDoc = "The token validity time in miliseconds before the token needs to be renewed. Default value 1 day."
+  val DelegationTokenExpiryTimeMsDoc = "The token validity time in milliseconds before the token needs to be renewed. Default value 1 day."
   val DelegationTokenExpiryCheckIntervalDoc = "Scan interval to remove expired delegation tokens."

   /** ********* Password encryption configuration for dynamic configs *********/

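Note: for context, a hedged example of the broker properties these doc strings describe (the secret value is a placeholder; the two timing values shown are the documented defaults):

    import java.util.Properties

    val props = new Properties()
    props.put("delegation.token.secret.key", "change-me")      // must be identical on every broker
    props.put("delegation.token.expiry.time.ms", "86400000")   // 1 day before renewal is required
    props.put("delegation.token.max.lifetime.ms", "604800000") // 7 days hard cap on renewals
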
@@ -275,7 +275,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo

   /**
    * Gets topic partition states for the given partitions.
-   * @param partitions the partitions for which we want ot get states.
+   * @param partitions the partitions for which we want to get states.
    * @return sequence of GetDataResponses whose contexts are the partitions they are associated with.
    */
   def getTopicPartitionStatesRaw(partitions: Seq[TopicPartition]): Seq[GetDataResponse] = {

@@ -599,7 +599,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   /**
    * Adds a topic ID to existing topic and replica assignments
    * @param topicIdReplicaAssignments the TopicIDReplicaAssignments to add a topic ID to
-   * @return the updated TopicIdReplicaAssigments including the newly created topic IDs
+   * @return the updated TopicIdReplicaAssignments including the newly created topic IDs
    */
   def setTopicIds(topicIdReplicaAssignments: collection.Set[TopicIdReplicaAssignment],
                   expectedControllerEpochZkVersion: Int): Set[TopicIdReplicaAssignment] = {

@@ -266,7 +266,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer.subscribe(List(topic).asJava)
     awaitAssignment(consumer, Set(tp, tp2))

-    // should auto-commit seeked positions before closing
+    // should auto-commit sought positions before closing
     consumer.seek(tp, 300)
     consumer.seek(tp2, 500)
     consumer.close()

@@ -289,7 +289,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer.subscribe(List(topic).asJava)
     awaitAssignment(consumer, Set(tp, tp2))

-    // should auto-commit seeked positions before closing
+    // should auto-commit sought positions before closing
     consumer.seek(tp, 300)
     consumer.seek(tp2, 500)

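Note: "auto-commit sought positions" means the final auto-commit on close() records whatever seek() set, not the last polled offsets. A hedged sketch of what the two tests above assert (assumes enable.auto.commit=true and the tp/tp2 partitions from the test fixture):

    consumer.seek(tp, 300)
    consumer.seek(tp2, 500)
    consumer.close() // final auto-commit persists 300 and 500 for the group
    // A new consumer in the same group would then observe:
    //   otherConsumer.committed(Set(tp, tp2).asJava).get(tp).offset == 300
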
|
@ -376,7 +376,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
|
||||||
|
|
||||||
// Produce/consume should work with new truststore with new producer/consumer
|
// Produce/consume should work with new truststore with new producer/consumer
|
||||||
val producer = ProducerBuilder().trustStoreProps(sslProperties2).maxRetries(0).build()
|
val producer = ProducerBuilder().trustStoreProps(sslProperties2).maxRetries(0).build()
|
||||||
// Start the new consumer in a separate group than the continous consumer started at the beginning of the test so
|
// Start the new consumer in a separate group than the continuous consumer started at the beginning of the test so
|
||||||
// that it is not disrupted by rebalance.
|
// that it is not disrupted by rebalance.
|
||||||
val consumer = ConsumerBuilder("group2").trustStoreProps(sslProperties2).topic(topic2).build()
|
val consumer = ConsumerBuilder("group2").trustStoreProps(sslProperties2).topic(topic2).build()
|
||||||
verifyProduceConsume(producer, consumer, 10, topic2)
|
verifyProduceConsume(producer, consumer, 10, topic2)
|
||||||
|
|
|
@@ -71,7 +71,7 @@ class RaftClusterSnapshotTest {

     assertEquals(numberOfControllers + numberOfBrokers, cluster.raftManagers.size())

-    // For every controller and broker perform some sanity checks against the lastest snapshot
+    // For every controller and broker perform some sanity checks against the latest snapshot
     for ((_, raftManager) <- cluster.raftManagers().asScala) {
       TestUtils.resource(
         RecordsSnapshotReader.of(

@@ -145,16 +145,16 @@ object TestLinearWriteSpeed {
     while(totalWritten + bufferSize < bytesToWrite) {
       val start = System.nanoTime
       val writeSize = writables((count % numFiles).toInt.abs).write()
-      val ellapsed = System.nanoTime - start
-      maxLatency = max(ellapsed, maxLatency)
-      totalLatency += ellapsed
+      val elapsed = System.nanoTime - start
+      maxLatency = max(elapsed, maxLatency)
+      totalLatency += elapsed
       written += writeSize
       count += 1
       totalWritten += writeSize
       if((start - lastReport)/(1000.0*1000.0) > reportingInterval.doubleValue) {
-        val ellapsedSecs = (start - lastReport) / (1000.0*1000.0*1000.0)
+        val elapsedSecs = (start - lastReport) / (1000.0*1000.0*1000.0)
         val mb = written / (1024.0*1024.0)
-        println("%10.3f\t%10.3f\t%10.3f".format(mb / ellapsedSecs, totalLatency / count.toDouble / (1000.0*1000.0), maxLatency / (1000.0 * 1000.0)))
+        println("%10.3f\t%10.3f\t%10.3f".format(mb / elapsedSecs, totalLatency / count.toDouble / (1000.0*1000.0), maxLatency / (1000.0 * 1000.0)))
         lastReport = start
         written = 0
         maxLatency = 0L

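Note: a worked instance of the reporting arithmetic above, with made-up numbers. Timings are taken in nanoseconds; throughput is printed in MB/s and latencies in ms.

    val writtenBytes  = 512L * 1024 * 1024                         // 512 MB written in the interval
    val intervalNanos = 2L * 1000 * 1000 * 1000                    // 2 s since lastReport
    val elapsedSecs   = intervalNanos / (1000.0 * 1000.0 * 1000.0) // 2.0
    val mb            = writtenBytes / (1024.0 * 1024.0)           // 512.0
    println("%10.3f".format(mb / elapsedSecs))                     // prints "   256.000" MB/s
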
@@ -76,8 +76,8 @@ class ConsumerGroupServiceTest {
     val testTopicPartition4 = new TopicPartition("testTopic2", 1);
     val testTopicPartition5 = new TopicPartition("testTopic2", 2);

-    // Some topic's partitions gets valid OffsetAndMetada values, other gets nulls values (negative integers) and others aren't defined
-    val commitedOffsets = Map(
+    // Some topic's partitions gets valid OffsetAndMetadata values, other gets nulls values (negative integers) and others aren't defined
+    val committedOffsets = Map(
       testTopicPartition1 -> new OffsetAndMetadata(100),
       testTopicPartition2 -> null,
       testTopicPartition3 -> new OffsetAndMetadata(100),

@@ -115,7 +115,7 @@ class ConsumerGroupServiceTest {
     when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec), any()))
       .thenReturn(
         AdminClientTestUtils.listConsumerGroupOffsetsResult(
-          Collections.singletonMap(group, commitedOffsets)))
+          Collections.singletonMap(group, committedOffsets)))
     when(admin.listOffsets(
       ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
       any()

@@ -224,9 +224,9 @@ class ReassignPartitionsUnitTest {
   @Test
   def testGetBrokerRackInformation(): Unit = {
     val adminClient = new MockAdminClient.Builder().
-      brokers(Arrays.asList(new Node(0, "locahost", 9092, "rack0"),
-        new Node(1, "locahost", 9093, "rack1"),
-        new Node(2, "locahost", 9094, null))).
+      brokers(Arrays.asList(new Node(0, "localhost", 9092, "rack0"),
+        new Node(1, "localhost", 9093, "rack1"),
+        new Node(2, "localhost", 9094, null))).
       build()
     try {
       assertEquals(Seq(

@@ -304,12 +304,12 @@ class ReassignPartitionsUnitTest {
   def testGenerateAssignmentWithInconsistentRacks(): Unit = {
     val adminClient = new MockAdminClient.Builder().
       brokers(Arrays.asList(
-        new Node(0, "locahost", 9092, "rack0"),
-        new Node(1, "locahost", 9093, "rack0"),
-        new Node(2, "locahost", 9094, null),
-        new Node(3, "locahost", 9095, "rack1"),
-        new Node(4, "locahost", 9096, "rack1"),
-        new Node(5, "locahost", 9097, "rack2"))).
+        new Node(0, "localhost", 9092, "rack0"),
+        new Node(1, "localhost", 9093, "rack0"),
+        new Node(2, "localhost", 9094, null),
+        new Node(3, "localhost", 9095, "rack1"),
+        new Node(4, "localhost", 9096, "rack1"),
+        new Node(5, "localhost", 9097, "rack2"))).
       build()
     try {
       addTopics(adminClient)

@@ -114,7 +114,7 @@ class PartitionLockTest extends Logging {

   /**
    * Verifies concurrent produce and replica fetch log read result update with ISR updates. This
-   * can result in delays in processing produce and replica fetch requets since write lock is obtained,
+   * can result in delays in processing produce and replica fetch requests since write lock is obtained,
    * but it should complete without any failures.
    */
   @Test

@@ -2263,13 +2263,13 @@ class GroupCoordinatorTest {

     assertEquals(Errors.NONE, await(leaderResult, 1).error)

-    // Leader should be able to heartbeart
+    // Leader should be able to heartbeat
     verifyHeartbeat(results.head, Errors.NONE)

     // Advance part the rebalance timeout to trigger the delayed operation.
     timer.advanceClock(DefaultRebalanceTimeout / 2 + 1)

-    // Leader should be able to heartbeart
+    // Leader should be able to heartbeat
     verifyHeartbeat(results.head, Errors.REBALANCE_IN_PROGRESS)

     // Followers should have been removed.

@@ -2347,7 +2347,7 @@ class GroupCoordinatorTest {
     val followerErrors = followerResults.map(await(_, 1).error)
     assertEquals(Set(Errors.NONE), followerErrors.toSet)

-    // Advance past the rebalance timeout to expire the Sync timout. All
+    // Advance past the rebalance timeout to expire the Sync timeout. All
     // members should remain and the group should not rebalance.
     timer.advanceClock(DefaultRebalanceTimeout / 2 + 1)

@@ -123,7 +123,7 @@ class TransactionMarkerRequestCompletionHandlerTest {
   }

   @Test
-  def shouldCompleteDelayedOperationWheCoordinatorEpochFenced(): Unit = {
+  def shouldCompleteDelayedOperationWhenCoordinatorEpochFenced(): Unit = {
     mockCache()

     verifyRemoveDelayedOperationOnError(Errors.TRANSACTION_COORDINATOR_FENCED)

@@ -771,7 +771,7 @@ class TransactionStateManagerTest {

     loadTransactionsForPartitions(partitionIds)

-    // When TransactionMetadata is intialized for the first time, it has the following
+    // When TransactionMetadata is initialized for the first time, it has the following
     // shape. Then, the producer id and producer epoch are initialized and we try to
     // write the change. If the write fails (e.g. under min isr), the TransactionMetadata
     // is left at it is. If the transactional id is never reused, the TransactionMetadata

@@ -32,7 +32,7 @@ class MinIsrConfigTest extends KafkaServerTestHarness {

   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testDeaultKafkaConfig(quorum: String): Unit = {
+  def testDefaultKafkaConfig(quorum: String): Unit = {
     assert(brokers.head.logManager.initialDefaultConfig.minInSyncReplicas == 5)
   }
 }

@@ -81,8 +81,8 @@ object OffsetMapTest {
     val load = args(1).toDouble
     val start = System.nanoTime
     val map = test.validateMap(size, load)
-    val ellapsedMs = (System.nanoTime - start) / 1000.0 / 1000.0
-    println(s"${map.size} entries in map of size ${map.slots} in $ellapsedMs ms")
+    val elapsedMs = (System.nanoTime - start) / 1000.0 / 1000.0
+    println(s"${map.size} entries in map of size ${map.slots} in $elapsedMs ms")
     println("Collision rate: %.1f%%".format(100*map.collisionRate))
   }
 }

@@ -34,7 +34,7 @@ class TimeIndexTest {

   @BeforeEach
   def setup(): Unit = {
-    this.idx = new TimeIndex(nonExistantTempFile(), baseOffset, maxEntries * 12)
+    this.idx = new TimeIndex(nonExistentTempFile(), baseOffset, maxEntries * 12)
   }

   @AfterEach

@@ -97,7 +97,7 @@ class TimeIndexTest {
       idx.maybeAppend(i * 10, i * 10 + baseOffset)
     }

-  def nonExistantTempFile(): File = {
+  def nonExistentTempFile(): File = {
     val file = TestUtils.tempFile()
     file.delete()
     file

@@ -515,7 +515,7 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     }

     // Alternate authorizer, Remove all acls that end in 0
-    val concurrentFuctions = acls.map { case (acl, aclId) =>
+    val concurrentFunctions = acls.map { case (acl, aclId) =>
       () => {
         if (aclId % 2 == 0) {
           addAcls(authorizer1, Set(acl), commonResource)

@@ -532,7 +532,7 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
       aclId % 10 != 0
     }.map(_._1).toSet

-    TestUtils.assertConcurrent("Should support many concurrent calls", concurrentFuctions, 30 * 1000)
+    TestUtils.assertConcurrent("Should support many concurrent calls", concurrentFunctions, 30 * 1000)

     TestUtils.waitAndVerifyAcls(expectedAcls, authorizer1, commonResource)
     TestUtils.waitAndVerifyAcls(expectedAcls, authorizer2, commonResource)

@@ -595,14 +595,14 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
     val acl = new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username).toString, WildcardHost, AclOperation.ALL, ALLOW)

     // Alternate authorizer to keep adding and removing ZooKeeper path
-    val concurrentFuctions = (0 to 50).map { _ =>
+    val concurrentFunctions = (0 to 50).map { _ =>
       () => {
         addAcls(authorizer1, Set(acl), resource)
         removeAcls(authorizer2, Set(acl), resource)
       }
     }

-    TestUtils.assertConcurrent("Should support many concurrent calls", concurrentFuctions, 30 * 1000)
+    TestUtils.assertConcurrent("Should support many concurrent calls", concurrentFunctions, 30 * 1000)

     TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], authorizer1, resource)
     TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], authorizer2, resource)

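Note: TestUtils.assertConcurrent takes a Seq of no-arg functions, as built above. A hedged sketch of what such a helper can look like (a generic stand-in, not Kafka's actual implementation):

    import java.util.concurrent.{Executors, TimeUnit}

    def assertConcurrent(message: String, functions: Seq[() => Unit], timeoutMs: Int): Unit = {
      val pool = Executors.newFixedThreadPool(functions.size)
      try {
        // Run every function on its own thread; get() rethrows any failure.
        val futures = functions.map(f => pool.submit(new Runnable { def run(): Unit = f() }))
        try futures.foreach(_.get(timeoutMs, TimeUnit.MILLISECONDS))
        catch { case e: Exception => throw new AssertionError(message, e) }
      } finally pool.shutdownNow()
    }
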
@@ -286,7 +286,7 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
     checkUserAppearsInAlterResults(results1_1, user2)

     // KRaft is eventually consistent so it is possible to call describe before
-    // the credential is propogated from the controller to the broker.
+    // the credential is propagated from the controller to the broker.
     TestUtils.waitUntilTrue(() => describeAllWithNoTopLevelErrorConfirmed().data.results.size == 2,
       "describeAllWithNoTopLevelErrorConfirmed does not see 2 users");

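Note: because KRaft propagates credentials from the controller to brokers asynchronously, the test polls rather than asserting immediately. A hedged sketch of the polling idiom (simplified signature, not Kafka's actual TestUtils):

    def waitUntilTrue(condition: () => Boolean, message: String, timeoutMs: Long = 15000L): Unit = {
      val deadline = System.currentTimeMillis() + timeoutMs
      while (!condition()) {
        if (System.currentTimeMillis() > deadline) throw new AssertionError(message)
        Thread.sleep(100) // back off between checks
      }
    }
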
@@ -159,7 +159,7 @@ object CreateTopicsRequestWithPolicyTest {
       if (numPartitions != null || replicationFactor != null) {
         require(numPartitions != null, s"numPartitions should not be null, but it is $numPartitions")
         require(replicationFactor != null, s"replicationFactor should not be null, but it is $replicationFactor")
-        require(replicasAssignments == null, s"replicaAssigments should be null, but it is $replicasAssignments")
+        require(replicasAssignments == null, s"replicaAssignments should be null, but it is $replicasAssignments")

         if (numPartitions < 5)
           throw new PolicyViolationException(s"Topics should have at least 5 partitions, received $numPartitions")

@@ -173,7 +173,7 @@ object CreateTopicsRequestWithPolicyTest {
       } else {
         require(numPartitions == null, s"numPartitions should be null, but it is $numPartitions")
         require(replicationFactor == null, s"replicationFactor should be null, but it is $replicationFactor")
-        require(replicasAssignments != null, s"replicaAssigments should not be null, but it is $replicasAssignments")
+        require(replicasAssignments != null, s"replicaAssignments should not be null, but it is $replicasAssignments")

         replicasAssignments.asScala.toSeq.sortBy { case (tp, _) => tp }.foreach { case (partitionId, assignment) =>
           if (assignment.size < 2)

@@ -59,7 +59,7 @@ class ZkAdminManagerTest {
     assertEquals(1, oneProp.size)
     assertEquals(1234.0, oneProp("foo"))

-    // This is probably not desired, but kept for compatability with existing usages
+    // This is probably not desired, but kept for compatibility with existing usages
     val emptyKey = ZkAdminManager.clientQuotaPropsToDoubleMap(Map("" -> "-42.1"))
     assertEquals(1, emptyKey.size)
     assertEquals(-42.1, emptyKey(""))

@@ -48,9 +48,9 @@ class InMemoryLeaderEpochCheckpointTest {
     val epochs = java.util.Arrays.asList(new EpochEntry(expectedEpoch, expectedStartOffset))
     checkpoint.write(epochs)
     assertEquals(epochs, checkpoint.read())
-    val ba = checkpoint.readAsByteBuffer()
+    val buffer = checkpoint.readAsByteBuffer()

-    val bufferedReader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(ba.array()), StandardCharsets.UTF_8))
+    val bufferedReader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(buffer.array()), StandardCharsets.UTF_8))
     assertEquals(expectedVersion.toString, bufferedReader.readLine())
     assertEquals(epochs.size().toString, bufferedReader.readLine())
     assertEquals(s"$expectedEpoch $expectedStartOffset", bufferedReader.readLine())

@@ -92,7 +92,7 @@ class LeaderEpochFileCacheTest {
     cache.assign(2, 11)
     cache.assign(3, 12)

-    //When (say a bootstraping follower) sends request for UNDEFINED_EPOCH
+    //When (say a bootstrapping follower) sends request for UNDEFINED_EPOCH
     val epochAndOffsetFor = toTuple(cache.endOffsetFor(UNDEFINED_EPOCH, 0L))

     //Then

@@ -1188,7 +1188,7 @@ class KafkaZkClientTest extends QuorumTestHarness {

     assertEquals(CreateResponse(Code.NODEEXISTS, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
       eraseMetadata(zkClient.createControllerEpochRaw(0)),
-      "Attemt to create existing nodes should return NODEEXISTS")
+      "Attempt to create existing nodes should return NODEEXISTS")

     assertEquals(SetDataResponse(Code.OK, ControllerEpochZNode.path, None, statWithVersion(1), ResponseMetadata(0, 0)),
       eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)),

@@ -1414,7 +1414,7 @@ class KafkaZkClientTest extends QuorumTestHarness {
   }

   @Test
-  def testJuteMaxBufffer(): Unit = {
+  def testJuteMaxBuffer(): Unit = {

     def assertJuteMaxBufferConfig(clientConfig: ZKClientConfig, expectedValue: String): Unit = {
       val client = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,

@@ -73,7 +73,7 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness {
     val value = message.value

     assertTrue(props.containsKey(name))
-    // If the config is senstive, compare it to the decoded value.
+    // If the config is sensitive, compare it to the decoded value.
     if (name == KafkaConfig.SslKeystorePasswordProp) {
       assertEquals(SECRET, value)
     } else {