diff --git a/build.gradle b/build.gradle
index 64611770343..50912a4c991 100644
--- a/build.gradle
+++ b/build.gradle
@@ -907,6 +907,44 @@ project(':server') {
}
}
+project(':share') {
+ base {
+ archivesName = "kafka-share"
+ }
+
+ dependencies {
+ implementation project(':server-common')
+
+ implementation libs.slf4jApi
+
+ testImplementation libs.junitJupiter
+ testImplementation libs.slf4jReload4j
+
+ testRuntimeOnly libs.junitPlatformLanucher
+ }
+
+ sourceSets {
+ main {
+ java {
+ srcDirs = ["src/main/java"]
+ }
+ }
+ test {
+ java {
+ srcDirs = ["src/test/java"]
+ }
+ }
+ }
+
+ checkstyle {
+ configProperties = checkstyleConfigProperties("import-control-share.xml")
+ }
+
+ javadoc {
+ enabled = false
+ }
+}
+
project(':core') {
apply plugin: 'scala'
@@ -941,6 +979,7 @@ project(':core') {
implementation project(':storage')
implementation project(':server')
implementation project(':coordinator-common')
+ implementation project(':share')
implementation libs.argparse4j
implementation libs.commonsValidator
@@ -980,6 +1019,7 @@ project(':core') {
testImplementation project(':server-common').sourceSets.test.output
testImplementation project(':storage:storage-api').sourceSets.test.output
testImplementation project(':server').sourceSets.test.output
+ testImplementation project(':share').sourceSets.test.output
testImplementation libs.bcpkix
testImplementation libs.mockitoCore
testImplementation(libs.apacheda) {
diff --git a/checkstyle/import-control-share.xml b/checkstyle/import-control-share.xml
new file mode 100644
index 00000000000..1356966e4c6
--- /dev/null
+++ b/checkstyle/import-control-share.xml
@@ -0,0 +1,42 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index d2ac561f392..9141416cf48 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -16,7 +16,6 @@
*/
package kafka.server.share;
-import kafka.server.FetchSession;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
@@ -41,9 +40,12 @@ import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.group.share.Persister;
import org.apache.kafka.server.share.CachedSharePartition;
+import org.apache.kafka.server.share.FinalContext;
import org.apache.kafka.server.share.ShareAcknowledgementBatch;
+import org.apache.kafka.server.share.ShareFetchContext;
import org.apache.kafka.server.share.ShareSession;
import org.apache.kafka.server.share.ShareSessionCache;
+import org.apache.kafka.server.share.ShareSessionContext;
import org.apache.kafka.server.share.ShareSessionKey;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
@@ -502,7 +504,7 @@ public class SharePartitionManager implements AutoCloseable {
}
private static String partitionsToLogString(Collection partitions) {
- return FetchSession.partitionsToLogString(partitions, log.isTraceEnabled());
+ return ShareSession.partitionsToLogString(partitions, log.isTraceEnabled());
}
/**
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index be9e72dede8..07c8eb0563b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -24,7 +24,7 @@ import kafka.network.RequestChannel
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.server.handlers.DescribeTopicPartitionsRequestHandler
import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache}
-import kafka.server.share.{ErroneousAndValidPartitionData, ShareFetchContext, SharePartitionManager}
+import kafka.server.share.SharePartitionManager
import kafka.utils.Implicits._
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.admin.AdminUtils
@@ -75,7 +75,7 @@ import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_IV0}
import org.apache.kafka.server.record.BrokerCompressionType
-import org.apache.kafka.server.share.ShareAcknowledgementBatch
+import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, ShareAcknowledgementBatch, ShareFetchContext}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData}
import java.lang.{Long => JLong}
@@ -4225,7 +4225,7 @@ class KafkaApis(val requestChannel: RequestChannel,
): CompletableFuture[Map[TopicIdPartition, ShareFetchResponseData.PartitionData]] = {
val erroneous = mutable.Map.empty[TopicIdPartition, ShareFetchResponseData.PartitionData]
- erroneousAndValidPartitionData.erroneous.forEach { erroneousData => erroneous += erroneousData }
+ erroneousAndValidPartitionData.erroneous.forEach { (topicIdPartition, partitionData) => erroneous.put(topicIdPartition, partitionData) }
val interestedWithMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 591b531a07a..8c80060bc0c 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -48,9 +48,13 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.group.share.NoOpShareStatePersister;
import org.apache.kafka.server.group.share.Persister;
import org.apache.kafka.server.share.CachedSharePartition;
+import org.apache.kafka.server.share.ErroneousAndValidPartitionData;
+import org.apache.kafka.server.share.FinalContext;
import org.apache.kafka.server.share.ShareAcknowledgementBatch;
+import org.apache.kafka.server.share.ShareFetchContext;
import org.apache.kafka.server.share.ShareSession;
import org.apache.kafka.server.share.ShareSessionCache;
+import org.apache.kafka.server.share.ShareSessionContext;
import org.apache.kafka.server.share.ShareSessionKey;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.SystemTimer;
@@ -1865,16 +1869,17 @@ public class SharePartitionManagerTest {
assertEquals(partitionsSet, partitionsInContext);
}
- private void assertErroneousAndValidTopicIdPartitions(ErroneousAndValidPartitionData erroneousAndValidPartitionData,
+ private void assertErroneousAndValidTopicIdPartitions(
+ ErroneousAndValidPartitionData erroneousAndValidPartitionData,
List expectedErroneous, List expectedValid) {
Set expectedErroneousSet = new HashSet<>(expectedErroneous);
Set expectedValidSet = new HashSet<>(expectedValid);
Set actualErroneousPartitions = new HashSet<>();
Set actualValidPartitions = new HashSet<>();
- erroneousAndValidPartitionData.erroneous().forEach(topicIdPartitionPartitionDataTuple2 ->
- actualErroneousPartitions.add(topicIdPartitionPartitionDataTuple2._1));
- erroneousAndValidPartitionData.validTopicIdPartitions().forEach(topicIdPartitionPartitionDataTuple2 ->
- actualValidPartitions.add(topicIdPartitionPartitionDataTuple2._1));
+ erroneousAndValidPartitionData.erroneous().forEach((topicIdPartition, partitionData) ->
+ actualErroneousPartitions.add(topicIdPartition));
+ erroneousAndValidPartitionData.validTopicIdPartitions().forEach((topicIdPartition, partitionData) ->
+ actualValidPartitions.add(topicIdPartition));
assertEquals(expectedErroneousSet, actualErroneousPartitions);
assertEquals(expectedValidSet, actualValidPartitions);
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index fe720ebfd91..8018f9697a2 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -25,7 +25,7 @@ import kafka.log.UnifiedLog
import kafka.network.{RequestChannel, RequestMetrics}
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache, MockConfigRepository, ZkMetadataCache}
-import kafka.server.share.{ErroneousAndValidPartitionData, FinalContext, SharePartitionManager, ShareSessionContext}
+import kafka.server.share.SharePartitionManager
import kafka.utils.{CoreUtils, Log4jController, Logging, TestUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
@@ -86,7 +86,7 @@ import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_I
import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, MetadataVersion}
import org.apache.kafka.server.config.{ConfigType, KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ShareGroupConfig}
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
-import org.apache.kafka.server.share.{CachedSharePartition, ShareAcknowledgementBatch, ShareSession, ShareSessionKey}
+import org.apache.kafka.server.share.{CachedSharePartition, ErroneousAndValidPartitionData, FinalContext, ShareAcknowledgementBatch, ShareSession, ShareSessionContext, ShareSessionKey}
import org.apache.kafka.server.util.{FutureUtils, MockTime}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchParams, FetchPartitionData, LogConfig}
import org.junit.jupiter.api.Assertions._
@@ -5984,26 +5984,20 @@ class KafkaApisTest extends Logging {
).asJava)
)
- val erroneousPartitions: util.List[(TopicIdPartition, ShareFetchResponseData.PartitionData)] = new util.ArrayList()
+ val erroneousPartitions: util.Map[TopicIdPartition, ShareFetchResponseData.PartitionData] = new util.HashMap()
- val validPartitions: util.List[(TopicIdPartition, ShareFetchRequest.SharePartitionData)] = new util.ArrayList()
- validPartitions.add(
- (
- tp1,
- new ShareFetchRequest.SharePartitionData(topicId1, partitionMaxBytes)
- )
+ val validPartitions: util.Map[TopicIdPartition, ShareFetchRequest.SharePartitionData] = new util.HashMap()
+ validPartitions.put(
+ tp1,
+ new ShareFetchRequest.SharePartitionData(topicId1, partitionMaxBytes)
)
- validPartitions.add(
- (
- tp2,
- new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes)
- )
+ validPartitions.put(
+ tp2,
+ new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes)
)
- validPartitions.add(
- (
- tp3,
- new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes)
- )
+ validPartitions.put(
+ tp3,
+ new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes)
)
val erroneousAndValidPartitionData: ErroneousAndValidPartitionData =
@@ -6133,30 +6127,24 @@ class KafkaApisTest extends Logging {
).asJava)
)
- val erroneousPartitions: util.List[(TopicIdPartition, ShareFetchResponseData.PartitionData)] = new util.ArrayList()
- erroneousPartitions.add(
- (
- tp2,
- new ShareFetchResponseData.PartitionData()
- .setPartitionIndex(1)
- .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
- )
+ val erroneousPartitions: util.Map[TopicIdPartition, ShareFetchResponseData.PartitionData] = new util.HashMap()
+ erroneousPartitions.put(
+ tp2,
+ new ShareFetchResponseData.PartitionData()
+ .setPartitionIndex(1)
+ .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
)
- erroneousPartitions.add(
- (
- tp3,
- new ShareFetchResponseData.PartitionData()
- .setPartitionIndex(0)
- .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
- )
+ erroneousPartitions.put(
+ tp3,
+ new ShareFetchResponseData.PartitionData()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
)
- val validPartitions: util.List[(TopicIdPartition, ShareFetchRequest.SharePartitionData)] = new util.ArrayList()
- validPartitions.add(
- (
- tp1,
- new ShareFetchRequest.SharePartitionData(topicId1, partitionMaxBytes)
- )
+ val validPartitions: util.Map[TopicIdPartition, ShareFetchRequest.SharePartitionData] = new util.HashMap()
+ validPartitions.put(
+ tp1,
+ new ShareFetchRequest.SharePartitionData(topicId1, partitionMaxBytes)
)
val erroneousAndValidPartitionData: ErroneousAndValidPartitionData =
@@ -6287,26 +6275,20 @@ class KafkaApisTest extends Logging {
).asJava)
)
- val erroneousPartitions: util.List[(TopicIdPartition, ShareFetchResponseData.PartitionData)] = new util.ArrayList()
+ val erroneousPartitions: util.Map[TopicIdPartition, ShareFetchResponseData.PartitionData] = new util.HashMap()
- val validPartitions: util.List[(TopicIdPartition, ShareFetchRequest.SharePartitionData)] = new util.ArrayList()
- validPartitions.add(
- (
- tp1,
- new ShareFetchRequest.SharePartitionData(topicId1, partitionMaxBytes)
- )
+ val validPartitions: util.Map[TopicIdPartition, ShareFetchRequest.SharePartitionData] = new util.HashMap()
+ validPartitions.put(
+ tp1,
+ new ShareFetchRequest.SharePartitionData(topicId1, partitionMaxBytes)
)
- validPartitions.add(
- (
- tp2,
- new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes)
- )
+ validPartitions.put(
+ tp2,
+ new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes)
)
- validPartitions.add(
- (
- tp3,
- new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes)
- )
+ validPartitions.put(
+ tp3,
+ new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes)
)
val erroneousAndValidPartitionData: ErroneousAndValidPartitionData =
@@ -6452,32 +6434,24 @@ class KafkaApisTest extends Logging {
).asJava)
)
- val erroneousPartitions: util.List[(TopicIdPartition, ShareFetchResponseData.PartitionData)] = new util.ArrayList()
+ val erroneousPartitions: util.Map[TopicIdPartition, ShareFetchResponseData.PartitionData] = new util.HashMap()
- val validPartitions: util.List[(TopicIdPartition, ShareFetchRequest.SharePartitionData)] = new util.ArrayList()
- validPartitions.add(
- (
- tp1,
- new ShareFetchRequest.SharePartitionData(topicId1, partitionMaxBytes)
- )
+ val validPartitions: util.Map[TopicIdPartition, ShareFetchRequest.SharePartitionData] = new util.HashMap()
+ validPartitions.put(
+ tp1,
+ new ShareFetchRequest.SharePartitionData(topicId1, partitionMaxBytes)
)
- validPartitions.add(
- (
- tp2,
- new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes)
- )
+ validPartitions.put(
+ tp2,
+ new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes)
)
- validPartitions.add(
- (
- tp3,
- new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes)
- )
+ validPartitions.put(
+ tp3,
+ new ShareFetchRequest.SharePartitionData(topicId2, partitionMaxBytes)
)
- validPartitions.add(
- (
- tp4,
- new ShareFetchRequest.SharePartitionData(topicId3, partitionMaxBytes)
- )
+ validPartitions.put(
+ tp4,
+ new ShareFetchRequest.SharePartitionData(topicId3, partitionMaxBytes)
)
val erroneousAndValidPartitionData: ErroneousAndValidPartitionData =
diff --git a/settings.gradle b/settings.gradle
index 7b6b0f933b2..6956948e18b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -68,6 +68,7 @@ include 'clients',
'raft',
'server',
'server-common',
+ 'share',
'share-coordinator',
'shell',
'storage',
diff --git a/server/src/main/java/org/apache/kafka/server/share/CachedSharePartition.java b/share/src/main/java/org/apache/kafka/server/share/CachedSharePartition.java
similarity index 100%
rename from server/src/main/java/org/apache/kafka/server/share/CachedSharePartition.java
rename to share/src/main/java/org/apache/kafka/server/share/CachedSharePartition.java
diff --git a/core/src/main/java/kafka/server/share/ErroneousAndValidPartitionData.java b/share/src/main/java/org/apache/kafka/server/share/ErroneousAndValidPartitionData.java
similarity index 59%
rename from core/src/main/java/kafka/server/share/ErroneousAndValidPartitionData.java
rename to share/src/main/java/org/apache/kafka/server/share/ErroneousAndValidPartitionData.java
index 3ba18536fc6..9918fbdf48e 100644
--- a/core/src/main/java/kafka/server/share/ErroneousAndValidPartitionData.java
+++ b/share/src/main/java/org/apache/kafka/server/share/ErroneousAndValidPartitionData.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package kafka.server.share;
+package org.apache.kafka.server.share;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.message.ShareFetchResponseData;
@@ -23,47 +23,44 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.requests.ShareFetchResponse;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
import java.util.Map;
-import scala.Tuple2;
-
/**
* Helper class to return the erroneous partitions and valid partition data
*/
public class ErroneousAndValidPartitionData {
- private final List> erroneous;
- private final List> validTopicIdPartitions;
+ private final Map erroneous;
+ private final Map validTopicIdPartitions;
- public ErroneousAndValidPartitionData(List> erroneous,
- List> validTopicIdPartitions) {
+ public ErroneousAndValidPartitionData(Map erroneous,
+ Map validTopicIdPartitions) {
this.erroneous = erroneous;
this.validTopicIdPartitions = validTopicIdPartitions;
}
public ErroneousAndValidPartitionData(Map shareFetchData) {
- erroneous = new ArrayList<>();
- validTopicIdPartitions = new ArrayList<>();
+ erroneous = new HashMap<>();
+ validTopicIdPartitions = new HashMap<>();
shareFetchData.forEach((topicIdPartition, sharePartitionData) -> {
if (topicIdPartition.topic() == null) {
- erroneous.add(new Tuple2<>(topicIdPartition, ShareFetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)));
+ erroneous.put(topicIdPartition, ShareFetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID));
} else {
- validTopicIdPartitions.add(new Tuple2<>(topicIdPartition, sharePartitionData));
+ validTopicIdPartitions.put(topicIdPartition, sharePartitionData);
}
});
}
public ErroneousAndValidPartitionData() {
- this.erroneous = new ArrayList<>();
- this.validTopicIdPartitions = new ArrayList<>();
+ this.erroneous = new HashMap<>();
+ this.validTopicIdPartitions = new HashMap<>();
}
- public List> erroneous() {
+ public Map erroneous() {
return erroneous;
}
- public List> validTopicIdPartitions() {
+ public Map validTopicIdPartitions() {
return validTopicIdPartitions;
}
}
diff --git a/core/src/main/java/kafka/server/share/FinalContext.java b/share/src/main/java/org/apache/kafka/server/share/FinalContext.java
similarity index 98%
rename from core/src/main/java/kafka/server/share/FinalContext.java
rename to share/src/main/java/org/apache/kafka/server/share/FinalContext.java
index ec2f52f0f56..35da343b514 100644
--- a/core/src/main/java/kafka/server/share/FinalContext.java
+++ b/share/src/main/java/org/apache/kafka/server/share/FinalContext.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package kafka.server.share;
+package org.apache.kafka.server.share;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
diff --git a/server/src/main/java/org/apache/kafka/server/share/LastUsedKey.java b/share/src/main/java/org/apache/kafka/server/share/LastUsedKey.java
similarity index 100%
rename from server/src/main/java/org/apache/kafka/server/share/LastUsedKey.java
rename to share/src/main/java/org/apache/kafka/server/share/LastUsedKey.java
diff --git a/server/src/main/java/org/apache/kafka/server/share/ShareAcknowledgementBatch.java b/share/src/main/java/org/apache/kafka/server/share/ShareAcknowledgementBatch.java
similarity index 100%
rename from server/src/main/java/org/apache/kafka/server/share/ShareAcknowledgementBatch.java
rename to share/src/main/java/org/apache/kafka/server/share/ShareAcknowledgementBatch.java
diff --git a/core/src/main/java/kafka/server/share/ShareFetchContext.java b/share/src/main/java/org/apache/kafka/server/share/ShareFetchContext.java
similarity index 96%
rename from core/src/main/java/kafka/server/share/ShareFetchContext.java
rename to share/src/main/java/org/apache/kafka/server/share/ShareFetchContext.java
index 3075ff8a704..7714b022819 100644
--- a/core/src/main/java/kafka/server/share/ShareFetchContext.java
+++ b/share/src/main/java/org/apache/kafka/server/share/ShareFetchContext.java
@@ -14,9 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package kafka.server.share;
-
-import kafka.server.FetchSession;
+package org.apache.kafka.server.share;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
@@ -40,7 +38,7 @@ public abstract class ShareFetchContext {
* @return - A string representation of the partitions requested.
*/
String partitionsToLogString(Collection partitions) {
- return FetchSession.partitionsToLogString(partitions, isTraceEnabled());
+ return ShareSession.partitionsToLogString(partitions, isTraceEnabled());
}
/**
diff --git a/server/src/main/java/org/apache/kafka/server/share/ShareSession.java b/share/src/main/java/org/apache/kafka/server/share/ShareSession.java
similarity index 94%
rename from server/src/main/java/org/apache/kafka/server/share/ShareSession.java
rename to share/src/main/java/org/apache/kafka/server/share/ShareSession.java
index 8b04f2e0b6c..12124f51168 100644
--- a/server/src/main/java/org/apache/kafka/server/share/ShareSession.java
+++ b/share/src/main/java/org/apache/kafka/server/share/ShareSession.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -141,6 +142,13 @@ public class ShareSession {
return result;
}
+ public static String partitionsToLogString(Collection partitions, Boolean traceEnabled) {
+ if (traceEnabled) {
+ return String.format("( %s )", String.join(", ", partitions.toString()));
+ }
+ return String.format("%s partition(s)", partitions.size());
+ }
+
public String toString() {
return "ShareSession(" +
"key=" + key +
diff --git a/server/src/main/java/org/apache/kafka/server/share/ShareSessionCache.java b/share/src/main/java/org/apache/kafka/server/share/ShareSessionCache.java
similarity index 100%
rename from server/src/main/java/org/apache/kafka/server/share/ShareSessionCache.java
rename to share/src/main/java/org/apache/kafka/server/share/ShareSessionCache.java
diff --git a/core/src/main/java/kafka/server/share/ShareSessionContext.java b/share/src/main/java/org/apache/kafka/server/share/ShareSessionContext.java
similarity index 94%
rename from core/src/main/java/kafka/server/share/ShareSessionContext.java
rename to share/src/main/java/org/apache/kafka/server/share/ShareSessionContext.java
index e4146db040f..fee03eeb311 100644
--- a/core/src/main/java/kafka/server/share/ShareSessionContext.java
+++ b/share/src/main/java/org/apache/kafka/server/share/ShareSessionContext.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package kafka.server.share;
+package org.apache.kafka.server.share;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
@@ -27,23 +27,18 @@ import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.requests.ShareFetchRequest.SharePartitionData;
import org.apache.kafka.common.requests.ShareFetchResponse;
import org.apache.kafka.common.requests.ShareRequestMetadata;
-import org.apache.kafka.server.share.CachedSharePartition;
-import org.apache.kafka.server.share.ShareSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
-import scala.Tuple2;
-
/**
* The context for a share session fetch request.
*/
@@ -229,8 +224,8 @@ public class ShareSessionContext extends ShareFetchContext {
if (!isSubsequent) {
return new ErroneousAndValidPartitionData(shareFetchData);
}
- List> erroneous = new ArrayList<>();
- List> valid = new ArrayList<>();
+ Map erroneous = new HashMap<>();
+ Map valid = new HashMap<>();
// Take the session lock and iterate over all the cached partitions.
synchronized (session) {
session.partitionMap().forEach(cachedSharePartition -> {
@@ -238,9 +233,9 @@ public class ShareSessionContext extends ShareFetchContext {
TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition()));
ShareFetchRequest.SharePartitionData reqData = cachedSharePartition.reqData();
if (topicIdPartition.topic() == null) {
- erroneous.add(new Tuple2<>(topicIdPartition, ShareFetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)));
+ erroneous.put(topicIdPartition, ShareFetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID));
} else {
- valid.add(new Tuple2<>(topicIdPartition, reqData));
+ valid.put(topicIdPartition, reqData);
}
});
return new ErroneousAndValidPartitionData(erroneous, valid);
diff --git a/server/src/main/java/org/apache/kafka/server/share/ShareSessionKey.java b/share/src/main/java/org/apache/kafka/server/share/ShareSessionKey.java
similarity index 100%
rename from server/src/main/java/org/apache/kafka/server/share/ShareSessionKey.java
rename to share/src/main/java/org/apache/kafka/server/share/ShareSessionKey.java
diff --git a/server/src/test/java/org/apache/kafka/server/share/CachedSharePartitionTest.java b/share/src/test/java/org/apache/kafka/server/share/CachedSharePartitionTest.java
similarity index 100%
rename from server/src/test/java/org/apache/kafka/server/share/CachedSharePartitionTest.java
rename to share/src/test/java/org/apache/kafka/server/share/CachedSharePartitionTest.java
diff --git a/server/src/test/java/org/apache/kafka/server/share/ShareSessionCacheTest.java b/share/src/test/java/org/apache/kafka/server/share/ShareSessionCacheTest.java
similarity index 100%
rename from server/src/test/java/org/apache/kafka/server/share/ShareSessionCacheTest.java
rename to share/src/test/java/org/apache/kafka/server/share/ShareSessionCacheTest.java
diff --git a/share/src/test/java/org/apache/kafka/server/share/ShareSessionTest.java b/share/src/test/java/org/apache/kafka/server/share/ShareSessionTest.java
new file mode 100644
index 00000000000..66879633619
--- /dev/null
+++ b/share/src/test/java/org/apache/kafka/server/share/ShareSessionTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.share;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ShareSessionTest {
+
+ @Test
+ public void testPartitionsToLogString() {
+ Uuid uuid1 = Uuid.randomUuid();
+ Uuid uuid2 = Uuid.randomUuid();
+ List partitions = Arrays.asList(
+ new TopicIdPartition(uuid1, 0, "foo"),
+ new TopicIdPartition(uuid2, 1, "bar"));
+
+ String response = ShareSession.partitionsToLogString(partitions, false);
+ assertEquals("2 partition(s)", response);
+
+ response = ShareSession.partitionsToLogString(partitions, true);
+ assertEquals(String.format("( [%s:foo-0, %s:bar-1] )", uuid1, uuid2), response);
+ }
+
+ @Test
+ public void testPartitionsToLogStringEmpty() {
+ String response = ShareSession.partitionsToLogString(Collections.emptyList(), false);
+ assertEquals("0 partition(s)", response);
+
+ response = ShareSession.partitionsToLogString(Collections.emptyList(), true);
+ assertEquals("( [] )", response);
+ }
+}