KAFKA-3562; Handle topic deletion during a send

Fix a timing window in the producer by holding onto the Cluster object while a send request is processed, so that cluster changes from a concurrent metadata refresh cannot cause a NullPointerException when a topic is deleted mid-send. (An illustrative sketch of the before/after pattern follows the commit metadata below.)

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Sriharsha Chintalapani <harsha@hortonworks.com>, Ewen Cheslack-Postava <ewen@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #1478 from rajinisivaram/KAFKA-3562
Authored by Rajini Sivaram on 2016-07-11 17:06:59 +01:00; committed by Ismael Juma
parent 383cec9cf3, commit c439268224
3 changed files with 93 additions and 13 deletions
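
The race fixed here is that the send path used to call metadata.fetch() several times; a metadata refresh between two calls could drop a deleted topic, so a later partitionsForTopic() lookup returned null and was dereferenced. Below is a minimal, self-contained sketch of that pattern and of the snapshot approach the commit adopts. The names (MetadataRaceSketch, ClusterView, racyPartitionCount, snapshotPartitionCount) are illustrative stand-ins, not Kafka APIs.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-ins for the producer's Metadata/Cluster pair; the names
// are illustrative only and are not Kafka classes.
public class MetadataRaceSketch {

    /** Snapshot of topic metadata, playing the role of o.a.k.common.Cluster. */
    static final class ClusterView {
        private final Map<String, List<Integer>> partitionsByTopic;
        ClusterView(Map<String, List<Integer>> partitionsByTopic) {
            this.partitionsByTopic = new HashMap<String, List<Integer>>(partitionsByTopic);
        }
        /** Returns null when the topic is unknown, mirroring the producer's null check. */
        List<Integer> partitionsForTopic(String topic) {
            return partitionsByTopic.get(topic);
        }
    }

    // A metadata refresh on the sender thread can swap in a new snapshot at any
    // time, e.g. after the topic has been deleted.
    private final AtomicReference<ClusterView> current =
            new AtomicReference<>(new ClusterView(new HashMap<String, List<Integer>>()));

    ClusterView fetch() { // analogous to metadata.fetch()
        return current.get();
    }

    /** Pre-fix shape (shown for contrast): each step re-fetches, so the topic can vanish in between. */
    int racyPartitionCount(String topic) {
        if (fetch().partitionsForTopic(topic) == null)
            throw new IllegalStateException("unknown topic: " + topic);
        // If a refresh drops the topic between the two fetch() calls, this second
        // lookup returns null and the size() call throws NullPointerException.
        return fetch().partitionsForTopic(topic).size();
    }

    /** Post-fix shape: take one snapshot and use it for every step of the send. */
    int snapshotPartitionCount(String topic) {
        ClusterView cluster = fetch();
        List<Integer> partitions = cluster.partitionsForTopic(topic);
        if (partitions == null)
            throw new IllegalStateException("unknown topic: " + topic);
        return partitions.size(); // same snapshot, cannot turn null underneath us
    }

    public static void main(String[] args) {
        MetadataRaceSketch sketch = new MetadataRaceSketch();
        Map<String, List<Integer>> topics = new HashMap<>();
        topics.put("t", Arrays.asList(0, 1, 2));
        sketch.current.set(new ClusterView(topics));
        System.out.println(sketch.snapshotPartitionCount("t")); // prints 3
    }
}

In the actual patch, the snapshot and the time spent waiting for it travel together in the private ClusterAndWaitTime holder shown in the KafkaProducer.java diff below.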

build.gradle

@@ -576,6 +576,9 @@ project(':clients') {
     testCompile libs.bcpkix
     testCompile libs.junit
+    testCompile libs.easymock
+    testCompile libs.powermock
+    testCompile libs.powermockEasymock
     testRuntime libs.slf4jlog4j
   }

KafkaProducer.java

@@ -437,8 +437,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         TopicPartition tp = null;
         try {
             // first make sure the metadata for the topic is available
-            long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
-            long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);
+            ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
+            long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
+            Cluster cluster = clusterAndWaitTime.cluster;
             byte[] serializedKey;
             try {
                 serializedKey = keySerializer.serialize(record.topic(), record.key());
@@ -455,7 +456,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                         " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                         " specified in value.serializer");
             }
-            int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
+
+            int partition = partition(record, serializedKey, serializedValue, cluster);
             int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
             ensureValidRecordSize(serializedSize);
             tp = new TopicPartition(record.topic(), partition);
@@ -508,17 +510,19 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * Wait for cluster metadata including partitions for the given topic to be available.
      * @param topic The topic we want metadata for
      * @param maxWaitMs The maximum time in ms for waiting on the metadata
-     * @return The amount of time we waited in ms
+     * @return The cluster containing topic metadata and the amount of time we waited in ms
      */
-    private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
+    private ClusterAndWaitTime waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
         // add topic to metadata topic list if it is not there already and reset expiry
         this.metadata.add(topic);
-        if (metadata.fetch().partitionsForTopic(topic) != null)
-            return 0;
+        Cluster cluster = metadata.fetch();
+        if (cluster.partitionsForTopic(topic) != null)
+            return new ClusterAndWaitTime(cluster, 0);
 
         long begin = time.milliseconds();
         long remainingWaitMs = maxWaitMs;
-        while (metadata.fetch().partitionsForTopic(topic) == null) {
+        long elapsed = 0;
+        while (cluster.partitionsForTopic(topic) == null) {
             log.trace("Requesting metadata update for topic {}.", topic);
             int version = metadata.requestUpdate();
             sender.wakeup();
@@ -528,14 +532,15 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
                 throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
             }
-            long elapsed = time.milliseconds() - begin;
+            cluster = metadata.fetch();
+            elapsed = time.milliseconds() - begin;
             if (elapsed >= maxWaitMs)
                 throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
-            if (metadata.fetch().unauthorizedTopics().contains(topic))
+            if (cluster.unauthorizedTopics().contains(topic))
                 throw new TopicAuthorizationException(topic);
             remainingWaitMs = maxWaitMs - elapsed;
         }
-        return time.milliseconds() - begin;
+        return new ClusterAndWaitTime(cluster, elapsed);
     }
 
     /**
@@ -600,12 +605,13 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      */
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
+        Cluster cluster;
         try {
-            waitOnMetadata(topic, this.maxBlockTimeMs);
+            cluster = waitOnMetadata(topic, this.maxBlockTimeMs).cluster;
         } catch (InterruptedException e) {
             throw new InterruptException(e);
         }
-        return this.metadata.fetch().partitionsForTopic(topic);
+        return cluster.partitionsForTopic(topic);
     }
 
     /**
@@ -724,6 +730,15 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                                          cluster);
     }
 
+    private static class ClusterAndWaitTime {
+        final Cluster cluster;
+        final long waitedOnMetadataMs;
+        ClusterAndWaitTime(Cluster cluster, long waitedOnMetadataMs) {
+            this.cluster = cluster;
+            this.waitedOnMetadataMs = waitedOnMetadataMs;
+        }
+    }
+
     private static class FutureFailure implements Future<RecordMetadata> {
 
         private final ExecutionException exception;

KafkaProducerTest.java

@@ -16,21 +16,37 @@
  */
 package org.apache.kafka.clients.producer;
 
+import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.MockProducerInterceptor;
 import org.apache.kafka.test.MockSerializer;
+import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.support.membermodification.MemberModifier;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareOnlyThisForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.Properties;
 import java.util.Map;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
 public class KafkaProducerTest {
 
     @Test
@@ -123,4 +139,50 @@ public class KafkaProducerTest {
         config.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, -2);
         new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
     }
+
+    @PrepareOnlyThisForTest(Metadata.class)
+    @Test
+    public void testMetadataFetch() throws Exception {
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
+        Metadata metadata = PowerMock.createNiceMock(Metadata.class);
+        MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata);
+
+        String topic = "topic";
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
+        Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
+        final Cluster emptyCluster = new Cluster(nodes,
+                Collections.<PartitionInfo>emptySet(),
+                Collections.<String>emptySet());
+        final Cluster cluster = new Cluster(
+                Collections.singletonList(new Node(0, "host1", 1000)),
+                Arrays.asList(new PartitionInfo(topic, 0, null, null, null)),
+                Collections.<String>emptySet());
+
+        // Expect exactly one fetch for each attempt to refresh while topic metadata is not available
+        final int refreshAttempts = 5;
+        EasyMock.expect(metadata.fetch()).andReturn(emptyCluster).times(refreshAttempts - 1);
+        EasyMock.expect(metadata.fetch()).andReturn(cluster).once();
+        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
+        PowerMock.replay(metadata);
+        producer.send(record);
+        PowerMock.verify(metadata);
+
+        // Expect exactly one fetch if topic metadata is available
+        PowerMock.reset(metadata);
+        EasyMock.expect(metadata.fetch()).andReturn(cluster).once();
+        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
+        PowerMock.replay(metadata);
+        producer.send(record, null);
+        PowerMock.verify(metadata);
+
+        // Expect exactly one fetch if topic metadata is available
+        PowerMock.reset(metadata);
+        EasyMock.expect(metadata.fetch()).andReturn(cluster).once();
+        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
+        PowerMock.replay(metadata);
+        producer.partitionsFor(topic);
+        PowerMock.verify(metadata);
+    }
 }