KAFKA-12841: Remove an additional call of onAcknowledgement (#12064)

The bug was introduced in #11689 that an additional onAcknowledgement was made using the InterceptorCallback class. This is undesirable since onSendError will attempt to call onAcknowledgement once more.

Reviewers: Jun Rao <junrao@gmail.com>
This commit is contained in:
Philip Nee 2022-04-25 15:59:45 -07:00 committed by GitHub
parent 2a7fdd7670
commit b020819ac4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 15 additions and 7 deletions

View File

@ -1019,11 +1019,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
tp = ProducerInterceptors.extractTopicPartition(record);
}
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
// The onCompletion callback does expect a non-null metadata, but one will be created inside
// the interceptor's onCompletion implementation before the user's callback is invoked.
interceptCallback.onCompletion(null, e);
if (callback != null) {
RecordMetadata nullMetadata = new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
callback.onCompletion(nullMetadata, e);
}
this.errors.record();
this.interceptors.onSendError(record, tp, e);
if (transactionManager != null) {

View File

@ -1901,10 +1901,13 @@ public class KafkaProducerTest {
}
@Test
public void testCallbackHandlesError() throws Exception {
public void testCallbackAndInterceptorHandleError() throws Exception {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName());
configs.put(MockProducerInterceptor.APPEND_STRING_PROP, "something");
Time time = new MockTime();
ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE);
@ -1912,8 +1915,11 @@ public class KafkaProducerTest {
String invalidTopicName = "topic abc"; // Invalid topic name due to space
ProducerInterceptors<String, String> producerInterceptors =
new ProducerInterceptors<>(Arrays.asList(new MockProducerInterceptor()));
try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(), new StringSerializer(),
producerMetadata, client, null, time)) {
producerMetadata, client, producerInterceptors, time)) {
ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
// Here's the important piece of the test. Let's make sure that the RecordMetadata we get
@ -1938,6 +1944,7 @@ public class KafkaProducerTest {
};
producer.send(record, callBack);
assertEquals(1, MockProducerInterceptor.ON_ACKNOWLEDGEMENT_COUNT.intValue());
}
}

View File

@ -35,6 +35,7 @@ public class MockProducerInterceptor implements ClusterResourceListener, Produce
public static final AtomicInteger ON_SUCCESS_COUNT = new AtomicInteger(0);
public static final AtomicInteger ON_ERROR_COUNT = new AtomicInteger(0);
public static final AtomicInteger ON_ERROR_WITH_METADATA_COUNT = new AtomicInteger(0);
public static final AtomicInteger ON_ACKNOWLEDGEMENT_COUNT = new AtomicInteger(0);
public static final AtomicReference<ClusterResource> CLUSTER_META = new AtomicReference<>();
public static final ClusterResource NO_CLUSTER_ID = new ClusterResource("no_cluster_id");
public static final AtomicReference<ClusterResource> CLUSTER_ID_BEFORE_ON_ACKNOWLEDGEMENT = new AtomicReference<>(NO_CLUSTER_ID);
@ -69,6 +70,7 @@ public class MockProducerInterceptor implements ClusterResourceListener, Produce
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
ON_ACKNOWLEDGEMENT_COUNT.incrementAndGet();
// This will ensure that we get the cluster metadata when onAcknowledgement is called for the first time
// as subsequent compareAndSet operations will fail.
CLUSTER_ID_BEFORE_ON_ACKNOWLEDGEMENT.compareAndSet(NO_CLUSTER_ID, CLUSTER_META.get());