mirror of https://github.com/apache/kafka.git
KAFKA-17346: Create :share Gradle module (#16888)
Establishes the new `:share` Gradle module. This module is intended to be used for server-side KIP-932 classes that are not part of the new share group coordinator. This patch relocates and renames some existing classes. A small amount of compatibility changes were also made, but do not affect any logic. Reviewers: Andrew Schofield <aschofield@confluent.io>, David Arthur <mumrah@gmail.com>
This commit is contained in:
parent
e38fc50d1e
commit
3db6e68c4c
40
build.gradle
40
build.gradle
|
@ -907,6 +907,44 @@ project(':server') {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
project(':share') {
|
||||||
|
base {
|
||||||
|
archivesName = "kafka-share"
|
||||||
|
}
|
||||||
|
|
||||||
|
dependencies {
|
||||||
|
implementation project(':server-common')
|
||||||
|
|
||||||
|
implementation libs.slf4jApi
|
||||||
|
|
||||||
|
testImplementation libs.junitJupiter
|
||||||
|
testImplementation libs.slf4jReload4j
|
||||||
|
|
||||||
|
testRuntimeOnly libs.junitPlatformLanucher
|
||||||
|
}
|
||||||
|
|
||||||
|
sourceSets {
|
||||||
|
main {
|
||||||
|
java {
|
||||||
|
srcDirs = ["src/main/java"]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
test {
|
||||||
|
java {
|
||||||
|
srcDirs = ["src/test/java"]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
checkstyle {
|
||||||
|
configProperties = checkstyleConfigProperties("import-control-share.xml")
|
||||||
|
}
|
||||||
|
|
||||||
|
javadoc {
|
||||||
|
enabled = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
project(':core') {
|
project(':core') {
|
||||||
apply plugin: 'scala'
|
apply plugin: 'scala'
|
||||||
|
|
||||||
|
@ -941,6 +979,7 @@ project(':core') {
|
||||||
implementation project(':storage')
|
implementation project(':storage')
|
||||||
implementation project(':server')
|
implementation project(':server')
|
||||||
implementation project(':coordinator-common')
|
implementation project(':coordinator-common')
|
||||||
|
implementation project(':share')
|
||||||
|
|
||||||
implementation libs.argparse4j
|
implementation libs.argparse4j
|
||||||
implementation libs.commonsValidator
|
implementation libs.commonsValidator
|
||||||
|
@ -980,6 +1019,7 @@ project(':core') {
|
||||||
testImplementation project(':server-common').sourceSets.test.output
|
testImplementation project(':server-common').sourceSets.test.output
|
||||||
testImplementation project(':storage:storage-api').sourceSets.test.output
|
testImplementation project(':storage:storage-api').sourceSets.test.output
|
||||||
testImplementation project(':server').sourceSets.test.output
|
testImplementation project(':server').sourceSets.test.output
|
||||||
|
testImplementation project(':share').sourceSets.test.output
|
||||||
testImplementation libs.bcpkix
|
testImplementation libs.bcpkix
|
||||||
testImplementation libs.mockitoCore
|
testImplementation libs.mockitoCore
|
||||||
testImplementation(libs.apacheda) {
|
testImplementation(libs.apacheda) {
|
||||||
|
|
|
@ -0,0 +1,42 @@
|
||||||
|
<!DOCTYPE import-control PUBLIC
|
||||||
|
"-//Puppy Crawl//DTD Import Control 1.1//EN"
|
||||||
|
"http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
contributor license agreements. See the NOTICE file distributed with
|
||||||
|
this work for additional information regarding copyright ownership.
|
||||||
|
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
(the "License"); you may not use this file except in compliance with
|
||||||
|
the License. You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
-->
|
||||||
|
|
||||||
|
<import-control pkg="org.apache.kafka">
|
||||||
|
|
||||||
|
<!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
|
||||||
|
|
||||||
|
<!-- common library dependencies -->
|
||||||
|
<allow pkg="java" />
|
||||||
|
<allow pkg="org.slf4j" />
|
||||||
|
<allow pkg="org.junit" />
|
||||||
|
|
||||||
|
<!-- no one depends on the server -->
|
||||||
|
<disallow pkg="kafka" />
|
||||||
|
|
||||||
|
<!-- anyone can use public classes -->
|
||||||
|
<allow pkg="org.apache.kafka.common" exact-match="true" />
|
||||||
|
<allow pkg="org.apache.kafka.common.utils" />
|
||||||
|
|
||||||
|
<!-- protocol, records and request/response utilities -->
|
||||||
|
<allow pkg="org.apache.kafka.common.message" />
|
||||||
|
<allow pkg="org.apache.kafka.common.protocol" />
|
||||||
|
<allow pkg="org.apache.kafka.common.requests" />
|
||||||
|
|
||||||
|
</import-control>
|
|
@ -16,7 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package kafka.server.share;
|
package kafka.server.share;
|
||||||
|
|
||||||
import kafka.server.FetchSession;
|
|
||||||
import kafka.server.QuotaFactory;
|
import kafka.server.QuotaFactory;
|
||||||
import kafka.server.ReplicaManager;
|
import kafka.server.ReplicaManager;
|
||||||
|
|
||||||
|
@ -41,9 +40,12 @@ import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
|
||||||
import org.apache.kafka.common.utils.Time;
|
import org.apache.kafka.common.utils.Time;
|
||||||
import org.apache.kafka.server.group.share.Persister;
|
import org.apache.kafka.server.group.share.Persister;
|
||||||
import org.apache.kafka.server.share.CachedSharePartition;
|
import org.apache.kafka.server.share.CachedSharePartition;
|
||||||
|
import org.apache.kafka.server.share.FinalContext;
|
||||||
import org.apache.kafka.server.share.ShareAcknowledgementBatch;
|
import org.apache.kafka.server.share.ShareAcknowledgementBatch;
|
||||||
|
import org.apache.kafka.server.share.ShareFetchContext;
|
||||||
import org.apache.kafka.server.share.ShareSession;
|
import org.apache.kafka.server.share.ShareSession;
|
||||||
import org.apache.kafka.server.share.ShareSessionCache;
|
import org.apache.kafka.server.share.ShareSessionCache;
|
||||||
|
import org.apache.kafka.server.share.ShareSessionContext;
|
||||||
import org.apache.kafka.server.share.ShareSessionKey;
|
import org.apache.kafka.server.share.ShareSessionKey;
|
||||||
import org.apache.kafka.server.util.timer.SystemTimer;
|
import org.apache.kafka.server.util.timer.SystemTimer;
|
||||||
import org.apache.kafka.server.util.timer.SystemTimerReaper;
|
import org.apache.kafka.server.util.timer.SystemTimerReaper;
|
||||||
|
@ -502,7 +504,7 @@ public class SharePartitionManager implements AutoCloseable {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String partitionsToLogString(Collection<TopicIdPartition> partitions) {
|
private static String partitionsToLogString(Collection<TopicIdPartition> partitions) {
|
||||||
return FetchSession.partitionsToLogString(partitions, log.isTraceEnabled());
|
return ShareSession.partitionsToLogString(partitions, log.isTraceEnabled());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -24,7 +24,7 @@ import kafka.network.RequestChannel
|
||||||
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
|
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
|
||||||
import kafka.server.handlers.DescribeTopicPartitionsRequestHandler
|
import kafka.server.handlers.DescribeTopicPartitionsRequestHandler
|
||||||
import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache}
|
import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache}
|
||||||
import kafka.server.share.{ErroneousAndValidPartitionData, ShareFetchContext, SharePartitionManager}
|
import kafka.server.share.SharePartitionManager
|
||||||
import kafka.utils.Implicits._
|
import kafka.utils.Implicits._
|
||||||
import kafka.utils.{CoreUtils, Logging}
|
import kafka.utils.{CoreUtils, Logging}
|
||||||
import org.apache.kafka.admin.AdminUtils
|
import org.apache.kafka.admin.AdminUtils
|
||||||
|
@ -75,7 +75,7 @@ import org.apache.kafka.server.authorizer._
|
||||||
import org.apache.kafka.server.common.MetadataVersion
|
import org.apache.kafka.server.common.MetadataVersion
|
||||||
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_IV0}
|
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_IV0}
|
||||||
import org.apache.kafka.server.record.BrokerCompressionType
|
import org.apache.kafka.server.record.BrokerCompressionType
|
||||||
import org.apache.kafka.server.share.ShareAcknowledgementBatch
|
import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, ShareAcknowledgementBatch, ShareFetchContext}
|
||||||
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData}
|
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData}
|
||||||
|
|
||||||
import java.lang.{Long => JLong}
|
import java.lang.{Long => JLong}
|
||||||
|
@ -4225,7 +4225,7 @@ class KafkaApis(val requestChannel: RequestChannel,
|
||||||
): CompletableFuture[Map[TopicIdPartition, ShareFetchResponseData.PartitionData]] = {
|
): CompletableFuture[Map[TopicIdPartition, ShareFetchResponseData.PartitionData]] = {
|
||||||
|
|
||||||
val erroneous = mutable.Map.empty[TopicIdPartition, ShareFetchResponseData.PartitionData]
|
val erroneous = mutable.Map.empty[TopicIdPartition, ShareFetchResponseData.PartitionData]
|
||||||
erroneousAndValidPartitionData.erroneous.forEach { erroneousData => erroneous += erroneousData }
|
erroneousAndValidPartitionData.erroneous.forEach { (topicIdPartition, partitionData) => erroneous.put(topicIdPartition, partitionData) }
|
||||||
|
|
||||||
val interestedWithMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
|
val interestedWithMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
|
||||||
|
|
||||||
|
|
|
@ -48,9 +48,13 @@ import org.apache.kafka.common.utils.Time;
|
||||||
import org.apache.kafka.server.group.share.NoOpShareStatePersister;
|
import org.apache.kafka.server.group.share.NoOpShareStatePersister;
|
||||||
import org.apache.kafka.server.group.share.Persister;
|
import org.apache.kafka.server.group.share.Persister;
|
||||||
import org.apache.kafka.server.share.CachedSharePartition;
|
import org.apache.kafka.server.share.CachedSharePartition;
|
||||||
|
import org.apache.kafka.server.share.ErroneousAndValidPartitionData;
|
||||||
|
import org.apache.kafka.server.share.FinalContext;
|
||||||
import org.apache.kafka.server.share.ShareAcknowledgementBatch;
|
import org.apache.kafka.server.share.ShareAcknowledgementBatch;
|
||||||
|
import org.apache.kafka.server.share.ShareFetchContext;
|
||||||
import org.apache.kafka.server.share.ShareSession;
|
import org.apache.kafka.server.share.ShareSession;
|
||||||
import org.apache.kafka.server.share.ShareSessionCache;
|
import org.apache.kafka.server.share.ShareSessionCache;
|
||||||
|
import org.apache.kafka.server.share.ShareSessionContext;
|
||||||
import org.apache.kafka.server.share.ShareSessionKey;
|
import org.apache.kafka.server.share.ShareSessionKey;
|
||||||
import org.apache.kafka.server.util.timer.MockTimer;
|
import org.apache.kafka.server.util.timer.MockTimer;
|
||||||
import org.apache.kafka.server.util.timer.SystemTimer;
|
import org.apache.kafka.server.util.timer.SystemTimer;
|
||||||
|
@ -1865,16 +1869,17 @@ public class SharePartitionManagerTest {
|
||||||
assertEquals(partitionsSet, partitionsInContext);
|
assertEquals(partitionsSet, partitionsInContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void assertErroneousAndValidTopicIdPartitions(ErroneousAndValidPartitionData erroneousAndValidPartitionData,
|
private void assertErroneousAndValidTopicIdPartitions(
|
||||||
|
ErroneousAndValidPartitionData erroneousAndValidPartitionData,
|
||||||
List<TopicIdPartition> expectedErroneous, List<TopicIdPartition> expectedValid) {
|
List<TopicIdPartition> expectedErroneous, List<TopicIdPartition> expectedValid) {
|
||||||
Set<TopicIdPartition> expectedErroneousSet = new HashSet<>(expectedErroneous);
|
Set<TopicIdPartition> expectedErroneousSet = new HashSet<>(expectedErroneous);
|
||||||
Set<TopicIdPartition> expectedValidSet = new HashSet<>(expectedValid);
|
Set<TopicIdPartition> expectedValidSet = new HashSet<>(expectedValid);
|
||||||
Set<TopicIdPartition> actualErroneousPartitions = new HashSet<>();
|
Set<TopicIdPartition> actualErroneousPartitions = new HashSet<>();
|
||||||
Set<TopicIdPartition> actualValidPartitions = new HashSet<>();
|
Set<TopicIdPartition> actualValidPartitions = new HashSet<>();
|
||||||
erroneousAndValidPartitionData.erroneous().forEach(topicIdPartitionPartitionDataTuple2 ->
|
erroneousAndValidPartitionData.erroneous().forEach((topicIdPartition, partitionData) ->
|
||||||
actualErroneousPartitions.add(topicIdPartitionPartitionDataTuple2._1));
|
actualErroneousPartitions.add(topicIdPartition));
|
||||||
erroneousAndValidPartitionData.validTopicIdPartitions().forEach(topicIdPartitionPartitionDataTuple2 ->
|
erroneousAndValidPartitionData.validTopicIdPartitions().forEach((topicIdPartition, partitionData) ->
|
||||||
actualValidPartitions.add(topicIdPartitionPartitionDataTuple2._1));
|
actualValidPartitions.add(topicIdPartition));
|
||||||
assertEquals(expectedErroneousSet, actualErroneousPartitions);
|
assertEquals(expectedErroneousSet, actualErroneousPartitions);
|
||||||
assertEquals(expectedValidSet, actualValidPartitions);
|
assertEquals(expectedValidSet, actualValidPartitions);
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,7 @@ import kafka.log.UnifiedLog
|
||||||
import kafka.network.{RequestChannel, RequestMetrics}
|
import kafka.network.{RequestChannel, RequestMetrics}
|
||||||
import kafka.server.QuotaFactory.QuotaManagers
|
import kafka.server.QuotaFactory.QuotaManagers
|
||||||
import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache, MockConfigRepository, ZkMetadataCache}
|
import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache, MockConfigRepository, ZkMetadataCache}
|
||||||
import kafka.server.share.{ErroneousAndValidPartitionData, FinalContext, SharePartitionManager, ShareSessionContext}
|
import kafka.server.share.SharePartitionManager
|
||||||
import kafka.utils.{CoreUtils, Log4jController, Logging, TestUtils}
|
import kafka.utils.{CoreUtils, Log4jController, Logging, TestUtils}
|
||||||
import kafka.zk.KafkaZkClient
|
import kafka.zk.KafkaZkClient
|
||||||
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
|
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
|
||||||
|
@ -86,7 +86,7 @@ import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_I
|
||||||
import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, MetadataVersion}
|
import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, MetadataVersion}
|
||||||
import org.apache.kafka.server.config.{ConfigType, KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ShareGroupConfig}
|
import org.apache.kafka.server.config.{ConfigType, KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ShareGroupConfig}
|
||||||
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
|
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
|
||||||
import org.apache.kafka.server.share.{CachedSharePartition, ShareAcknowledgementBatch, ShareSession, ShareSessionKey}
|
import org.apache.kafka.server.share.{CachedSharePartition, ErroneousAndValidPartitionData, FinalContext, ShareAcknowledgementBatch, ShareSession, ShareSessionContext, ShareSessionKey}
|
||||||
import org.apache.kafka.server.util.{FutureUtils, MockTime}
|
import org.apache.kafka.server.util.{FutureUtils, MockTime}
|
||||||
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchParams, FetchPartitionData, LogConfig}
|
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchParams, FetchPartitionData, LogConfig}
|
||||||
import org.junit.jupiter.api.Assertions._
|
import org.junit.jupiter.api.Assertions._
|
||||||
|
@ -5984,27 +5984,21 @@ class KafkaApisTest extends Logging {
|
||||||
).asJava)
|
).asJava)
|
||||||
)
|
)
|
||||||
|
|
||||||
val erroneousPartitions: util.List[(TopicIdPartition, ShareFetchResponseData.PartitionData)] = new util.ArrayList()
|
val erroneousPartitions: util.Map[TopicIdPartition, ShareFetchResponseData.PartitionData] = new util.HashMap()
|
||||||
|
|
||||||
val validPartitions: util.List[(TopicIdPartition, ShareFetchRequest.SharePartitionData)] = new util.ArrayList()
|
val validPartitions: util.Map[TopicIdPartition, ShareFetchRequest.SharePartitionData] = new util.HashMap()
|
||||||
validPartitions.add(
|
validPartitions.put(
|
||||||
(
|
|
||||||
tp1,
|
tp1,
|
||||||
new ShareFetchRequest.SharePartitionData(topicId1, partitionMaxBytes)
|
new ShareFetchRequest.SharePartitionData(topicId1, partitionMaxBytes)
|
||||||
)
|
)
|
||||||
)
|
validPartitions.put(
|
||||||
validPartitions.add(
|
|
||||||
(
|
|
||||||
tp2,
|
tp2,
|
||||||
new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes)
|
new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes)
|
||||||
)
|
)
|
||||||
)
|
validPartitions.put(
|
||||||
validPartitions.add(
|
|
||||||
(
|
|
||||||
tp3,
|
tp3,
|
||||||
new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes)
|
new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes)
|
||||||
)
|
)
|
||||||
)
|
|
||||||
|
|
||||||
val erroneousAndValidPartitionData: ErroneousAndValidPartitionData =
|
val erroneousAndValidPartitionData: ErroneousAndValidPartitionData =
|
||||||
new ErroneousAndValidPartitionData(erroneousPartitions, validPartitions)
|
new ErroneousAndValidPartitionData(erroneousPartitions, validPartitions)
|
||||||
|
@ -6133,31 +6127,25 @@ class KafkaApisTest extends Logging {
|
||||||
).asJava)
|
).asJava)
|
||||||
)
|
)
|
||||||
|
|
||||||
val erroneousPartitions: util.List[(TopicIdPartition, ShareFetchResponseData.PartitionData)] = new util.ArrayList()
|
val erroneousPartitions: util.Map[TopicIdPartition, ShareFetchResponseData.PartitionData] = new util.HashMap()
|
||||||
erroneousPartitions.add(
|
erroneousPartitions.put(
|
||||||
(
|
|
||||||
tp2,
|
tp2,
|
||||||
new ShareFetchResponseData.PartitionData()
|
new ShareFetchResponseData.PartitionData()
|
||||||
.setPartitionIndex(1)
|
.setPartitionIndex(1)
|
||||||
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
|
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
|
||||||
)
|
)
|
||||||
)
|
erroneousPartitions.put(
|
||||||
erroneousPartitions.add(
|
|
||||||
(
|
|
||||||
tp3,
|
tp3,
|
||||||
new ShareFetchResponseData.PartitionData()
|
new ShareFetchResponseData.PartitionData()
|
||||||
.setPartitionIndex(0)
|
.setPartitionIndex(0)
|
||||||
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
|
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
|
||||||
)
|
)
|
||||||
)
|
|
||||||
|
|
||||||
val validPartitions: util.List[(TopicIdPartition, ShareFetchRequest.SharePartitionData)] = new util.ArrayList()
|
val validPartitions: util.Map[TopicIdPartition, ShareFetchRequest.SharePartitionData] = new util.HashMap()
|
||||||
validPartitions.add(
|
validPartitions.put(
|
||||||
(
|
|
||||||
tp1,
|
tp1,
|
||||||
new ShareFetchRequest.SharePartitionData(topicId1, partitionMaxBytes)
|
new ShareFetchRequest.SharePartitionData(topicId1, partitionMaxBytes)
|
||||||
)
|
)
|
||||||
)
|
|
||||||
|
|
||||||
val erroneousAndValidPartitionData: ErroneousAndValidPartitionData =
|
val erroneousAndValidPartitionData: ErroneousAndValidPartitionData =
|
||||||
new ErroneousAndValidPartitionData(erroneousPartitions, validPartitions)
|
new ErroneousAndValidPartitionData(erroneousPartitions, validPartitions)
|
||||||
|
@ -6287,27 +6275,21 @@ class KafkaApisTest extends Logging {
|
||||||
).asJava)
|
).asJava)
|
||||||
)
|
)
|
||||||
|
|
||||||
val erroneousPartitions: util.List[(TopicIdPartition, ShareFetchResponseData.PartitionData)] = new util.ArrayList()
|
val erroneousPartitions: util.Map[TopicIdPartition, ShareFetchResponseData.PartitionData] = new util.HashMap()
|
||||||
|
|
||||||
val validPartitions: util.List[(TopicIdPartition, ShareFetchRequest.SharePartitionData)] = new util.ArrayList()
|
val validPartitions: util.Map[TopicIdPartition, ShareFetchRequest.SharePartitionData] = new util.HashMap()
|
||||||
validPartitions.add(
|
validPartitions.put(
|
||||||
(
|
|
||||||
tp1,
|
tp1,
|
||||||
new ShareFetchRequest.SharePartitionData(topicId1, partitionMaxBytes)
|
new ShareFetchRequest.SharePartitionData(topicId1, partitionMaxBytes)
|
||||||
)
|
)
|
||||||
)
|
validPartitions.put(
|
||||||
validPartitions.add(
|
|
||||||
(
|
|
||||||
tp2,
|
tp2,
|
||||||
new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes)
|
new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes)
|
||||||
)
|
)
|
||||||
)
|
validPartitions.put(
|
||||||
validPartitions.add(
|
|
||||||
(
|
|
||||||
tp3,
|
tp3,
|
||||||
new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes)
|
new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes)
|
||||||
)
|
)
|
||||||
)
|
|
||||||
|
|
||||||
val erroneousAndValidPartitionData: ErroneousAndValidPartitionData =
|
val erroneousAndValidPartitionData: ErroneousAndValidPartitionData =
|
||||||
new ErroneousAndValidPartitionData(erroneousPartitions, validPartitions)
|
new ErroneousAndValidPartitionData(erroneousPartitions, validPartitions)
|
||||||
|
@ -6452,33 +6434,25 @@ class KafkaApisTest extends Logging {
|
||||||
).asJava)
|
).asJava)
|
||||||
)
|
)
|
||||||
|
|
||||||
val erroneousPartitions: util.List[(TopicIdPartition, ShareFetchResponseData.PartitionData)] = new util.ArrayList()
|
val erroneousPartitions: util.Map[TopicIdPartition, ShareFetchResponseData.PartitionData] = new util.HashMap()
|
||||||
|
|
||||||
val validPartitions: util.List[(TopicIdPartition, ShareFetchRequest.SharePartitionData)] = new util.ArrayList()
|
val validPartitions: util.Map[TopicIdPartition, ShareFetchRequest.SharePartitionData] = new util.HashMap()
|
||||||
validPartitions.add(
|
validPartitions.put(
|
||||||
(
|
|
||||||
tp1,
|
tp1,
|
||||||
new ShareFetchRequest.SharePartitionData(topicId1, partitionMaxBytes)
|
new ShareFetchRequest.SharePartitionData(topicId1, partitionMaxBytes)
|
||||||
)
|
)
|
||||||
)
|
validPartitions.put(
|
||||||
validPartitions.add(
|
|
||||||
(
|
|
||||||
tp2,
|
tp2,
|
||||||
new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes)
|
new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes)
|
||||||
)
|
)
|
||||||
)
|
validPartitions.put(
|
||||||
validPartitions.add(
|
|
||||||
(
|
|
||||||
tp3,
|
tp3,
|
||||||
new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes)
|
new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes)
|
||||||
)
|
)
|
||||||
)
|
validPartitions.put(
|
||||||
validPartitions.add(
|
|
||||||
(
|
|
||||||
tp4,
|
tp4,
|
||||||
new ShareFetchRequest.SharePartitionData(topicId3, partitionMaxBytes)
|
new ShareFetchRequest.SharePartitionData(topicId3, partitionMaxBytes)
|
||||||
)
|
)
|
||||||
)
|
|
||||||
|
|
||||||
val erroneousAndValidPartitionData: ErroneousAndValidPartitionData =
|
val erroneousAndValidPartitionData: ErroneousAndValidPartitionData =
|
||||||
new ErroneousAndValidPartitionData(erroneousPartitions, validPartitions)
|
new ErroneousAndValidPartitionData(erroneousPartitions, validPartitions)
|
||||||
|
|
|
@ -68,6 +68,7 @@ include 'clients',
|
||||||
'raft',
|
'raft',
|
||||||
'server',
|
'server',
|
||||||
'server-common',
|
'server-common',
|
||||||
|
'share',
|
||||||
'share-coordinator',
|
'share-coordinator',
|
||||||
'shell',
|
'shell',
|
||||||
'storage',
|
'storage',
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package kafka.server.share;
|
package org.apache.kafka.server.share;
|
||||||
|
|
||||||
import org.apache.kafka.common.TopicIdPartition;
|
import org.apache.kafka.common.TopicIdPartition;
|
||||||
import org.apache.kafka.common.message.ShareFetchResponseData;
|
import org.apache.kafka.common.message.ShareFetchResponseData;
|
||||||
|
@ -23,47 +23,44 @@ import org.apache.kafka.common.protocol.Errors;
|
||||||
import org.apache.kafka.common.requests.ShareFetchRequest;
|
import org.apache.kafka.common.requests.ShareFetchRequest;
|
||||||
import org.apache.kafka.common.requests.ShareFetchResponse;
|
import org.apache.kafka.common.requests.ShareFetchResponse;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import scala.Tuple2;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper class to return the erroneous partitions and valid partition data
|
* Helper class to return the erroneous partitions and valid partition data
|
||||||
*/
|
*/
|
||||||
public class ErroneousAndValidPartitionData {
|
public class ErroneousAndValidPartitionData {
|
||||||
private final List<Tuple2<TopicIdPartition, ShareFetchResponseData.PartitionData>> erroneous;
|
private final Map<TopicIdPartition, ShareFetchResponseData.PartitionData> erroneous;
|
||||||
private final List<Tuple2<TopicIdPartition, ShareFetchRequest.SharePartitionData>> validTopicIdPartitions;
|
private final Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> validTopicIdPartitions;
|
||||||
|
|
||||||
public ErroneousAndValidPartitionData(List<Tuple2<TopicIdPartition, ShareFetchResponseData.PartitionData>> erroneous,
|
public ErroneousAndValidPartitionData(Map<TopicIdPartition, ShareFetchResponseData.PartitionData> erroneous,
|
||||||
List<Tuple2<TopicIdPartition, ShareFetchRequest.SharePartitionData>> validTopicIdPartitions) {
|
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> validTopicIdPartitions) {
|
||||||
this.erroneous = erroneous;
|
this.erroneous = erroneous;
|
||||||
this.validTopicIdPartitions = validTopicIdPartitions;
|
this.validTopicIdPartitions = validTopicIdPartitions;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ErroneousAndValidPartitionData(Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData) {
|
public ErroneousAndValidPartitionData(Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData) {
|
||||||
erroneous = new ArrayList<>();
|
erroneous = new HashMap<>();
|
||||||
validTopicIdPartitions = new ArrayList<>();
|
validTopicIdPartitions = new HashMap<>();
|
||||||
shareFetchData.forEach((topicIdPartition, sharePartitionData) -> {
|
shareFetchData.forEach((topicIdPartition, sharePartitionData) -> {
|
||||||
if (topicIdPartition.topic() == null) {
|
if (topicIdPartition.topic() == null) {
|
||||||
erroneous.add(new Tuple2<>(topicIdPartition, ShareFetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)));
|
erroneous.put(topicIdPartition, ShareFetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID));
|
||||||
} else {
|
} else {
|
||||||
validTopicIdPartitions.add(new Tuple2<>(topicIdPartition, sharePartitionData));
|
validTopicIdPartitions.put(topicIdPartition, sharePartitionData);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public ErroneousAndValidPartitionData() {
|
public ErroneousAndValidPartitionData() {
|
||||||
this.erroneous = new ArrayList<>();
|
this.erroneous = new HashMap<>();
|
||||||
this.validTopicIdPartitions = new ArrayList<>();
|
this.validTopicIdPartitions = new HashMap<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<Tuple2<TopicIdPartition, ShareFetchResponseData.PartitionData>> erroneous() {
|
public Map<TopicIdPartition, ShareFetchResponseData.PartitionData> erroneous() {
|
||||||
return erroneous;
|
return erroneous;
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<Tuple2<TopicIdPartition, ShareFetchRequest.SharePartitionData>> validTopicIdPartitions() {
|
public Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> validTopicIdPartitions() {
|
||||||
return validTopicIdPartitions;
|
return validTopicIdPartitions;
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -15,7 +15,7 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package kafka.server.share;
|
package org.apache.kafka.server.share;
|
||||||
|
|
||||||
import org.apache.kafka.common.TopicIdPartition;
|
import org.apache.kafka.common.TopicIdPartition;
|
||||||
import org.apache.kafka.common.Uuid;
|
import org.apache.kafka.common.Uuid;
|
|
@ -14,9 +14,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package kafka.server.share;
|
package org.apache.kafka.server.share;
|
||||||
|
|
||||||
import kafka.server.FetchSession;
|
|
||||||
|
|
||||||
import org.apache.kafka.common.TopicIdPartition;
|
import org.apache.kafka.common.TopicIdPartition;
|
||||||
import org.apache.kafka.common.Uuid;
|
import org.apache.kafka.common.Uuid;
|
||||||
|
@ -40,7 +38,7 @@ public abstract class ShareFetchContext {
|
||||||
* @return - A string representation of the partitions requested.
|
* @return - A string representation of the partitions requested.
|
||||||
*/
|
*/
|
||||||
String partitionsToLogString(Collection<TopicIdPartition> partitions) {
|
String partitionsToLogString(Collection<TopicIdPartition> partitions) {
|
||||||
return FetchSession.partitionsToLogString(partitions, isTraceEnabled());
|
return ShareSession.partitionsToLogString(partitions, isTraceEnabled());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
|
@ -22,6 +22,7 @@ import org.apache.kafka.common.requests.ShareFetchRequest;
|
||||||
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
|
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -141,6 +142,13 @@ public class ShareSession {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static String partitionsToLogString(Collection<TopicIdPartition> partitions, Boolean traceEnabled) {
|
||||||
|
if (traceEnabled) {
|
||||||
|
return String.format("( %s )", String.join(", ", partitions.toString()));
|
||||||
|
}
|
||||||
|
return String.format("%s partition(s)", partitions.size());
|
||||||
|
}
|
||||||
|
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "ShareSession(" +
|
return "ShareSession(" +
|
||||||
"key=" + key +
|
"key=" + key +
|
|
@ -15,7 +15,7 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package kafka.server.share;
|
package org.apache.kafka.server.share;
|
||||||
|
|
||||||
import org.apache.kafka.common.TopicIdPartition;
|
import org.apache.kafka.common.TopicIdPartition;
|
||||||
import org.apache.kafka.common.TopicPartition;
|
import org.apache.kafka.common.TopicPartition;
|
||||||
|
@ -27,23 +27,18 @@ import org.apache.kafka.common.requests.ShareFetchRequest;
|
||||||
import org.apache.kafka.common.requests.ShareFetchRequest.SharePartitionData;
|
import org.apache.kafka.common.requests.ShareFetchRequest.SharePartitionData;
|
||||||
import org.apache.kafka.common.requests.ShareFetchResponse;
|
import org.apache.kafka.common.requests.ShareFetchResponse;
|
||||||
import org.apache.kafka.common.requests.ShareRequestMetadata;
|
import org.apache.kafka.common.requests.ShareRequestMetadata;
|
||||||
import org.apache.kafka.server.share.CachedSharePartition;
|
|
||||||
import org.apache.kafka.server.share.ShareSession;
|
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.NoSuchElementException;
|
import java.util.NoSuchElementException;
|
||||||
|
|
||||||
import scala.Tuple2;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The context for a share session fetch request.
|
* The context for a share session fetch request.
|
||||||
*/
|
*/
|
||||||
|
@ -229,8 +224,8 @@ public class ShareSessionContext extends ShareFetchContext {
|
||||||
if (!isSubsequent) {
|
if (!isSubsequent) {
|
||||||
return new ErroneousAndValidPartitionData(shareFetchData);
|
return new ErroneousAndValidPartitionData(shareFetchData);
|
||||||
}
|
}
|
||||||
List<Tuple2<TopicIdPartition, PartitionData>> erroneous = new ArrayList<>();
|
Map<TopicIdPartition, PartitionData> erroneous = new HashMap<>();
|
||||||
List<Tuple2<TopicIdPartition, ShareFetchRequest.SharePartitionData>> valid = new ArrayList<>();
|
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> valid = new HashMap<>();
|
||||||
// Take the session lock and iterate over all the cached partitions.
|
// Take the session lock and iterate over all the cached partitions.
|
||||||
synchronized (session) {
|
synchronized (session) {
|
||||||
session.partitionMap().forEach(cachedSharePartition -> {
|
session.partitionMap().forEach(cachedSharePartition -> {
|
||||||
|
@ -238,9 +233,9 @@ public class ShareSessionContext extends ShareFetchContext {
|
||||||
TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition()));
|
TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition()));
|
||||||
ShareFetchRequest.SharePartitionData reqData = cachedSharePartition.reqData();
|
ShareFetchRequest.SharePartitionData reqData = cachedSharePartition.reqData();
|
||||||
if (topicIdPartition.topic() == null) {
|
if (topicIdPartition.topic() == null) {
|
||||||
erroneous.add(new Tuple2<>(topicIdPartition, ShareFetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)));
|
erroneous.put(topicIdPartition, ShareFetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID));
|
||||||
} else {
|
} else {
|
||||||
valid.add(new Tuple2<>(topicIdPartition, reqData));
|
valid.put(topicIdPartition, reqData);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
return new ErroneousAndValidPartitionData(erroneous, valid);
|
return new ErroneousAndValidPartitionData(erroneous, valid);
|
|
@ -0,0 +1,55 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.kafka.server.share;
|
||||||
|
|
||||||
|
import org.apache.kafka.common.TopicIdPartition;
|
||||||
|
import org.apache.kafka.common.Uuid;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
|
public class ShareSessionTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPartitionsToLogString() {
|
||||||
|
Uuid uuid1 = Uuid.randomUuid();
|
||||||
|
Uuid uuid2 = Uuid.randomUuid();
|
||||||
|
List<TopicIdPartition> partitions = Arrays.asList(
|
||||||
|
new TopicIdPartition(uuid1, 0, "foo"),
|
||||||
|
new TopicIdPartition(uuid2, 1, "bar"));
|
||||||
|
|
||||||
|
String response = ShareSession.partitionsToLogString(partitions, false);
|
||||||
|
assertEquals("2 partition(s)", response);
|
||||||
|
|
||||||
|
response = ShareSession.partitionsToLogString(partitions, true);
|
||||||
|
assertEquals(String.format("( [%s:foo-0, %s:bar-1] )", uuid1, uuid2), response);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPartitionsToLogStringEmpty() {
|
||||||
|
String response = ShareSession.partitionsToLogString(Collections.emptyList(), false);
|
||||||
|
assertEquals("0 partition(s)", response);
|
||||||
|
|
||||||
|
response = ShareSession.partitionsToLogString(Collections.emptyList(), true);
|
||||||
|
assertEquals("( [] )", response);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue