KAFKA-17287: Added integration tests for ShareConsumer (#17009)

Reviewers: Andrew Schofield <aschofield@confluent.io>, David Arthur <mumrah@gmail.com>
This commit is contained in:
ShivsundarR 2024-10-08 10:37:16 -04:00 committed by GitHub
parent 3815339e05
commit 374663c64b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 1698 additions and 41 deletions

View File

@@ -111,6 +111,7 @@
<allow pkg="kafka.zk" />
<allow pkg="org.apache.kafka.clients.admin"/>
<allow pkg="org.apache.kafka.clients.consumer"/>
<allow pkg="org.apache.kafka.clients.producer"/>
<allow pkg="org.apache.kafka.coordinator.group"/>
<allow pkg="org.apache.kafka.coordinator.transaction"/>
<subpackage name="server">

File diff suppressed because it is too large Load Diff

View File

@@ -19,9 +19,10 @@ package kafka.api
import kafka.utils.TestInfoUtils
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, PartitionInfo}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.serialization.{Deserializer, Serializer}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
@@ -152,4 +153,41 @@ object BaseConsumerTest {
override def onUpdate(clusterResource: ClusterResource): Unit = updateConsumerCount.incrementAndGet()
override def deserialize(topic: String, data: Array[Byte]): Array[Byte] = data
}
/**
 * Test serializer that stamps a "content-type" header on every record and then
 * delegates the actual byte serialization to [[ByteArraySerializer]].
 * The headerless `serialize` overload is expected never to be invoked by the
 * producer, so it fails the test if called.
 */
class SerializerImpl extends Serializer[Array[Byte]] {
  var serializer = new ByteArraySerializer()

  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit =
    serializer.configure(configs, isKey)

  override def close(): Unit = serializer.close()

  // Header-aware path: attach the content type, then hand off to the delegate.
  override def serialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = {
    headers.add("content-type", "application/octet-stream".getBytes)
    serializer.serialize(topic, data)
  }

  // The producer must always call the header-aware overload above.
  override def serialize(topic: String, data: Array[Byte]): Array[Byte] = {
    fail("method should not be invoked")
    null
  }
}
/**
 * Test deserializer that checks the "content-type" header stamped by
 * [[SerializerImpl]] and then delegates the byte deserialization to
 * [[ByteArrayDeserializer]]. The headerless `deserialize` overload is expected
 * never to be invoked by the consumer, so it fails the test if called.
 */
class DeserializerImpl extends Deserializer[Array[Byte]] {
  var deserializer = new ByteArrayDeserializer()

  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit =
    deserializer.configure(configs, isKey)

  override def close(): Unit = deserializer.close()

  // Header-aware path: assert the expected content type is present, then delegate.
  override def deserialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = {
    val contentType = headers.lastHeader("content-type")
    assertEquals("application/octet-stream", if (contentType == null) null else new String(contentType.value()))
    deserializer.deserialize(topic, data)
  }

  // The consumer must always call the header-aware overload above.
  override def deserialize(topic: String, data: Array[Byte]): Array[Byte] = {
    fail("method should not be invoked")
    null
  }
}
}

View File

@@ -12,6 +12,8 @@
*/
package kafka.api
import kafka.api.BaseConsumerTest.{DeserializerImpl, SerializerImpl}
import java.time.Duration
import java.util
import java.util.Arrays.asList
@@ -23,7 +25,6 @@ import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.{InvalidGroupIdException, InvalidTopicException, TimeoutException, WakeupException}
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.record.{CompressionType, TimestampType}
import org.apache.kafka.common.serialization._
import org.apache.kafka.common.{MetricName, TopicPartition}
@@ -68,43 +69,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
}
}
// NOTE(review): this trait is the removed side of the diff — the commit moves it
// to BaseConsumerTest as a concrete class so ShareConsumer tests can reuse it.
// It stamps a "content-type" header on every record, then delegates the actual
// byte serialization to ByteArraySerializer.
trait SerializerImpl extends Serializer[Array[Byte]]{
var serializer = new ByteArraySerializer()
// Header-aware path: attach the content type, then hand off to the delegate.
override def serialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = {
headers.add("content-type", "application/octet-stream".getBytes)
serializer.serialize(topic, data)
}
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = serializer.configure(configs, isKey)
override def close(): Unit = serializer.close()
// The producer must always call the header-aware overload above.
override def serialize(topic: String, data: Array[Byte]): Array[Byte] = {
fail("method should not be invoked")
null
}
}
// NOTE(review): this trait is the removed side of the diff — the commit moves it
// to BaseConsumerTest as a concrete class so ShareConsumer tests can reuse it.
// It asserts the "content-type" header attached on the produce side, then
// delegates the byte deserialization to ByteArrayDeserializer.
trait DeserializerImpl extends Deserializer[Array[Byte]]{
var deserializer = new ByteArrayDeserializer()
// Header-aware path: verify the expected content type is present, then delegate.
override def deserialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = {
val header = headers.lastHeader("content-type")
assertEquals("application/octet-stream", if (header == null) null else new String(header.value()))
deserializer.deserialize(topic, data)
}
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = deserializer.configure(configs, isKey)
override def close(): Unit = deserializer.close()
// The consumer must always call the header-aware overload above.
override def deserialize(topic: String, data: Array[Byte]): Array[Byte] = {
fail("method should not be invoked")
null
}
}
private def testHeadersSerializeDeserialize(serializer: Serializer[Array[Byte]], deserializer: Deserializer[Array[Byte]]): Unit = {
val numRecords = 1
val record = new ProducerRecord(tp.topic, tp.partition, null, "key".getBytes, "value".getBytes)
@@ -130,9 +94,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testHeadersSerializerDeserializer(quorum: String, groupProtocol: String): Unit = {
  // Fix: the extraction dropped the +/- diff markers, leaving duplicate
  // conflicting `val` definitions (old trait-mixin form AND new class form).
  // Keep only the class-based form introduced by this commit, matching the
  // `BaseConsumerTest.{DeserializerImpl, SerializerImpl}` import above.
  val extendedSerializer = new SerializerImpl
  val extendedDeserializer = new DeserializerImpl
  // Round-trip a record and verify the content-type header survives
  // produce/consume via the shared header-aware (de)serializers.
  testHeadersSerializeDeserialize(extendedSerializer, extendedDeserializer)
}