KAFKA-5374; Set allow auto topic creation to false when requesting node information only

It avoids the need to handle protocol downgrades and it's safe (i.e. it will never cause
the auto creation of topics).

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3220 from ijuma/kafka-5374-admin-client-metadata
This commit is contained in:
Colin P. Mccabe 2017-06-03 06:26:06 +01:00 committed by Ismael Juma
parent 6a5a908b1c
commit f389b71570
4 changed files with 44 additions and 2 deletions

View File

@ -162,6 +162,7 @@
<subpackage name="tools">
<allow pkg="org.apache.kafka.common"/>
<allow pkg="org.apache.kafka.clients.admin" />
<allow pkg="org.apache.kafka.clients.producer" />
<allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="com.fasterxml.jackson" />

View File

@ -281,8 +281,10 @@ public class KafkaAdminClient extends AdminClient {
ApiVersions apiVersions = new ApiVersions();
try {
// Since we only request node information, it's safe to pass true for allowAutoTopicCreation (and it
// simplifies communication with older brokers)
metadata = new Metadata(config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), false);
config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), true);
List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
@ -1222,7 +1224,9 @@ public class KafkaAdminClient extends AdminClient {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new MetadataRequest.Builder(Collections.<String>emptyList(), false);
// Since this only requests node information, it's safe to pass true for allowAutoTopicCreation (and it
// simplifies communication with older brokers)
return new MetadataRequest.Builder(Collections.<String>emptyList(), true);
}
@Override

View File

@ -77,11 +77,13 @@ class ClientCompatibilityFeaturesTest(Test):
"--offsets-for-times-supported %s "
"--cluster-id-supported %s "
"--expect-record-too-large-exception %s "
"--num-cluster-nodes %d "
"--topic %s " % (self.zk.path.script("kafka-run-class.sh", node),
self.kafka.bootstrap_servers(),
features["offsets-for-times-supported"],
features["cluster-id-supported"],
features["expect-record-too-large-exception"],
len(self.kafka.nodes),
self.topics.keys()[0]))
results_dir = TestContext.results_dir(self.test_context, 0)
os.makedirs(results_dir)

View File

@ -20,6 +20,8 @@ import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@ -31,6 +33,8 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordTooLargeException;
@ -44,6 +48,7 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@ -69,6 +74,7 @@ public class ClientCompatibilityTest {
final boolean offsetsForTimesSupported;
final boolean expectClusterId;
final boolean expectRecordTooLargeException;
final int numClusterNodes;
TestConfig(Namespace res) {
this.bootstrapServer = res.getString("bootstrapServer");
@ -76,6 +82,7 @@ public class ClientCompatibilityTest {
this.offsetsForTimesSupported = res.getBoolean("offsetsForTimesSupported");
this.expectClusterId = res.getBoolean("clusterIdSupported");
this.expectRecordTooLargeException = res.getBoolean("expectRecordTooLargeException");
this.numClusterNodes = res.getInt("numClusterNodes");
}
}
@ -121,6 +128,14 @@ public class ClientCompatibilityTest {
.help("True if we should expect a RecordTooLargeException when trying to read from a topic " +
"that contains a message that is bigger than " + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG +
". This is pre-KIP-74 behavior.");
parser.addArgument("--num-cluster-nodes")
.action(store())
.required(true)
.type(Integer.class)
.dest("numClusterNodes")
.metavar("NUM_CLUSTER_NODES")
.help("The number of cluster nodes we should expect to see from the AdminClient.");
Namespace res = null;
try {
res = parser.parseArgs(args);
@ -183,6 +198,7 @@ public class ClientCompatibilityTest {
void run() throws Exception {
long prodTimeMs = Time.SYSTEM.milliseconds();
testAdminClient();
testProduce();
testConsume(prodTimeMs);
}
@ -202,6 +218,25 @@ public class ClientCompatibilityTest {
producer.close();
}
void testAdminClient() throws Exception {
Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, testConfig.bootstrapServer);
try (AdminClient client = AdminClient.create(adminProps)) {
while (true) {
Collection<Node> nodes = client.describeCluster().nodes().get();
if (nodes.size() == testConfig.numClusterNodes) {
break;
} else if (nodes.size() > testConfig.numClusterNodes) {
throw new KafkaException("Expected to see " + testConfig.numClusterNodes +
" nodes, but saw " + nodes.size());
}
Thread.sleep(1);
log.info("Saw only {} cluster nodes. Waiting to see {}.",
nodes.size(), testConfig.numClusterNodes);
}
}
}
private static class OffsetsForTime {
Map<TopicPartition, OffsetAndTimestamp> result;