diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 62edeadd1d7..957d72cea01 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -41,6 +41,8 @@ import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.errors.InvalidRecordStateException;
 import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
@@ -67,6 +69,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Timeout;
 
+import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -843,6 +846,144 @@ public class ShareConsumerTest {
         }
     }
 
+    @ClusterTest
+    public void testExplicitOverrideAcknowledgeCorruptedMessage() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                 "group1",
+                 Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT),
+                 null,
+                 mockErrorDeserializer(3))) {
+
+            ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            ProducerRecord<byte[], byte[]> record3 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            producer.send(record1);
+            producer.send(record2);
+            producer.send(record3);
+            producer.flush();
+
+            shareConsumer.subscribe(Set.of(tp.topic()));
+
+            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60));
+            assertEquals(2, records.count());
+            Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();
+
+            ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
+            ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
+            assertEquals(0L, firstRecord.offset());
+            assertEquals(1L, secondRecord.offset());
+            shareConsumer.acknowledge(firstRecord);
+            shareConsumer.acknowledge(secondRecord);
+
+            RecordDeserializationException rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60)));
+            assertEquals(2, rde.offset());
+            shareConsumer.commitSync();
+
+            // The corrupted record was automatically released, so we can still obtain it.
+            rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60)));
+            assertEquals(2, rde.offset());
+
+            // Reject this record.
+            shareConsumer.acknowledge(rde.topicPartition().topic(), rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT);
+            shareConsumer.commitSync();
+
+            records = shareConsumer.poll(Duration.ZERO);
+            assertEquals(0, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
+        }
+    }
+
+    @ClusterTest
+    public void testExplicitAcknowledgeOffsetThrowsNotException() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                 "group1",
+                 Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
+
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            producer.send(record);
+            producer.flush();
+
+            shareConsumer.subscribe(Set.of(tp.topic()));
+
+            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60));
+            assertEquals(1, records.count());
+            ConsumerRecord<byte[], byte[]> consumedRecord = records.records(tp).get(0);
+            assertEquals(0L, consumedRecord.offset());
+
+            assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(tp.topic(), tp.partition(), consumedRecord.offset(), AcknowledgeType.ACCEPT));
+
+            shareConsumer.acknowledge(consumedRecord);
+            verifyShareGroupStateTopicRecordsProduced();
+        }
+    }
+
+    @ClusterTest
+    public void testExplicitAcknowledgeOffsetThrowsParametersError() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                 "group1",
+                 Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT),
+                 null,
+                 mockErrorDeserializer(2))) {
+
+            ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            producer.send(record1);
+            producer.send(record2);
+            producer.flush();
+
+            shareConsumer.subscribe(Set.of(tp.topic()));
+
+            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60));
+            assertEquals(1, records.count());
+            Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();
+
+            ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
+            assertEquals(0L, firstRecord.offset());
+            shareConsumer.acknowledge(firstRecord);
+
+            final RecordDeserializationException rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60)));
+            assertEquals(1, rde.offset());
+
+            assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge("foo", rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT));
+            assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rde.topicPartition().topic(), 1, rde.offset(), AcknowledgeType.REJECT));
+            assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rde.topicPartition().topic(), tp2.partition(), 0, AcknowledgeType.REJECT));
+
+            // Reject this record.
+            shareConsumer.acknowledge(rde.topicPartition().topic(), rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT);
+            shareConsumer.commitSync();
+
+            // The next acknowledge() should throw an IllegalStateException, as the record has already been acknowledged.
+            assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rde.topicPartition().topic(), rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT));
+
+            records = shareConsumer.poll(Duration.ZERO);
+            assertEquals(0, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
+        }
+    }
+
+    private ByteArrayDeserializer mockErrorDeserializer(int recordNumber) {
+        int recordIndex = recordNumber - 1;
+        return new ByteArrayDeserializer() {
+            int i = 0;
+
+            @Override
+            public byte[] deserialize(String topic, Headers headers, ByteBuffer data) {
+                if (i == recordIndex) {
+                    throw new SerializationException();
+                } else {
+                    i++;
+                    return super.deserialize(topic, headers, data);
+                }
+            }
+        };
+    }
+
     @ClusterTest
     public void testImplicitAcknowledgeFailsExplicit() {
         alterShareAutoOffsetReset("group1", "earliest");
@@ -2794,13 +2935,22 @@ public class ShareConsumerTest {
     private <K, V> ShareConsumer<K, V> createShareConsumer(
         String groupId,
         Map<String, Object> additionalProperties
+    ) {
+        return createShareConsumer(groupId, additionalProperties, null, null);
+    }
+
+    private <K, V> ShareConsumer<K, V> createShareConsumer(
+        String groupId,
+        Map<String, Object> additionalProperties,
+        Deserializer<K> keyDeserializer,
+        Deserializer<V> valueDeserializer
     ) {
         Properties props = new Properties();
         props.putAll(additionalProperties);
         props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         Map<String, Object> conf = new HashMap<>();
         props.forEach((k, v) -> conf.put((String) k, v));
-        return cluster.shareConsumer(conf);
+        return cluster.shareConsumer(conf, keyDeserializer, valueDeserializer);
    }
 
     private void warmup() throws InterruptedException {
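The tests above inject deserialization failures with the inline `mockErrorDeserializer` helper, which fails the n-th deserialization attempt and keeps failing at that position; that is why the corrupted record still throws after being released and redelivered. The same idea as a standalone, reusable class (a sketch, not part of this patch; the class name and exception message are illustrative):

```java
import java.nio.ByteBuffer;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

/**
 * Fails every attempt to deserialize the record at a given zero-based position
 * in the stream of records seen by this instance. Because the counter stops
 * advancing at the failing position, redeliveries of that record fail too.
 */
public class FaultInjectingDeserializer extends ByteArrayDeserializer {

    private final int failIndex;
    private int seen = 0;

    public FaultInjectingDeserializer(int failIndex) {
        this.failIndex = failIndex;
    }

    @Override
    public byte[] deserialize(String topic, Headers headers, ByteBuffer data) {
        if (seen == failIndex) {
            throw new SerializationException("injected failure at record " + seen);
        }
        seen++;
        return super.deserialize(topic, headers, data);
    }
}
```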

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
index 923c85a3980..76eb373c68b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
@@ -507,6 +507,28 @@ public class KafkaShareConsumer<K, V> implements ShareConsumer<K, V> {
         delegate.acknowledge(record, type);
     }
 
+    /**
+     * Acknowledge delivery of a record returned on the last {@link #poll(Duration)} call, indicating whether
+     * it was processed successfully. The acknowledgement is committed on the next {@link #commitSync()},
+     * {@link #commitAsync()} or {@link #poll(Duration)} call.
+     * <p>This method can only be used if the consumer is using explicit acknowledgement.
+     * <p>It provides an alternative to {@link #acknowledge(ConsumerRecord, AcknowledgeType)} for
+     * situations where the {@link ConsumerRecord} is not available, such as when the record could not be deserialized.
+     *
+     * @param topic The topic of the record to acknowledge
+     * @param partition The partition of the record to acknowledge
+     * @param offset The offset of the record to acknowledge
+     * @param type The acknowledge type which indicates whether it was processed successfully
+     *
+     * @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer is not using
+     *                               explicit acknowledgement
+     */
+
+    @Override
+    public void acknowledge(String topic, int partition, long offset, AcknowledgeType type) {
+        delegate.acknowledge(topic, partition, offset, type);
+    }
+
     /**
      * Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement,
      * the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java
index 81cb2eeec00..f1dad522d5a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java
@@ -104,6 +104,10 @@ public class MockShareConsumer<K, V> implements ShareConsumer<K, V> {
     public synchronized void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type) {
     }
 
+    @Override
+    public synchronized void acknowledge(String topic, int partition, long offset, AcknowledgeType type) {
+    }
+
     @Override
     public synchronized Map<TopicIdPartition, Optional<KafkaException>> commitSync() {
         return new HashMap<>();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java
index 900c249d852..58f5fc4d38e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java
@@ -70,6 +70,11 @@ public interface ShareConsumer<K, V> extends Closeable {
      */
     void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type);
 
+    /**
+     * @see KafkaShareConsumer#acknowledge(String, int, long, AcknowledgeType)
+     */
+    void acknowledge(String topic, int partition, long offset, AcknowledgeType type);
+
     /**
      * @see KafkaShareConsumer#commitSync()
      */
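Taken together with the tests above, the javadoc suggests the following application-side pattern for handling a poison record. This is a sketch, not part of the patch: the broker address, group, topic and the `process` method are illustrative, and `"explicit"` is assumed to be the configuration value for explicit acknowledgement mode.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PoisonRecordExample {

    public static void main(String[] args) {
        Map<String, Object> config = Map.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
            ConsumerConfig.GROUP_ID_CONFIG, "my-share-group",
            ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit");

        try (ShareConsumer<byte[], byte[]> consumer = new KafkaShareConsumer<>(
                config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            consumer.subscribe(Set.of("my-topic"));
            while (true) { // a real application would have a shutdown condition
                try {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<byte[], byte[]> record : records) {
                        process(record);
                        consumer.acknowledge(record, AcknowledgeType.ACCEPT);
                    }
                } catch (RecordDeserializationException e) {
                    // There is no ConsumerRecord to pass to acknowledge(record, type),
                    // so acknowledge the poison record by its coordinates instead.
                    consumer.acknowledge(e.topicPartition().topic(), e.topicPartition().partition(),
                        e.offset(), AcknowledgeType.REJECT);
                }
                consumer.commitSync();
            }
        }
    }

    private static void process(ConsumerRecord<byte[], byte[]> record) {
        // Application-specific processing goes here.
    }
}
```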
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
index 83bae92d48a..f2664050bc8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
@@ -41,11 +41,13 @@ import org.slf4j.Logger;
 
 import java.io.Closeable;
 import java.nio.ByteBuffer;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Optional;
+import java.util.Set;
 
 /**
  * {@link ShareCompletedFetch} represents a {@link RecordBatch batch} of {@link Record records}
@@ -162,15 +164,15 @@ public class ShareCompletedFetch {
 
         if (cachedBatchException != null) {
             // In the event that a CRC check fails, reject the entire record batch because it is corrupt.
-            rejectRecordBatch(inFlightBatch, currentBatch);
-            inFlightBatch.setException(cachedBatchException);
+            Set<Long> offsets = rejectRecordBatch(inFlightBatch, currentBatch);
+            inFlightBatch.setException(new ShareInFlightBatchException(cachedBatchException, offsets));
             cachedBatchException = null;
             return inFlightBatch;
         }
 
         if (cachedRecordException != null) {
             inFlightBatch.addAcknowledgement(lastRecord.offset(), AcknowledgeType.RELEASE);
-            inFlightBatch.setException(cachedRecordException);
+            inFlightBatch.setException(new ShareInFlightBatchException(cachedRecordException, Set.of(lastRecord.offset())));
             cachedRecordException = null;
             return inFlightBatch;
         }
@@ -224,7 +226,7 @@ public class ShareCompletedFetch {
                     nextAcquired = nextAcquiredRecord();
                     if (inFlightBatch.isEmpty()) {
                         inFlightBatch.addAcknowledgement(lastRecord.offset(), AcknowledgeType.RELEASE);
-                        inFlightBatch.setException(se);
+                        inFlightBatch.setException(new ShareInFlightBatchException(se, Set.of(lastRecord.offset())));
                     } else {
                         cachedRecordException = se;
                         inFlightBatch.setHasCachedException(true);
@@ -232,8 +234,8 @@ public class ShareCompletedFetch {
                 } catch (CorruptRecordException e) {
                     if (inFlightBatch.isEmpty()) {
                         // In the event that a CRC check fails, reject the entire record batch because it is corrupt.
-                        rejectRecordBatch(inFlightBatch, currentBatch);
-                        inFlightBatch.setException(e);
+                        Set<Long> offsets = rejectRecordBatch(inFlightBatch, currentBatch);
+                        inFlightBatch.setException(new ShareInFlightBatchException(e, offsets));
                     } else {
                         cachedBatchException = e;
                         inFlightBatch.setHasCachedException(true);
@@ -261,12 +263,13 @@ public class ShareCompletedFetch {
         return null;
     }
 
-    private <K, V> void rejectRecordBatch(final ShareInFlightBatch<K, V> inFlightBatch,
+    private <K, V> Set<Long> rejectRecordBatch(final ShareInFlightBatch<K, V> inFlightBatch,
                                    final RecordBatch currentBatch) {
         // Rewind the acquiredRecordIterator to the start, so we are in a known state
         acquiredRecordIterator = acquiredRecordList.listIterator();
 
         OffsetAndDeliveryCount nextAcquired = nextAcquiredRecord();
+        Set<Long> offsets = new HashSet<>();
         for (long offset = currentBatch.baseOffset(); offset <= currentBatch.lastOffset(); offset++) {
             if (nextAcquired == null) {
                 // No more acquired records, so we are done
@@ -274,6 +277,7 @@ public class ShareCompletedFetch {
             } else if (offset == nextAcquired.offset) {
                 // It's acquired, so we reject it
                 inFlightBatch.addAcknowledgement(offset, AcknowledgeType.REJECT);
+                offsets.add(offset);
             } else if (offset < nextAcquired.offset) {
                 // It's not acquired, so we skip it
                 continue;
@@ -281,6 +285,7 @@ public class ShareCompletedFetch {
                 nextAcquired = nextAcquiredRecord();
             }
         }
+        return offsets;
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 081fecf78db..33309ffb63d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -561,6 +561,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
      * {@inheritDoc}
      */
     @Override
+    @SuppressWarnings("unchecked")
     public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
         Timer timer = time.timer(timeout);
 
@@ -601,6 +602,9 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
             } while (timer.notExpired());
 
             return ConsumerRecords.empty();
+        } catch (ShareFetchException e) {
+            currentFetch = (ShareFetch<K, V>) e.shareFetch();
+            throw e.cause();
         } finally {
             kafkaShareConsumerMetrics.recordPollEnd(timer.currentTimeMs());
             release();
@@ -692,6 +696,19 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
         }
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    public void acknowledge(final String topic, final int partition, final long offset, final AcknowledgeType type) {
+        acquireAndEnsureOpen();
+        try {
+            ensureExplicitAcknowledgement();
+            currentFetch.acknowledge(topic, partition, offset, type);
+        } finally {
+            release();
+        }
+    }
+
     /**
      * {@inheritDoc}
     */
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
index eb79fa79c40..d587e29f382 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
@@ -110,9 +110,9 @@ public class ShareFetch<K, V> {
      * Acknowledge a single record in the current batch.
      *
      * @param record The record to acknowledge
-     * @param type The acknowledge type which indicates whether it was processed successfully
+     * @param type The acknowledgment type which indicates whether it was processed successfully
      */
-    public void acknowledge(final ConsumerRecord<K, V> record, AcknowledgeType type) {
+    public void acknowledge(final ConsumerRecord<K, V> record, final AcknowledgeType type) {
         for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> tipBatch : batches.entrySet()) {
             TopicIdPartition tip = tipBatch.getKey();
             if (tip.topic().equals(record.topic()) && (tip.partition() == record.partition())) {
@@ -123,6 +123,29 @@ public class ShareFetch<K, V> {
         throw new IllegalStateException("The record cannot be acknowledged.");
     }
 
+    /**
+     * Acknowledge a single record by its topic, partition and offset in the current batch.
+     *
+     * @param topic The topic of the record to acknowledge
+     * @param partition The partition of the record
+     * @param offset The offset of the record
+     * @param type The acknowledgment type which indicates whether it was processed successfully
+     */
+    public void acknowledge(final String topic, final int partition, final long offset, final AcknowledgeType type) {
+        for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> tipBatch : batches.entrySet()) {
+            TopicIdPartition tip = tipBatch.getKey();
+            ShareInFlightBatchException exception = tipBatch.getValue().getException();
+            if (tip.topic().equals(topic) && (tip.partition() == partition) &&
+                    exception != null &&
+                    exception.offsets().contains(offset)) {
+
+                tipBatch.getValue().addAcknowledgement(offset, type);
+                return;
+            }
+        }
+        throw new IllegalStateException("The record cannot be acknowledged.");
+    }
+
     /**
      * Acknowledge all records in the current batch. If any records in the batch already have
      * been acknowledged, those acknowledgements are not overwritten.
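Note the gate in the new overload: a (topic, partition, offset) triple is only accepted when the matching batch recorded that offset in its `ShareInFlightBatchException`, so coordinates for successfully delivered records, wrong topics or wrong partitions all fall through to `IllegalStateException`. And because `ShareInFlightBatch` clears the exception once acknowledgements are taken (the `exception = null;` change further down), a second acknowledgement of the same offset fails too. A toy model of that check, separate from the Kafka types (illustrative only, not the actual implementation):

```java
import java.util.Map;
import java.util.Set;

public class OffsetGateModel {

    // Loosely mirrors TopicIdPartition as a key identifying a batch.
    record BatchKey(String topic, int partition) { }

    /**
     * Mirrors the condition in ShareFetch#acknowledge(String, int, long, AcknowledgeType):
     * the offset must be one the batch recorded as failed, otherwise the caller gets
     * an IllegalStateException.
     */
    static void acknowledgeByOffset(Map<BatchKey, Set<Long>> failedOffsetsByBatch,
                                    String topic, int partition, long offset) {
        Set<Long> failed = failedOffsetsByBatch.get(new BatchKey(topic, partition));
        if (failed == null || !failed.contains(offset)) {
            throw new IllegalStateException("The record cannot be acknowledged.");
        }
        // In the real code, the acknowledgement is added to the in-flight batch here.
    }
}
```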
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollector.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollector.java
index 3d073fa92eb..c2a17d051b1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollector.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollector.java
@@ -112,7 +112,7 @@ public class ShareFetchCollector<K, V> {
                 fetch.add(tp, batch);
 
                 if (batch.getException() != null) {
-                    throw batch.getException();
+                    throw new ShareFetchException(fetch, batch.getException().cause());
                 } else if (batch.hasCachedException()) {
                     break;
                 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchException.java
new file mode 100644
index 00000000000..5e904e25068
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.SerializationException;
+
+public class ShareFetchException extends SerializationException {
+
+    private final ShareFetch<?, ?> shareFetch;
+
+    private final KafkaException cause;
+
+    public ShareFetchException(ShareFetch<?, ?> shareFetch, KafkaException cause) {
+        this.shareFetch = shareFetch;
+        this.cause = cause;
+    }
+
+    public ShareFetch<?, ?> shareFetch() {
+        return shareFetch;
+    }
+
+    public KafkaException cause() {
+        return cause;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java
index b2d6fad17fd..0fa0499aa1f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.consumer.AcknowledgeType;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicIdPartition;
 
 import java.util.ArrayList;
@@ -34,7 +33,7 @@ public class ShareInFlightBatch<K, V> {
     private final Map<Long, ConsumerRecord<K, V>> inFlightRecords;
     private final Set<Long> acknowledgedRecords;
     private Acknowledgements acknowledgements;
-    private KafkaException exception;
+    private ShareInFlightBatchException exception;
     private boolean hasCachedException = false;
 
     public ShareInFlightBatch(int nodeId, TopicIdPartition partition) {
@@ -102,6 +101,7 @@ public class ShareInFlightBatch<K, V> {
             acknowledgedRecords.forEach(inFlightRecords::remove);
         }
         acknowledgedRecords.clear();
+        exception = null;
 
         Acknowledgements currentAcknowledgements = acknowledgements;
         acknowledgements = Acknowledgements.empty();
@@ -116,11 +116,11 @@ public class ShareInFlightBatch<K, V> {
         return inFlightRecords.isEmpty() && acknowledgements.isEmpty();
     }
 
-    public void setException(KafkaException exception) {
+    public void setException(ShareInFlightBatchException exception) {
         this.exception = exception;
     }
 
-    public KafkaException getException() {
+    public ShareInFlightBatchException getException() {
         return exception;
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatchException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatchException.java
new file mode 100644
index 00000000000..bd8fa2602e0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatchException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.util.Set;
+
+public class ShareInFlightBatchException extends SerializationException {
+
+    private final KafkaException cause;
+
+    private final Set<Long> offsets;
+
+    public ShareInFlightBatchException(KafkaException cause, Set<Long> offsets) {
+        this.cause = cause;
+        this.offsets = offsets;
+    }
+
+    public KafkaException cause() {
+        return cause;
+    }
+
+    public Set<Long> offsets() {
+        return offsets;
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
index 73efb010c8b..a1814fd935c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
@@ -246,8 +246,8 @@ public class ShareCompletedFetchTest {
 
         // Record 1 then results in an empty batch
         batch = completedFetch.fetchRecords(deserializers, 10, false);
-        assertEquals(RecordDeserializationException.class, batch.getException().getClass());
-        RecordDeserializationException thrown = (RecordDeserializationException) batch.getException();
+        assertEquals(RecordDeserializationException.class, batch.getException().cause().getClass());
+        RecordDeserializationException thrown = (RecordDeserializationException) batch.getException().cause();
         assertEquals(RecordDeserializationException.DeserializationExceptionOrigin.KEY, thrown.origin());
         assertEquals(1, thrown.offset());
         assertEquals(TOPIC_NAME, thrown.topicPartition().topic());
@@ -264,8 +264,8 @@ public class ShareCompletedFetchTest {
 
         // Record 2 then results in an empty batch, because record 1 has now been skipped
         batch = completedFetch.fetchRecords(deserializers, 10, false);
-        assertEquals(RecordDeserializationException.class, batch.getException().getClass());
-        thrown = (RecordDeserializationException) batch.getException();
+        assertEquals(RecordDeserializationException.class, batch.getException().cause().getClass());
+        thrown = (RecordDeserializationException) batch.getException().cause();
         assertEquals(RecordDeserializationException.DeserializationExceptionOrigin.VALUE, thrown.origin());
         assertEquals(2L, thrown.offset());
         assertEquals(TOPIC_NAME, thrown.topicPartition().topic());
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f282c245f63..c6aca487404 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3400,7 +3400,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val interestedTopicPartitions = new util.ArrayList[TopicIdPartition]
 
-    erroneousAndValidPartitionData.validTopicIdPartitions.forEach { case topicIdPartition =>
+    erroneousAndValidPartitionData.validTopicIdPartitions.forEach { topicIdPartition =>
       if (!authorizedTopics.contains(topicIdPartition.topicPartition.topic))
         erroneous += topicIdPartition -> ShareFetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED)
       else if (!metadataCache.contains(topicIdPartition.topicPartition))
diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
index 243ca5394d5..b7f9d4c289a 100644
--- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
+++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
@@ -46,6 +46,7 @@ import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.test.api.ClusterConfig;
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.Type;
@@ -177,12 +178,20 @@ public interface ClusterInstance {
     }
 
     default ShareConsumer<byte[], byte[]> shareConsumer(Map<String, Object> configs) {
+        return shareConsumer(configs, null, null);
+    }
+
+    default <K, V> ShareConsumer<K, V> shareConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
         Map<String, Object> props = new HashMap<>(configs);
-        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        if (keyDeserializer == null) {
+            props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        }
+        if (valueDeserializer == null) {
+            props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        }
         props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group_" + TestUtils.randomString(5));
         props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
-        return new KafkaShareConsumer<>(setClientSaslConfig(props));
+        return new KafkaShareConsumer<>(setClientSaslConfig(props), keyDeserializer, valueDeserializer);
     }
 
     default Admin admin(Map<String, Object> configs, boolean usingBootstrapControllers) {
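With the `ClusterInstance` overload in place, a test can supply deserializer instances directly instead of configuring class names; the deserializer config keys are deliberately left unset when an instance is supplied, so the instance wins. A hypothetical usage (topic name, deserializer choice and the `"explicit"` mode value are illustrative, not part of the patch):

```java
import java.time.Duration;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;

public class TypedShareConsumerExampleTest {

    @ClusterTest
    public void testTypedValueDeserializer(ClusterInstance cluster) {
        // Keys stay raw bytes; values are decoded by the supplied StringDeserializer.
        try (ShareConsumer<byte[], String> consumer = cluster.shareConsumer(
                Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"),
                new ByteArrayDeserializer(),
                new StringDeserializer())) {
            consumer.subscribe(Set.of("some-topic"));
            ConsumerRecords<byte[], String> records = consumer.poll(Duration.ofSeconds(5));
            records.forEach(record -> System.out.println(record.value()));
        }
    }
}
```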