KAFKA-19471: Enable acknowledgement for a record which could not be deserialized (#20148)

This patch mainly includes two improvements:

1. Update currentFetch when `pollForFetches()` throws an exception.
2. Add an overload `KafkaShareConsumer.acknowledge(String topic, int
partition, long offset, AcknowledgeType type)`.
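
For illustration, a minimal sketch of how an application might use the new
overload to reject a record that fails deserialization. The broker address,
group id, topic name and config wiring below are assumptions for the sketch,
not part of this patch:

import java.time.Duration;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RejectCorruptRecordExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");        // assumed broker address
        props.put("group.id", "my-share-group");                 // illustrative share group
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("share.acknowledgement.mode", "explicit");     // per-record acknowledgement

        try (ShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props)) {
            consumer.subscribe(Set.of("my-topic"));              // illustrative topic
            while (true) {
                try {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    // Accept records that deserialized and were processed successfully.
                    records.forEach(r -> consumer.acknowledge(r, AcknowledgeType.ACCEPT));
                } catch (RecordDeserializationException rde) {
                    // No ConsumerRecord exists for a record that failed deserialization,
                    // so acknowledge it by topic, partition and offset instead.
                    consumer.acknowledge(rde.topicPartition().topic(),
                            rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT);
                }
                consumer.commitSync();  // commits the acknowledgements recorded above
            }
        }
    }
}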

Reviewers: Andrew Schofield <aschofield@confluent.io>
Lan Ding 2025-07-28 05:35:04 +08:00 committed by GitHub
parent 1a176beff1
commit abbb6b3c13
14 changed files with 340 additions and 23 deletions

View File

@@ -41,6 +41,8 @@ import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
@@ -67,6 +69,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -843,6 +846,144 @@ public class ShareConsumerTest {
}
}
@ClusterTest
public void testExplicitOverrideAcknowledgeCorruptedMessage() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
"group1",
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT),
null,
mockErrorDeserializer(3))) {
ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
ProducerRecord<byte[], byte[]> record3 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
producer.send(record1);
producer.send(record2);
producer.send(record3);
producer.flush();
shareConsumer.subscribe(Set.of(tp.topic()));
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60));
assertEquals(2, records.count());
Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();
ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
assertEquals(0L, firstRecord.offset());
assertEquals(1L, secondRecord.offset());
shareConsumer.acknowledge(firstRecord);
shareConsumer.acknowledge(secondRecord);
RecordDeserializationException rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60)));
assertEquals(2, rde.offset());
shareConsumer.commitSync();
// The corrupted record was automatically released, so it is delivered again on the next poll.
rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60)));
assertEquals(2, rde.offset());
// Reject this record
shareConsumer.acknowledge(rde.topicPartition().topic(), rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT);
shareConsumer.commitSync();
records = shareConsumer.poll(Duration.ZERO);
assertEquals(0, records.count());
verifyShareGroupStateTopicRecordsProduced();
}
}
@ClusterTest
public void testExplicitAcknowledgeOffsetThrowsNotException() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
"group1",
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
producer.send(record);
producer.flush();
shareConsumer.subscribe(Set.of(tp.topic()));
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60));
assertEquals(1, records.count());
ConsumerRecord<byte[], byte[]> consumedRecord = records.records(tp).get(0);
assertEquals(0L, consumedRecord.offset());
assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(tp.topic(), tp.partition(), consumedRecord.offset(), AcknowledgeType.ACCEPT));
shareConsumer.acknowledge(consumedRecord);
verifyShareGroupStateTopicRecordsProduced();
}
}
@ClusterTest
public void testExplicitAcknowledgeOffsetThrowsParametersError() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
"group1",
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT),
null,
mockErrorDeserializer(2))) {
ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
producer.send(record1);
producer.send(record2);
producer.flush();
shareConsumer.subscribe(Set.of(tp.topic()));
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60));
assertEquals(1, records.count());
Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();
ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
assertEquals(0L, firstRecord.offset());
shareConsumer.acknowledge(firstRecord);
final RecordDeserializationException rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60)));
assertEquals(1, rde.offset());
assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge("foo", rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT));
assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rde.topicPartition().topic(), 1, rde.offset(), AcknowledgeType.REJECT));
assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rde.topicPartition().topic(), tp2.partition(), 0, AcknowledgeType.REJECT));
// Reject this record.
shareConsumer.acknowledge(rde.topicPartition().topic(), rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT);
shareConsumer.commitSync();
// The next acknowledge() should throw an IllegalStateException as the record has been acked.
assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rde.topicPartition().topic(), rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT));
records = shareConsumer.poll(Duration.ZERO);
assertEquals(0, records.count());
verifyShareGroupStateTopicRecordsProduced();
}
}
private ByteArrayDeserializer mockErrorDeserializer(int recordNumber) {
int recordIndex = recordNumber - 1;
return new ByteArrayDeserializer() {
int i = 0;
@Override
public byte[] deserialize(String topic, Headers headers, ByteBuffer data) {
if (i == recordIndex) {
throw new SerializationException();
} else {
i++;
return super.deserialize(topic, headers, data);
}
}
};
}
@ClusterTest
public void testImplicitAcknowledgeFailsExplicit() {
alterShareAutoOffsetReset("group1", "earliest");
@@ -2794,13 +2935,22 @@ public class ShareConsumerTest {
private <K, V> ShareConsumer<K, V> createShareConsumer(
String groupId,
Map<?, ?> additionalProperties
) {
return createShareConsumer(groupId, additionalProperties, null, null);
}
private <K, V> ShareConsumer<K, V> createShareConsumer(
String groupId,
Map<?, ?> additionalProperties,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer
) {
Properties props = new Properties();
props.putAll(additionalProperties);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
Map<String, Object> conf = new HashMap<>();
props.forEach((k, v) -> conf.put((String) k, v));
return cluster.shareConsumer(conf);
return cluster.shareConsumer(conf, keyDeserializer, valueDeserializer);
}
private void warmup() throws InterruptedException {

View File

@@ -507,6 +507,28 @@ public class KafkaShareConsumer<K, V> implements ShareConsumer<K, V> {
delegate.acknowledge(record, type);
}
/**
* Acknowledge delivery of a record returned on the last {@link #poll(Duration)} call indicating whether
* it was processed successfully. The acknowledgement is committed on the next {@link #commitSync()},
* {@link #commitAsync()} or {@link #poll(Duration)} call.
* <p>This method can only be used if the consumer is using <b>explicit acknowledgement</b>.
* <p>It provides an alternative to {@link #acknowledge(ConsumerRecord, AcknowledgeType)} for
* situations where the {@link ConsumerRecord} is not available, such as when the record could not be deserialized.
*
* @param topic The topic of the record to acknowledge
* @param partition The partition of the record to acknowledge
* @param offset The offset of the record to acknowledge
* @param type The acknowledge type which indicates whether it was processed successfully
*
* @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer is not using
* explicit acknowledgement
*/
@Override
public void acknowledge(String topic, int partition, long offset, AcknowledgeType type) {
delegate.acknowledge(topic, partition, offset, type);
}
/**
* Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement,
* the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or
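
For context, a hedged sketch of how an application might commit an
acknowledgement recorded through this overload and inspect the per-partition
outcome; the consumer instance, the coordinate variables and the error
handling are illustrative assumptions:

// Acknowledge the failed record by its coordinates, then commit. An empty
// Optional in the result map means the acknowledgement was committed cleanly.
consumer.acknowledge(topic, partition, offset, AcknowledgeType.REJECT);
Map<TopicIdPartition, Optional<KafkaException>> results = consumer.commitSync();
results.forEach((tip, error) ->
        error.ifPresent(e -> System.err.println("Acknowledgement failed for " + tip + ": " + e)));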

View File

@@ -104,6 +104,10 @@ public class MockShareConsumer<K, V> implements ShareConsumer<K, V> {
public synchronized void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type) {
}
@Override
public synchronized void acknowledge(String topic, int partition, long offset, AcknowledgeType type) {
}
@Override
public synchronized Map<TopicIdPartition, Optional<KafkaException>> commitSync() {
return new HashMap<>();

View File

@@ -70,6 +70,11 @@ public interface ShareConsumer<K, V> extends Closeable {
*/
void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type);
/**
* @see KafkaShareConsumer#acknowledge(String, int, long, AcknowledgeType)
*/
void acknowledge(String topic, int partition, long offset, AcknowledgeType type);
/**
* @see KafkaShareConsumer#commitSync()
*/

View File

@@ -41,11 +41,13 @@ import org.slf4j.Logger;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Optional;
import java.util.Set;
/**
* {@link ShareCompletedFetch} represents a {@link RecordBatch batch} of {@link Record records}
@@ -162,15 +164,15 @@ public class ShareCompletedFetch {
if (cachedBatchException != null) {
// In the event that a CRC check fails, reject the entire record batch because it is corrupt.
rejectRecordBatch(inFlightBatch, currentBatch);
inFlightBatch.setException(cachedBatchException);
Set<Long> offsets = rejectRecordBatch(inFlightBatch, currentBatch);
inFlightBatch.setException(new ShareInFlightBatchException(cachedBatchException, offsets));
cachedBatchException = null;
return inFlightBatch;
}
if (cachedRecordException != null) {
inFlightBatch.addAcknowledgement(lastRecord.offset(), AcknowledgeType.RELEASE);
inFlightBatch.setException(cachedRecordException);
inFlightBatch.setException(new ShareInFlightBatchException(cachedRecordException, Set.of(lastRecord.offset())));
cachedRecordException = null;
return inFlightBatch;
}
@@ -224,7 +226,7 @@
nextAcquired = nextAcquiredRecord();
if (inFlightBatch.isEmpty()) {
inFlightBatch.addAcknowledgement(lastRecord.offset(), AcknowledgeType.RELEASE);
inFlightBatch.setException(se);
inFlightBatch.setException(new ShareInFlightBatchException(se, Set.of(lastRecord.offset())));
} else {
cachedRecordException = se;
inFlightBatch.setHasCachedException(true);
@@ -232,8 +234,8 @@
} catch (CorruptRecordException e) {
if (inFlightBatch.isEmpty()) {
// In the event that a CRC check fails, reject the entire record batch because it is corrupt.
rejectRecordBatch(inFlightBatch, currentBatch);
inFlightBatch.setException(e);
Set<Long> offsets = rejectRecordBatch(inFlightBatch, currentBatch);
inFlightBatch.setException(new ShareInFlightBatchException(e, offsets));
} else {
cachedBatchException = e;
inFlightBatch.setHasCachedException(true);
@@ -261,12 +263,13 @@
return null;
}
private <K, V> void rejectRecordBatch(final ShareInFlightBatch<K, V> inFlightBatch,
private <K, V> Set<Long> rejectRecordBatch(final ShareInFlightBatch<K, V> inFlightBatch,
final RecordBatch currentBatch) {
// Rewind the acquiredRecordIterator to the start, so we are in a known state
acquiredRecordIterator = acquiredRecordList.listIterator();
OffsetAndDeliveryCount nextAcquired = nextAcquiredRecord();
Set<Long> offsets = new HashSet<>();
for (long offset = currentBatch.baseOffset(); offset <= currentBatch.lastOffset(); offset++) {
if (nextAcquired == null) {
// No more acquired records, so we are done
@@ -274,6 +277,7 @@
} else if (offset == nextAcquired.offset) {
// It's acquired, so we reject it
inFlightBatch.addAcknowledgement(offset, AcknowledgeType.REJECT);
offsets.add(offset);
} else if (offset < nextAcquired.offset) {
// It's not acquired, so we skip it
continue;
@@ -281,6 +285,7 @@
nextAcquired = nextAcquiredRecord();
}
return offsets;
}
/**

View File

@@ -561,6 +561,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
* {@inheritDoc}
*/
@Override
@SuppressWarnings("unchecked")
public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
Timer timer = time.timer(timeout);
@@ -601,6 +602,9 @@
} while (timer.notExpired());
return ConsumerRecords.empty();
} catch (ShareFetchException e) {
currentFetch = (ShareFetch<K, V>) e.shareFetch();
throw e.cause();
} finally {
kafkaShareConsumerMetrics.recordPollEnd(timer.currentTimeMs());
release();
@@ -692,6 +696,19 @@
}
}
/**
* {@inheritDoc}
*/
public void acknowledge(final String topic, final int partition, final long offset, final AcknowledgeType type) {
acquireAndEnsureOpen();
try {
ensureExplicitAcknowledgement();
currentFetch.acknowledge(topic, partition, offset, type);
} finally {
release();
}
}
/**
* {@inheritDoc}
*/

View File

@@ -110,9 +110,9 @@ public class ShareFetch<K, V> {
* Acknowledge a single record in the current batch.
*
* @param record The record to acknowledge
* @param type The acknowledge type which indicates whether it was processed successfully
* @param type The acknowledgment type which indicates whether it was processed successfully
*/
public void acknowledge(final ConsumerRecord<K, V> record, AcknowledgeType type) {
public void acknowledge(final ConsumerRecord<K, V> record, final AcknowledgeType type) {
for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> tipBatch : batches.entrySet()) {
TopicIdPartition tip = tipBatch.getKey();
if (tip.topic().equals(record.topic()) && (tip.partition() == record.partition())) {
@@ -123,6 +123,29 @@
throw new IllegalStateException("The record cannot be acknowledged.");
}
/**
* Acknowledge a single record by its topic, partition and offset in the current batch.
*
* @param topic The topic of the record to acknowledge
* @param partition The partition of the record
* @param offset The offset of the record
* @param type The acknowledgment type which indicates whether it was processed successfully
*/
public void acknowledge(final String topic, final int partition, final long offset, final AcknowledgeType type) {
for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> tipBatch : batches.entrySet()) {
TopicIdPartition tip = tipBatch.getKey();
ShareInFlightBatchException exception = tipBatch.getValue().getException();
if (tip.topic().equals(topic) && (tip.partition() == partition) &&
exception != null &&
exception.offsets().contains(offset)) {
tipBatch.getValue().addAcknowledgement(offset, type);
return;
}
}
throw new IllegalStateException("The record cannot be acknowledged.");
}
/**
* Acknowledge all records in the current batch. If any records in the batch already have
* been acknowledged, those acknowledgements are not overwritten.

View File

@@ -112,7 +112,7 @@ public class ShareFetchCollector<K, V> {
fetch.add(tp, batch);
if (batch.getException() != null) {
throw batch.getException();
throw new ShareFetchException(fetch, batch.getException().cause());
} else if (batch.hasCachedException()) {
break;
}

View File

@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.SerializationException;
public class ShareFetchException extends SerializationException {
private final ShareFetch<?, ?> shareFetch;
private final KafkaException cause;
public ShareFetchException(ShareFetch<?, ?> shareFetch, KafkaException cause) {
this.shareFetch = shareFetch;
this.cause = cause;
}
public ShareFetch<?, ?> shareFetch() {
return shareFetch;
}
public KafkaException cause() {
return cause;
}
}

View File

@@ -18,7 +18,6 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import java.util.ArrayList;
@@ -34,7 +33,7 @@ public class ShareInFlightBatch<K, V> {
private final Map<Long, ConsumerRecord<K, V>> inFlightRecords;
private final Set<Long> acknowledgedRecords;
private Acknowledgements acknowledgements;
private KafkaException exception;
private ShareInFlightBatchException exception;
private boolean hasCachedException = false;
public ShareInFlightBatch(int nodeId, TopicIdPartition partition) {
@@ -102,6 +101,7 @@
acknowledgedRecords.forEach(inFlightRecords::remove);
}
acknowledgedRecords.clear();
exception = null;
Acknowledgements currentAcknowledgements = acknowledgements;
acknowledgements = Acknowledgements.empty();
@@ -116,11 +116,11 @@
return inFlightRecords.isEmpty() && acknowledgements.isEmpty();
}
public void setException(KafkaException exception) {
public void setException(ShareInFlightBatchException exception) {
this.exception = exception;
}
public KafkaException getException() {
public ShareInFlightBatchException getException() {
return exception;
}

View File

@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.SerializationException;
import java.util.Set;
public class ShareInFlightBatchException extends SerializationException {
private final KafkaException cause;
private final Set<Long> offsets;
public ShareInFlightBatchException(KafkaException cause, Set<Long> offsets) {
this.cause = cause;
this.offsets = offsets;
}
public KafkaException cause() {
return cause;
}
public Set<Long> offsets() {
return offsets;
}
}

View File

@@ -246,8 +246,8 @@ public class ShareCompletedFetchTest {
// Record 1 then results in an empty batch
batch = completedFetch.fetchRecords(deserializers, 10, false);
assertEquals(RecordDeserializationException.class, batch.getException().getClass());
RecordDeserializationException thrown = (RecordDeserializationException) batch.getException();
assertEquals(RecordDeserializationException.class, batch.getException().cause().getClass());
RecordDeserializationException thrown = (RecordDeserializationException) batch.getException().cause();
assertEquals(RecordDeserializationException.DeserializationExceptionOrigin.KEY, thrown.origin());
assertEquals(1, thrown.offset());
assertEquals(TOPIC_NAME, thrown.topicPartition().topic());
@@ -264,8 +264,8 @@
// Record 2 then results in an empty batch, because record 1 has now been skipped
batch = completedFetch.fetchRecords(deserializers, 10, false);
assertEquals(RecordDeserializationException.class, batch.getException().getClass());
thrown = (RecordDeserializationException) batch.getException();
assertEquals(RecordDeserializationException.class, batch.getException().cause().getClass());
thrown = (RecordDeserializationException) batch.getException().cause();
assertEquals(RecordDeserializationException.DeserializationExceptionOrigin.VALUE, thrown.origin());
assertEquals(2L, thrown.offset());
assertEquals(TOPIC_NAME, thrown.topicPartition().topic());

View File

@@ -3400,7 +3400,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val interestedTopicPartitions = new util.ArrayList[TopicIdPartition]
erroneousAndValidPartitionData.validTopicIdPartitions.forEach { case topicIdPartition =>
erroneousAndValidPartitionData.validTopicIdPartitions.forEach { topicIdPartition =>
if (!authorizedTopics.contains(topicIdPartition.topicPartition.topic))
erroneous += topicIdPartition -> ShareFetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED)
else if (!metadataCache.contains(topicIdPartition.topicPartition))

View File

@@ -46,6 +46,7 @@ import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.test.api.ClusterConfig;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
@@ -177,12 +178,20 @@
}
default <K, V> ShareConsumer<K, V> shareConsumer(Map<String, Object> configs) {
return shareConsumer(configs, null, null);
}
default <K, V> ShareConsumer<K, V> shareConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
Map<String, Object> props = new HashMap<>(configs);
props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
if (keyDeserializer == null) {
props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
}
if (valueDeserializer == null) {
props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
}
props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group_" + TestUtils.randomString(5));
props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
return new KafkaShareConsumer<>(setClientSaslConfig(props));
return new KafkaShareConsumer<>(setClientSaslConfig(props), keyDeserializer, valueDeserializer);
}
default Admin admin(Map<String, Object> configs, boolean usingBootstrapControllers) {