From 3db6e68c4c5e5071de938cfeba7d8ea4e7d81328 Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Mon, 19 Aug 2024 21:13:08 +0100 Subject: [PATCH] KAFKA-17346: Create :share Gradle module (#16888) Establishes the new `:share` Gradle module. This module is intended to be used for server-side KIP-932 classes that are not part of the new share group coordinator. This patch relocates and renames some existing classes. A small amount of compatibility changes were also made, but do not affect any logic. Reviewers: Andrew Schofield , David Arthur --- build.gradle | 40 ++++++ checkstyle/import-control-share.xml | 42 ++++++ .../server/share/SharePartitionManager.java | 6 +- .../main/scala/kafka/server/KafkaApis.scala | 6 +- .../share/SharePartitionManagerTest.java | 15 +- .../unit/kafka/server/KafkaApisTest.scala | 132 +++++++----------- settings.gradle | 1 + .../server/share/CachedSharePartition.java | 0 .../share/ErroneousAndValidPartitionData.java | 31 ++-- .../kafka/server/share/FinalContext.java | 2 +- .../kafka/server/share/LastUsedKey.java | 0 .../share/ShareAcknowledgementBatch.java | 0 .../kafka/server/share/ShareFetchContext.java | 6 +- .../kafka/server/share/ShareSession.java | 8 ++ .../kafka/server/share/ShareSessionCache.java | 0 .../server/share/ShareSessionContext.java | 17 +-- .../kafka/server/share/ShareSessionKey.java | 0 .../share/CachedSharePartitionTest.java | 0 .../server/share/ShareSessionCacheTest.java | 0 .../kafka/server/share/ShareSessionTest.java | 55 ++++++++ 20 files changed, 239 insertions(+), 122 deletions(-) create mode 100644 checkstyle/import-control-share.xml rename {server => share}/src/main/java/org/apache/kafka/server/share/CachedSharePartition.java (100%) rename {core/src/main/java => share/src/main/java/org/apache}/kafka/server/share/ErroneousAndValidPartitionData.java (59%) rename {core/src/main/java => share/src/main/java/org/apache}/kafka/server/share/FinalContext.java (98%) rename {server => share}/src/main/java/org/apache/kafka/server/share/LastUsedKey.java (100%) rename {server => share}/src/main/java/org/apache/kafka/server/share/ShareAcknowledgementBatch.java (100%) rename {core/src/main/java => share/src/main/java/org/apache}/kafka/server/share/ShareFetchContext.java (96%) rename {server => share}/src/main/java/org/apache/kafka/server/share/ShareSession.java (94%) rename {server => share}/src/main/java/org/apache/kafka/server/share/ShareSessionCache.java (100%) rename {core/src/main/java => share/src/main/java/org/apache}/kafka/server/share/ShareSessionContext.java (94%) rename {server => share}/src/main/java/org/apache/kafka/server/share/ShareSessionKey.java (100%) rename {server => share}/src/test/java/org/apache/kafka/server/share/CachedSharePartitionTest.java (100%) rename {server => share}/src/test/java/org/apache/kafka/server/share/ShareSessionCacheTest.java (100%) create mode 100644 share/src/test/java/org/apache/kafka/server/share/ShareSessionTest.java diff --git a/build.gradle b/build.gradle index 64611770343..50912a4c991 100644 --- a/build.gradle +++ b/build.gradle @@ -907,6 +907,44 @@ project(':server') { } } +project(':share') { + base { + archivesName = "kafka-share" + } + + dependencies { + implementation project(':server-common') + + implementation libs.slf4jApi + + testImplementation libs.junitJupiter + testImplementation libs.slf4jReload4j + + testRuntimeOnly libs.junitPlatformLanucher + } + + sourceSets { + main { + java { + srcDirs = ["src/main/java"] + } + } + test { + java { + srcDirs = ["src/test/java"] + } + } + } + + checkstyle { + configProperties = checkstyleConfigProperties("import-control-share.xml") + } + + javadoc { + enabled = false + } +} + project(':core') { apply plugin: 'scala' @@ -941,6 +979,7 @@ project(':core') { implementation project(':storage') implementation project(':server') implementation project(':coordinator-common') + implementation project(':share') implementation libs.argparse4j implementation libs.commonsValidator @@ -980,6 +1019,7 @@ project(':core') { testImplementation project(':server-common').sourceSets.test.output testImplementation project(':storage:storage-api').sourceSets.test.output testImplementation project(':server').sourceSets.test.output + testImplementation project(':share').sourceSets.test.output testImplementation libs.bcpkix testImplementation libs.mockitoCore testImplementation(libs.apacheda) { diff --git a/checkstyle/import-control-share.xml b/checkstyle/import-control-share.xml new file mode 100644 index 00000000000..1356966e4c6 --- /dev/null +++ b/checkstyle/import-control-share.xml @@ -0,0 +1,42 @@ + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index d2ac561f392..9141416cf48 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -16,7 +16,6 @@ */ package kafka.server.share; -import kafka.server.FetchSession; import kafka.server.QuotaFactory; import kafka.server.ReplicaManager; @@ -41,9 +40,12 @@ import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.group.share.Persister; import org.apache.kafka.server.share.CachedSharePartition; +import org.apache.kafka.server.share.FinalContext; import org.apache.kafka.server.share.ShareAcknowledgementBatch; +import org.apache.kafka.server.share.ShareFetchContext; import org.apache.kafka.server.share.ShareSession; import org.apache.kafka.server.share.ShareSessionCache; +import org.apache.kafka.server.share.ShareSessionContext; import org.apache.kafka.server.share.ShareSessionKey; import org.apache.kafka.server.util.timer.SystemTimer; import org.apache.kafka.server.util.timer.SystemTimerReaper; @@ -502,7 +504,7 @@ public class SharePartitionManager implements AutoCloseable { } private static String partitionsToLogString(Collection partitions) { - return FetchSession.partitionsToLogString(partitions, log.isTraceEnabled()); + return ShareSession.partitionsToLogString(partitions, log.isTraceEnabled()); } /** diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index be9e72dede8..07c8eb0563b 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -24,7 +24,7 @@ import kafka.network.RequestChannel import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota} import kafka.server.handlers.DescribeTopicPartitionsRequestHandler import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache} -import kafka.server.share.{ErroneousAndValidPartitionData, ShareFetchContext, SharePartitionManager} +import kafka.server.share.SharePartitionManager import kafka.utils.Implicits._ import kafka.utils.{CoreUtils, Logging} import org.apache.kafka.admin.AdminUtils @@ -75,7 +75,7 @@ import org.apache.kafka.server.authorizer._ import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_IV0} import org.apache.kafka.server.record.BrokerCompressionType -import org.apache.kafka.server.share.ShareAcknowledgementBatch +import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, ShareAcknowledgementBatch, ShareFetchContext} import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData} import java.lang.{Long => JLong} @@ -4225,7 +4225,7 @@ class KafkaApis(val requestChannel: RequestChannel, ): CompletableFuture[Map[TopicIdPartition, ShareFetchResponseData.PartitionData]] = { val erroneous = mutable.Map.empty[TopicIdPartition, ShareFetchResponseData.PartitionData] - erroneousAndValidPartitionData.erroneous.forEach { erroneousData => erroneous += erroneousData } + erroneousAndValidPartitionData.erroneous.forEach { (topicIdPartition, partitionData) => erroneous.put(topicIdPartition, partitionData) } val interestedWithMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer] diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index 591b531a07a..8c80060bc0c 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -48,9 +48,13 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.group.share.NoOpShareStatePersister; import org.apache.kafka.server.group.share.Persister; import org.apache.kafka.server.share.CachedSharePartition; +import org.apache.kafka.server.share.ErroneousAndValidPartitionData; +import org.apache.kafka.server.share.FinalContext; import org.apache.kafka.server.share.ShareAcknowledgementBatch; +import org.apache.kafka.server.share.ShareFetchContext; import org.apache.kafka.server.share.ShareSession; import org.apache.kafka.server.share.ShareSessionCache; +import org.apache.kafka.server.share.ShareSessionContext; import org.apache.kafka.server.share.ShareSessionKey; import org.apache.kafka.server.util.timer.MockTimer; import org.apache.kafka.server.util.timer.SystemTimer; @@ -1865,16 +1869,17 @@ public class SharePartitionManagerTest { assertEquals(partitionsSet, partitionsInContext); } - private void assertErroneousAndValidTopicIdPartitions(ErroneousAndValidPartitionData erroneousAndValidPartitionData, + private void assertErroneousAndValidTopicIdPartitions( + ErroneousAndValidPartitionData erroneousAndValidPartitionData, List expectedErroneous, List expectedValid) { Set expectedErroneousSet = new HashSet<>(expectedErroneous); Set expectedValidSet = new HashSet<>(expectedValid); Set actualErroneousPartitions = new HashSet<>(); Set actualValidPartitions = new HashSet<>(); - erroneousAndValidPartitionData.erroneous().forEach(topicIdPartitionPartitionDataTuple2 -> - actualErroneousPartitions.add(topicIdPartitionPartitionDataTuple2._1)); - erroneousAndValidPartitionData.validTopicIdPartitions().forEach(topicIdPartitionPartitionDataTuple2 -> - actualValidPartitions.add(topicIdPartitionPartitionDataTuple2._1)); + erroneousAndValidPartitionData.erroneous().forEach((topicIdPartition, partitionData) -> + actualErroneousPartitions.add(topicIdPartition)); + erroneousAndValidPartitionData.validTopicIdPartitions().forEach((topicIdPartition, partitionData) -> + actualValidPartitions.add(topicIdPartition)); assertEquals(expectedErroneousSet, actualErroneousPartitions); assertEquals(expectedValidSet, actualValidPartitions); } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index fe720ebfd91..8018f9697a2 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -25,7 +25,7 @@ import kafka.log.UnifiedLog import kafka.network.{RequestChannel, RequestMetrics} import kafka.server.QuotaFactory.QuotaManagers import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache, MockConfigRepository, ZkMetadataCache} -import kafka.server.share.{ErroneousAndValidPartitionData, FinalContext, SharePartitionManager, ShareSessionContext} +import kafka.server.share.SharePartitionManager import kafka.utils.{CoreUtils, Log4jController, Logging, TestUtils} import kafka.zk.KafkaZkClient import org.apache.kafka.clients.admin.AlterConfigOp.OpType @@ -86,7 +86,7 @@ import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_I import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, MetadataVersion} import org.apache.kafka.server.config.{ConfigType, KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ShareGroupConfig} import org.apache.kafka.server.metrics.ClientMetricsTestUtils -import org.apache.kafka.server.share.{CachedSharePartition, ShareAcknowledgementBatch, ShareSession, ShareSessionKey} +import org.apache.kafka.server.share.{CachedSharePartition, ErroneousAndValidPartitionData, FinalContext, ShareAcknowledgementBatch, ShareSession, ShareSessionContext, ShareSessionKey} import org.apache.kafka.server.util.{FutureUtils, MockTime} import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchParams, FetchPartitionData, LogConfig} import org.junit.jupiter.api.Assertions._ @@ -5984,26 +5984,20 @@ class KafkaApisTest extends Logging { ).asJava) ) - val erroneousPartitions: util.List[(TopicIdPartition, ShareFetchResponseData.PartitionData)] = new util.ArrayList() + val erroneousPartitions: util.Map[TopicIdPartition, ShareFetchResponseData.PartitionData] = new util.HashMap() - val validPartitions: util.List[(TopicIdPartition, ShareFetchRequest.SharePartitionData)] = new util.ArrayList() - validPartitions.add( - ( - tp1, - new ShareFetchRequest.SharePartitionData(topicId1, partitionMaxBytes) - ) + val validPartitions: util.Map[TopicIdPartition, ShareFetchRequest.SharePartitionData] = new util.HashMap() + validPartitions.put( + tp1, + new ShareFetchRequest.SharePartitionData(topicId1, partitionMaxBytes) ) - validPartitions.add( - ( - tp2, - new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes) - ) + validPartitions.put( + tp2, + new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes) ) - validPartitions.add( - ( - tp3, - new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes) - ) + validPartitions.put( + tp3, + new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes) ) val erroneousAndValidPartitionData: ErroneousAndValidPartitionData = @@ -6133,30 +6127,24 @@ class KafkaApisTest extends Logging { ).asJava) ) - val erroneousPartitions: util.List[(TopicIdPartition, ShareFetchResponseData.PartitionData)] = new util.ArrayList() - erroneousPartitions.add( - ( - tp2, - new ShareFetchResponseData.PartitionData() - .setPartitionIndex(1) - .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code) - ) + val erroneousPartitions: util.Map[TopicIdPartition, ShareFetchResponseData.PartitionData] = new util.HashMap() + erroneousPartitions.put( + tp2, + new ShareFetchResponseData.PartitionData() + .setPartitionIndex(1) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code) ) - erroneousPartitions.add( - ( - tp3, - new ShareFetchResponseData.PartitionData() - .setPartitionIndex(0) - .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code) - ) + erroneousPartitions.put( + tp3, + new ShareFetchResponseData.PartitionData() + .setPartitionIndex(0) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code) ) - val validPartitions: util.List[(TopicIdPartition, ShareFetchRequest.SharePartitionData)] = new util.ArrayList() - validPartitions.add( - ( - tp1, - new ShareFetchRequest.SharePartitionData(topicId1, partitionMaxBytes) - ) + val validPartitions: util.Map[TopicIdPartition, ShareFetchRequest.SharePartitionData] = new util.HashMap() + validPartitions.put( + tp1, + new ShareFetchRequest.SharePartitionData(topicId1, partitionMaxBytes) ) val erroneousAndValidPartitionData: ErroneousAndValidPartitionData = @@ -6287,26 +6275,20 @@ class KafkaApisTest extends Logging { ).asJava) ) - val erroneousPartitions: util.List[(TopicIdPartition, ShareFetchResponseData.PartitionData)] = new util.ArrayList() + val erroneousPartitions: util.Map[TopicIdPartition, ShareFetchResponseData.PartitionData] = new util.HashMap() - val validPartitions: util.List[(TopicIdPartition, ShareFetchRequest.SharePartitionData)] = new util.ArrayList() - validPartitions.add( - ( - tp1, - new ShareFetchRequest.SharePartitionData(topicId1, partitionMaxBytes) - ) + val validPartitions: util.Map[TopicIdPartition, ShareFetchRequest.SharePartitionData] = new util.HashMap() + validPartitions.put( + tp1, + new ShareFetchRequest.SharePartitionData(topicId1, partitionMaxBytes) ) - validPartitions.add( - ( - tp2, - new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes) - ) + validPartitions.put( + tp2, + new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes) ) - validPartitions.add( - ( - tp3, - new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes) - ) + validPartitions.put( + tp3, + new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes) ) val erroneousAndValidPartitionData: ErroneousAndValidPartitionData = @@ -6452,32 +6434,24 @@ class KafkaApisTest extends Logging { ).asJava) ) - val erroneousPartitions: util.List[(TopicIdPartition, ShareFetchResponseData.PartitionData)] = new util.ArrayList() + val erroneousPartitions: util.Map[TopicIdPartition, ShareFetchResponseData.PartitionData] = new util.HashMap() - val validPartitions: util.List[(TopicIdPartition, ShareFetchRequest.SharePartitionData)] = new util.ArrayList() - validPartitions.add( - ( - tp1, - new ShareFetchRequest.SharePartitionData(topicId1, partitionMaxBytes) - ) + val validPartitions: util.Map[TopicIdPartition, ShareFetchRequest.SharePartitionData] = new util.HashMap() + validPartitions.put( + tp1, + new ShareFetchRequest.SharePartitionData(topicId1, partitionMaxBytes) ) - validPartitions.add( - ( - tp2, - new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes) - ) + validPartitions.put( + tp2, + new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes) ) - validPartitions.add( - ( - tp3, - new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes) - ) + validPartitions.put( + tp3, + new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes) ) - validPartitions.add( - ( - tp4, - new ShareFetchRequest.SharePartitionData(topicId3, partitionMaxBytes) - ) + validPartitions.put( + tp4, + new ShareFetchRequest.SharePartitionData(topicId3, partitionMaxBytes) ) val erroneousAndValidPartitionData: ErroneousAndValidPartitionData = diff --git a/settings.gradle b/settings.gradle index 7b6b0f933b2..6956948e18b 100644 --- a/settings.gradle +++ b/settings.gradle @@ -68,6 +68,7 @@ include 'clients', 'raft', 'server', 'server-common', + 'share', 'share-coordinator', 'shell', 'storage', diff --git a/server/src/main/java/org/apache/kafka/server/share/CachedSharePartition.java b/share/src/main/java/org/apache/kafka/server/share/CachedSharePartition.java similarity index 100% rename from server/src/main/java/org/apache/kafka/server/share/CachedSharePartition.java rename to share/src/main/java/org/apache/kafka/server/share/CachedSharePartition.java diff --git a/core/src/main/java/kafka/server/share/ErroneousAndValidPartitionData.java b/share/src/main/java/org/apache/kafka/server/share/ErroneousAndValidPartitionData.java similarity index 59% rename from core/src/main/java/kafka/server/share/ErroneousAndValidPartitionData.java rename to share/src/main/java/org/apache/kafka/server/share/ErroneousAndValidPartitionData.java index 3ba18536fc6..9918fbdf48e 100644 --- a/core/src/main/java/kafka/server/share/ErroneousAndValidPartitionData.java +++ b/share/src/main/java/org/apache/kafka/server/share/ErroneousAndValidPartitionData.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package kafka.server.share; +package org.apache.kafka.server.share; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.message.ShareFetchResponseData; @@ -23,47 +23,44 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ShareFetchRequest; import org.apache.kafka.common.requests.ShareFetchResponse; -import java.util.ArrayList; -import java.util.List; +import java.util.HashMap; import java.util.Map; -import scala.Tuple2; - /** * Helper class to return the erroneous partitions and valid partition data */ public class ErroneousAndValidPartitionData { - private final List> erroneous; - private final List> validTopicIdPartitions; + private final Map erroneous; + private final Map validTopicIdPartitions; - public ErroneousAndValidPartitionData(List> erroneous, - List> validTopicIdPartitions) { + public ErroneousAndValidPartitionData(Map erroneous, + Map validTopicIdPartitions) { this.erroneous = erroneous; this.validTopicIdPartitions = validTopicIdPartitions; } public ErroneousAndValidPartitionData(Map shareFetchData) { - erroneous = new ArrayList<>(); - validTopicIdPartitions = new ArrayList<>(); + erroneous = new HashMap<>(); + validTopicIdPartitions = new HashMap<>(); shareFetchData.forEach((topicIdPartition, sharePartitionData) -> { if (topicIdPartition.topic() == null) { - erroneous.add(new Tuple2<>(topicIdPartition, ShareFetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID))); + erroneous.put(topicIdPartition, ShareFetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)); } else { - validTopicIdPartitions.add(new Tuple2<>(topicIdPartition, sharePartitionData)); + validTopicIdPartitions.put(topicIdPartition, sharePartitionData); } }); } public ErroneousAndValidPartitionData() { - this.erroneous = new ArrayList<>(); - this.validTopicIdPartitions = new ArrayList<>(); + this.erroneous = new HashMap<>(); + this.validTopicIdPartitions = new HashMap<>(); } - public List> erroneous() { + public Map erroneous() { return erroneous; } - public List> validTopicIdPartitions() { + public Map validTopicIdPartitions() { return validTopicIdPartitions; } } diff --git a/core/src/main/java/kafka/server/share/FinalContext.java b/share/src/main/java/org/apache/kafka/server/share/FinalContext.java similarity index 98% rename from core/src/main/java/kafka/server/share/FinalContext.java rename to share/src/main/java/org/apache/kafka/server/share/FinalContext.java index ec2f52f0f56..35da343b514 100644 --- a/core/src/main/java/kafka/server/share/FinalContext.java +++ b/share/src/main/java/org/apache/kafka/server/share/FinalContext.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package kafka.server.share; +package org.apache.kafka.server.share; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.Uuid; diff --git a/server/src/main/java/org/apache/kafka/server/share/LastUsedKey.java b/share/src/main/java/org/apache/kafka/server/share/LastUsedKey.java similarity index 100% rename from server/src/main/java/org/apache/kafka/server/share/LastUsedKey.java rename to share/src/main/java/org/apache/kafka/server/share/LastUsedKey.java diff --git a/server/src/main/java/org/apache/kafka/server/share/ShareAcknowledgementBatch.java b/share/src/main/java/org/apache/kafka/server/share/ShareAcknowledgementBatch.java similarity index 100% rename from server/src/main/java/org/apache/kafka/server/share/ShareAcknowledgementBatch.java rename to share/src/main/java/org/apache/kafka/server/share/ShareAcknowledgementBatch.java diff --git a/core/src/main/java/kafka/server/share/ShareFetchContext.java b/share/src/main/java/org/apache/kafka/server/share/ShareFetchContext.java similarity index 96% rename from core/src/main/java/kafka/server/share/ShareFetchContext.java rename to share/src/main/java/org/apache/kafka/server/share/ShareFetchContext.java index 3075ff8a704..7714b022819 100644 --- a/core/src/main/java/kafka/server/share/ShareFetchContext.java +++ b/share/src/main/java/org/apache/kafka/server/share/ShareFetchContext.java @@ -14,9 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package kafka.server.share; - -import kafka.server.FetchSession; +package org.apache.kafka.server.share; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.Uuid; @@ -40,7 +38,7 @@ public abstract class ShareFetchContext { * @return - A string representation of the partitions requested. */ String partitionsToLogString(Collection partitions) { - return FetchSession.partitionsToLogString(partitions, isTraceEnabled()); + return ShareSession.partitionsToLogString(partitions, isTraceEnabled()); } /** diff --git a/server/src/main/java/org/apache/kafka/server/share/ShareSession.java b/share/src/main/java/org/apache/kafka/server/share/ShareSession.java similarity index 94% rename from server/src/main/java/org/apache/kafka/server/share/ShareSession.java rename to share/src/main/java/org/apache/kafka/server/share/ShareSession.java index 8b04f2e0b6c..12124f51168 100644 --- a/server/src/main/java/org/apache/kafka/server/share/ShareSession.java +++ b/share/src/main/java/org/apache/kafka/server/share/ShareSession.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.requests.ShareFetchRequest; import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -141,6 +142,13 @@ public class ShareSession { return result; } + public static String partitionsToLogString(Collection partitions, Boolean traceEnabled) { + if (traceEnabled) { + return String.format("( %s )", String.join(", ", partitions.toString())); + } + return String.format("%s partition(s)", partitions.size()); + } + public String toString() { return "ShareSession(" + "key=" + key + diff --git a/server/src/main/java/org/apache/kafka/server/share/ShareSessionCache.java b/share/src/main/java/org/apache/kafka/server/share/ShareSessionCache.java similarity index 100% rename from server/src/main/java/org/apache/kafka/server/share/ShareSessionCache.java rename to share/src/main/java/org/apache/kafka/server/share/ShareSessionCache.java diff --git a/core/src/main/java/kafka/server/share/ShareSessionContext.java b/share/src/main/java/org/apache/kafka/server/share/ShareSessionContext.java similarity index 94% rename from core/src/main/java/kafka/server/share/ShareSessionContext.java rename to share/src/main/java/org/apache/kafka/server/share/ShareSessionContext.java index e4146db040f..fee03eeb311 100644 --- a/core/src/main/java/kafka/server/share/ShareSessionContext.java +++ b/share/src/main/java/org/apache/kafka/server/share/ShareSessionContext.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package kafka.server.share; +package org.apache.kafka.server.share; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; @@ -27,23 +27,18 @@ import org.apache.kafka.common.requests.ShareFetchRequest; import org.apache.kafka.common.requests.ShareFetchRequest.SharePartitionData; import org.apache.kafka.common.requests.ShareFetchResponse; import org.apache.kafka.common.requests.ShareRequestMetadata; -import org.apache.kafka.server.share.CachedSharePartition; -import org.apache.kafka.server.share.ShareSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; -import scala.Tuple2; - /** * The context for a share session fetch request. */ @@ -229,8 +224,8 @@ public class ShareSessionContext extends ShareFetchContext { if (!isSubsequent) { return new ErroneousAndValidPartitionData(shareFetchData); } - List> erroneous = new ArrayList<>(); - List> valid = new ArrayList<>(); + Map erroneous = new HashMap<>(); + Map valid = new HashMap<>(); // Take the session lock and iterate over all the cached partitions. synchronized (session) { session.partitionMap().forEach(cachedSharePartition -> { @@ -238,9 +233,9 @@ public class ShareSessionContext extends ShareFetchContext { TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition())); ShareFetchRequest.SharePartitionData reqData = cachedSharePartition.reqData(); if (topicIdPartition.topic() == null) { - erroneous.add(new Tuple2<>(topicIdPartition, ShareFetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID))); + erroneous.put(topicIdPartition, ShareFetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)); } else { - valid.add(new Tuple2<>(topicIdPartition, reqData)); + valid.put(topicIdPartition, reqData); } }); return new ErroneousAndValidPartitionData(erroneous, valid); diff --git a/server/src/main/java/org/apache/kafka/server/share/ShareSessionKey.java b/share/src/main/java/org/apache/kafka/server/share/ShareSessionKey.java similarity index 100% rename from server/src/main/java/org/apache/kafka/server/share/ShareSessionKey.java rename to share/src/main/java/org/apache/kafka/server/share/ShareSessionKey.java diff --git a/server/src/test/java/org/apache/kafka/server/share/CachedSharePartitionTest.java b/share/src/test/java/org/apache/kafka/server/share/CachedSharePartitionTest.java similarity index 100% rename from server/src/test/java/org/apache/kafka/server/share/CachedSharePartitionTest.java rename to share/src/test/java/org/apache/kafka/server/share/CachedSharePartitionTest.java diff --git a/server/src/test/java/org/apache/kafka/server/share/ShareSessionCacheTest.java b/share/src/test/java/org/apache/kafka/server/share/ShareSessionCacheTest.java similarity index 100% rename from server/src/test/java/org/apache/kafka/server/share/ShareSessionCacheTest.java rename to share/src/test/java/org/apache/kafka/server/share/ShareSessionCacheTest.java diff --git a/share/src/test/java/org/apache/kafka/server/share/ShareSessionTest.java b/share/src/test/java/org/apache/kafka/server/share/ShareSessionTest.java new file mode 100644 index 00000000000..66879633619 --- /dev/null +++ b/share/src/test/java/org/apache/kafka/server/share/ShareSessionTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.share; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class ShareSessionTest { + + @Test + public void testPartitionsToLogString() { + Uuid uuid1 = Uuid.randomUuid(); + Uuid uuid2 = Uuid.randomUuid(); + List partitions = Arrays.asList( + new TopicIdPartition(uuid1, 0, "foo"), + new TopicIdPartition(uuid2, 1, "bar")); + + String response = ShareSession.partitionsToLogString(partitions, false); + assertEquals("2 partition(s)", response); + + response = ShareSession.partitionsToLogString(partitions, true); + assertEquals(String.format("( [%s:foo-0, %s:bar-1] )", uuid1, uuid2), response); + } + + @Test + public void testPartitionsToLogStringEmpty() { + String response = ShareSession.partitionsToLogString(Collections.emptyList(), false); + assertEquals("0 partition(s)", response); + + response = ShareSession.partitionsToLogString(Collections.emptyList(), true); + assertEquals("( [] )", response); + } +}