KAFKA-4631; Request metadata in consumer if topic/partitions unavailable

If leader node of one more more partitions in a consumer subscription are temporarily unavailable, request metadata refresh so that partitions skipped for assignment dont have to wait for metadata expiry before reassignment. Metadata refresh is also requested if a subscribe topic or assigned partition doesn't exist.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Vahid Hashemian <vahidhashemian@us.ibm.com>, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>

Closes #2622 from rajinisivaram/KAFKA-4631
This commit is contained in:
Rajini Sivaram 2017-03-02 17:49:01 -08:00 committed by Jason Gustafson
parent 670980ef29
commit a3c45b0c92
19 changed files with 270 additions and 107 deletions

View File

@ -199,8 +199,13 @@ public final class Metadata {
/**
* Updates the cluster metadata. If topic expiry is enabled, expiry time
* is set for topics if required and expired topics are removed from the metadata.
*
* @param cluster the cluster containing metadata for topics with valid metadata
* @param unavailableTopics topics which are non-existent or have one or more partitions whose
* leader is not known
* @param now current time in milliseconds
*/
public synchronized void update(Cluster cluster, long now) {
public synchronized void update(Cluster cluster, Set<String> unavailableTopics, long now) {
Objects.requireNonNull(cluster, "cluster should not be null");
this.needUpdate = false;
@ -223,7 +228,7 @@ public final class Metadata {
}
for (Listener listener: listeners)
listener.onMetadataUpdate(cluster);
listener.onMetadataUpdate(cluster, unavailableTopics);
String previousClusterId = cluster.clusterResource().clusterId();
@ -306,7 +311,14 @@ public final class Metadata {
* MetadataUpdate Listener
*/
public interface Listener {
void onMetadataUpdate(Cluster cluster);
/**
* Callback invoked on metadata update.
*
* @param cluster the cluster containing metadata for topics with valid metadata
* @param unavailableTopics topics which are non-existent or have one or more partitions whose
* leader is not known
*/
void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics);
}
private synchronized void requestUpdateForNewTopics() {

View File

@ -715,7 +715,7 @@ public class NetworkClient implements KafkaClient {
// don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
// created which means we will get errors and no nodes until it exists
if (cluster.nodes().size() > 0) {
this.metadata.update(cluster, now);
this.metadata.update(cluster, response.unavailableTopics(), now);
} else {
log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
this.metadata.failedUpdate(now);

View File

@ -656,7 +656,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList);
this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), false, clusterResourceListeners);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), 0);
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
String metricGrpPrefix = "consumer";
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
NetworkClient netClient = new NetworkClient(

View File

@ -53,6 +53,7 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -172,7 +173,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
private void addMetadataListener() {
this.metadata.addListener(new Metadata.Listener() {
@Override
public void onMetadataUpdate(Cluster cluster) {
public void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
// if we encounter any unauthorized topics, raise an exception to the user
if (!cluster.unauthorizedTopics().isEmpty())
throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
@ -186,6 +187,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
if (!snapshot.equals(metadataSnapshot))
metadataSnapshot = snapshot;
}
if (!Collections.disjoint(metadata.topics(), unavailableTopics))
metadata.requestUpdate();
}
});
}

View File

@ -305,7 +305,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
time);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
NetworkClient client = new NetworkClient(
new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),

View File

@ -19,6 +19,7 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
@ -203,6 +204,29 @@ public class MetadataResponse extends AbstractResponse {
return errorTopics;
}
/**
* Returns the set of topics with an error indicating invalid metadata
* and topics with any partition whose error indicates invalid metadata.
* This includes all non-existent topics specified in the metadata request
* and any topic returned with one or more partitions whose leader is not known.
*/
public Set<String> unavailableTopics() {
Set<String> invalidMetadataTopics = new HashSet<>();
for (TopicMetadata topicMetadata : this.topicMetadata) {
if (topicMetadata.error.exception() instanceof InvalidMetadataException)
invalidMetadataTopics.add(topicMetadata.topic);
else {
for (PartitionMetadata partitionMetadata : topicMetadata.partitionMetadata) {
if (partitionMetadata.error.exception() instanceof InvalidMetadataException) {
invalidMetadataTopics.add(topicMetadata.topic);
break;
}
}
}
}
return invalidMetadataTopics;
}
/**
* Get a snapshot of the cluster metadata from this response
* @return the cluster snapshot

View File

@ -56,7 +56,7 @@ public class MetadataTest {
@Test
public void testMetadata() throws Exception {
long time = 0;
metadata.update(Cluster.empty(), time);
metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
metadata.requestUpdate();
assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0);
@ -71,7 +71,7 @@ public class MetadataTest {
// This simulates the metadata update sequence in KafkaProducer
while (t1.isAlive() || t2.isAlive()) {
if (metadata.timeToNextUpdate(time) == 0) {
metadata.update(TestUtils.singletonCluster(topic, 1), time);
metadata.update(TestUtils.singletonCluster(topic, 1), Collections.<String>emptySet(), time);
time += refreshBackoffMs;
}
Thread.sleep(1);
@ -101,7 +101,7 @@ public class MetadataTest {
assertEquals(0, metadata.timeToNextUpdate(now));
// lastSuccessfulRefreshMs updated to now.
metadata.update(Cluster.empty(), now);
metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
// The last update was successful so the remaining time to expire the current metadata should be returned.
assertEquals(largerOfBackoffAndExpire, metadata.timeToNextUpdate(now));
@ -112,7 +112,7 @@ public class MetadataTest {
assertEquals(refreshBackoffMs, metadata.timeToNextUpdate(now));
// Reset needUpdate to false.
metadata.update(Cluster.empty(), now);
metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
assertEquals(largerOfBackoffAndExpire, metadata.timeToNextUpdate(now));
// Both metadataExpireMs and refreshBackoffMs elapsed.
@ -156,13 +156,13 @@ public class MetadataTest {
long now = 10000;
// New topic added to fetch set and update requested. It should allow immediate update.
metadata.update(Cluster.empty(), now);
metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
metadata.add("new-topic");
assertEquals(0, metadata.timeToNextUpdate(now));
// Even though setTopics called, immediate update isn't necessary if the new topic set isn't
// containing a new topic,
metadata.update(Cluster.empty(), now);
metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
metadata.setTopics(metadata.topics());
assertEquals(metadataExpireMs, metadata.timeToNextUpdate(now));
@ -171,12 +171,12 @@ public class MetadataTest {
assertEquals(0, metadata.timeToNextUpdate(now));
// If metadata requested for all topics it should allow immediate update.
metadata.update(Cluster.empty(), now);
metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
metadata.needMetadataForAllTopics(true);
assertEquals(0, metadata.timeToNextUpdate(now));
// However if metadata is already capable to serve all topics it shouldn't override backoff.
metadata.update(Cluster.empty(), now);
metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
metadata.needMetadataForAllTopics(true);
assertEquals(metadataExpireMs, metadata.timeToNextUpdate(now));
}
@ -191,7 +191,7 @@ public class MetadataTest {
@Test
public void testMetadataUpdateWaitTime() throws Exception {
long time = 0;
metadata.update(Cluster.empty(), time);
metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
// first try with a max wait time of 0 and ensure that this returns back without waiting forever
try {
@ -213,7 +213,7 @@ public class MetadataTest {
@Test
public void testFailedUpdate() {
long time = 100;
metadata.update(Cluster.empty(), time);
metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
assertEquals(100, metadata.timeToNextUpdate(1000));
metadata.failedUpdate(1100);
@ -222,14 +222,14 @@ public class MetadataTest {
assertEquals(100, metadata.lastSuccessfulUpdate());
metadata.needMetadataForAllTopics(true);
metadata.update(Cluster.empty(), time);
metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
assertEquals(100, metadata.timeToNextUpdate(1000));
}
@Test
public void testUpdateWithNeedMetadataForAllTopics() {
long time = 0;
metadata.update(Cluster.empty(), time);
metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
metadata.needMetadataForAllTopics(true);
final List<String> expectedTopics = Collections.singletonList("topic");
@ -241,7 +241,7 @@ public class MetadataTest {
new PartitionInfo("topic1", 0, null, null, null)),
Collections.<String>emptySet(),
Collections.<String>emptySet()),
100);
Collections.<String>emptySet(), 100);
assertArrayEquals("Metadata got updated with wrong set of topics.",
expectedTopics.toArray(), metadata.topics().toArray());
@ -259,7 +259,7 @@ public class MetadataTest {
String hostName = "www.example.com";
Cluster cluster = Cluster.bootstrap(Arrays.asList(new InetSocketAddress(hostName, 9002)));
metadata.update(cluster, time);
metadata.update(cluster, Collections.<String>emptySet(), time);
assertFalse("ClusterResourceListener should not called when metadata is updated with bootstrap Cluster",
MockClusterResourceListener.IS_ON_UPDATE_CALLED.get());
@ -271,7 +271,7 @@ public class MetadataTest {
new PartitionInfo("topic1", 0, null, null, null)),
Collections.<String>emptySet(),
Collections.<String>emptySet()),
100);
Collections.<String>emptySet(), 100);
assertEquals("MockClusterResourceListener did not get cluster metadata correctly",
"dummy", mockClusterListener.clusterResource().clusterId());
@ -283,10 +283,10 @@ public class MetadataTest {
public void testListenerGetsNotifiedOfUpdate() {
long time = 0;
final Set<String> topics = new HashSet<>();
metadata.update(Cluster.empty(), time);
metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
metadata.addListener(new Metadata.Listener() {
@Override
public void onMetadataUpdate(Cluster cluster) {
public void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
topics.clear();
topics.addAll(cluster.topics());
}
@ -300,7 +300,7 @@ public class MetadataTest {
new PartitionInfo("topic1", 0, null, null, null)),
Collections.<String>emptySet(),
Collections.<String>emptySet()),
100);
Collections.<String>emptySet(), 100);
assertEquals("Listener did not update topics list correctly",
new HashSet<>(Arrays.asList("topic", "topic1")), topics);
@ -310,10 +310,10 @@ public class MetadataTest {
public void testListenerCanUnregister() {
long time = 0;
final Set<String> topics = new HashSet<>();
metadata.update(Cluster.empty(), time);
metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
final Metadata.Listener listener = new Metadata.Listener() {
@Override
public void onMetadataUpdate(Cluster cluster) {
public void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
topics.clear();
topics.addAll(cluster.topics());
}
@ -328,7 +328,7 @@ public class MetadataTest {
new PartitionInfo("topic1", 0, null, null, null)),
Collections.<String>emptySet(),
Collections.<String>emptySet()),
100);
Collections.<String>emptySet(), 100);
metadata.removeListener(listener);
@ -340,7 +340,7 @@ public class MetadataTest {
new PartitionInfo("topic3", 0, null, null, null)),
Collections.<String>emptySet(),
Collections.<String>emptySet()),
100);
Collections.<String>emptySet(), 100);
assertEquals("Listener did not update topics list correctly",
new HashSet<>(Arrays.asList("topic", "topic1")), topics);
@ -353,17 +353,17 @@ public class MetadataTest {
// Test that topic is expired if not used within the expiry interval
long time = 0;
metadata.add("topic1");
metadata.update(Cluster.empty(), time);
metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
time += Metadata.TOPIC_EXPIRY_MS;
metadata.update(Cluster.empty(), time);
metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
assertFalse("Unused topic not expired", metadata.containsTopic("topic1"));
// Test that topic is not expired if used within the expiry interval
metadata.add("topic2");
metadata.update(Cluster.empty(), time);
metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
for (int i = 0; i < 3; i++) {
time += Metadata.TOPIC_EXPIRY_MS / 2;
metadata.update(Cluster.empty(), time);
metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
assertTrue("Topic expired even though in use", metadata.containsTopic("topic2"));
metadata.add("topic2");
}
@ -372,9 +372,9 @@ public class MetadataTest {
HashSet<String> topics = new HashSet<>();
topics.add("topic4");
metadata.setTopics(topics);
metadata.update(Cluster.empty(), time);
metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
time += Metadata.TOPIC_EXPIRY_MS;
metadata.update(Cluster.empty(), time);
metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
assertFalse("Unused topic not expired", metadata.containsTopic("topic4"));
}
@ -385,17 +385,17 @@ public class MetadataTest {
// Test that topic is not expired if not used within the expiry interval
long time = 0;
metadata.add("topic1");
metadata.update(Cluster.empty(), time);
metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
time += Metadata.TOPIC_EXPIRY_MS;
metadata.update(Cluster.empty(), time);
metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic1"));
// Test that topic is not expired if used within the expiry interval
metadata.add("topic2");
metadata.update(Cluster.empty(), time);
metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
for (int i = 0; i < 3; i++) {
time += Metadata.TOPIC_EXPIRY_MS / 2;
metadata.update(Cluster.empty(), time);
metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
assertTrue("Topic expired even though in use", metadata.containsTopic("topic2"));
metadata.add("topic2");
}
@ -405,7 +405,7 @@ public class MetadataTest {
topics.add("topic4");
metadata.setTopics(topics);
time += metadataExpireMs * 2;
metadata.update(Cluster.empty(), time);
metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic4"));
}

View File

@ -26,6 +26,7 @@ import org.apache.kafka.test.TestUtils;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -63,6 +64,7 @@ public class MockClient implements KafkaClient {
private final Time time;
private final Metadata metadata;
private Set<String> unavailableTopics;
private int correlation = 0;
private Node node = null;
private final Set<String> ready = new HashSet<>();
@ -72,17 +74,17 @@ public class MockClient implements KafkaClient {
// Use concurrent queue for responses so that responses may be updated during poll() from a different thread.
private final Queue<ClientResponse> responses = new ConcurrentLinkedDeque<>();
private final Queue<FutureResponse> futureResponses = new ArrayDeque<>();
private final Queue<Cluster> metadataUpdates = new ArrayDeque<>();
private final Queue<MetadataUpdate> metadataUpdates = new ArrayDeque<>();
private volatile NodeApiVersions nodeApiVersions = NodeApiVersions.create();
public MockClient(Time time) {
this.time = time;
this.metadata = null;
this(time, null);
}
public MockClient(Time time, Metadata metadata) {
this.time = time;
this.metadata = metadata;
this.unavailableTopics = Collections.emptySet();
}
@Override
@ -166,11 +168,13 @@ public class MockClient implements KafkaClient {
List<ClientResponse> copy = new ArrayList<>(this.responses);
if (metadata != null && metadata.updateRequested()) {
Cluster cluster = metadataUpdates.poll();
if (cluster == null)
metadata.update(metadata.fetch(), time.milliseconds());
else
metadata.update(cluster, time.milliseconds());
MetadataUpdate metadataUpdate = metadataUpdates.poll();
if (metadataUpdate == null)
metadata.update(metadata.fetch(), this.unavailableTopics, time.milliseconds());
else {
this.unavailableTopics = metadataUpdate.unavailableTopics;
metadata.update(metadataUpdate.cluster, metadataUpdate.unavailableTopics, time.milliseconds());
}
}
while (!this.responses.isEmpty()) {
@ -278,8 +282,8 @@ public class MockClient implements KafkaClient {
metadataUpdates.clear();
}
public void prepareMetadataUpdate(Cluster cluster) {
metadataUpdates.add(cluster);
public void prepareMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics));
}
public void setNode(Node node) {
@ -340,4 +344,13 @@ public class MockClient implements KafkaClient {
public void setNodeApiVersions(NodeApiVersions nodeApiVersions) {
this.nodeApiVersions = nodeApiVersions;
}
private static class MetadataUpdate {
final Cluster cluster;
final Set<String> unavailableTopics;
MetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
this.cluster = cluster;
this.unavailableTopics = unavailableTopics;
}
}
}

View File

@ -76,7 +76,7 @@ public class NetworkClientTest {
@Before
public void setup() {
metadata.update(cluster, time.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
}
@Test(expected = IllegalStateException.class)

View File

@ -364,7 +364,7 @@ public class KafkaConsumerTest {
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
client.setNode(node);
@ -405,7 +405,7 @@ public class KafkaConsumerTest {
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
client.setNode(node);
@ -446,7 +446,7 @@ public class KafkaConsumerTest {
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
client.setNode(node);
@ -482,7 +482,7 @@ public class KafkaConsumerTest {
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
client.setNode(node);
@ -530,7 +530,7 @@ public class KafkaConsumerTest {
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
client.setNode(node);
@ -591,7 +591,7 @@ public class KafkaConsumerTest {
consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));
client.prepareMetadataUpdate(cluster);
client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
consumer.poll(0);
assertEquals(singleton(topic), consumer.subscription());
@ -622,7 +622,7 @@ public class KafkaConsumerTest {
MockClient client = new MockClient(time, metadata);
client.setNode(node);
metadata.update(cluster, time.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, autoCommitIntervalMs);
@ -630,14 +630,14 @@ public class KafkaConsumerTest {
Node coordinator = prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));
client.prepareMetadataUpdate(cluster);
client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
consumer.poll(0);
assertEquals(singleton(topic), consumer.subscription());
consumer.subscribe(Pattern.compile(otherTopic), getConsumerRebalanceListener(consumer));
client.prepareMetadataUpdate(cluster);
client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
prepareRebalance(client, node, singleton(otherTopic), assignor, singletonList(otherTopicPartition), coordinator);
consumer.poll(0);
@ -660,7 +660,7 @@ public class KafkaConsumerTest {
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
client.setNode(node);
@ -705,7 +705,7 @@ public class KafkaConsumerTest {
final Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
final MockClient client = new MockClient(time, metadata);
client.setNode(node);
@ -745,7 +745,7 @@ public class KafkaConsumerTest {
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
client.setNode(node);
@ -793,7 +793,7 @@ public class KafkaConsumerTest {
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
client.setNode(node);
@ -913,7 +913,7 @@ public class KafkaConsumerTest {
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
client.setNode(node);
@ -981,7 +981,7 @@ public class KafkaConsumerTest {
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
client.setNode(node);
@ -1046,7 +1046,7 @@ public class KafkaConsumerTest {
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
client.setNode(node);
@ -1107,7 +1107,7 @@ public class KafkaConsumerTest {
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
client.setNode(node);
@ -1226,7 +1226,7 @@ public class KafkaConsumerTest {
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
client.setNode(node);
@ -1238,7 +1238,7 @@ public class KafkaConsumerTest {
consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);
client.prepareMetadataUpdate(cluster);
client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
// Poll with responses
client.prepareResponseFrom(fetchResponse(tp0, 0, 1), node);

View File

@ -79,7 +79,7 @@ public class AbstractCoordinatorTest {
Metrics metrics = new Metrics();
Cluster cluster = TestUtils.singletonCluster("topic", 1);
metadata.update(cluster, mockTime.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), mockTime.milliseconds());
this.node = cluster.nodes().get(0);
mockClient.setNode(node);

View File

@ -120,7 +120,7 @@ public class ConsumerCoordinatorTest {
this.time = new MockTime();
this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
this.metadata = new Metadata(0, Long.MAX_VALUE);
this.metadata.update(cluster, time.milliseconds());
this.metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
this.client = new MockClient(time, metadata);
this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
this.metrics = new Metrics(time);
@ -292,7 +292,7 @@ public class ConsumerCoordinatorTest {
// ensure metadata is up-to-date for leader
metadata.setTopics(singletonList(topic1));
metadata.update(cluster, time.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady();
@ -310,7 +310,7 @@ public class ConsumerCoordinatorTest {
// ensure metadata is up-to-date for leader
metadata.setTopics(singletonList(topic1));
metadata.update(cluster, time.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady();
@ -349,7 +349,7 @@ public class ConsumerCoordinatorTest {
// partially update the metadata with one topic first,
// let the leader to refresh metadata during assignment
metadata.setTopics(singletonList(topic1));
metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds());
metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds());
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady();
@ -369,7 +369,7 @@ public class ConsumerCoordinatorTest {
}
}, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE));
// expect client to force updating the metadata, if yes gives it both topics
client.prepareMetadataUpdate(cluster);
client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
coordinator.poll(time.milliseconds());
@ -389,7 +389,7 @@ public class ConsumerCoordinatorTest {
subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
metadata.needMetadataForAllTopics(true);
metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds());
metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds());
assertEquals(singleton(topic1), subscriptions.subscription());
@ -410,7 +410,7 @@ public class ConsumerCoordinatorTest {
final Map<String, Integer> updatedPartitions = new HashMap<>();
for (String topic : updatedSubscription)
updatedPartitions.put(topic, 1);
metadata.update(TestUtils.clusterWith(1, updatedPartitions), time.milliseconds());
metadata.update(TestUtils.clusterWith(1, updatedPartitions), Collections.<String>emptySet(), time.milliseconds());
return true;
}
}, syncGroupResponse(singletonList(t1p), Errors.NONE));
@ -453,7 +453,7 @@ public class ConsumerCoordinatorTest {
// ensure metadata is up-to-date for leader
metadata.setTopics(singletonList(topic1));
metadata.update(cluster, time.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady();
@ -524,7 +524,7 @@ public class ConsumerCoordinatorTest {
// partially update the metadata with one topic first,
// let the leader to refresh metadata during assignment
metadata.setTopics(singletonList(topic1));
metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds());
metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds());
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady();
@ -541,7 +541,7 @@ public class ConsumerCoordinatorTest {
}
}, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE));
// expect client to force updating the metadata, if yes gives it both topics
client.prepareMetadataUpdate(cluster);
client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
coordinator.joinGroupIfNeeded();
@ -712,7 +712,7 @@ public class ConsumerCoordinatorTest {
// ensure metadata is up-to-date for leader
metadata.setTopics(singletonList(topic1));
metadata.update(cluster, time.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
subscriptions.subscribe(singleton(topic1), rebalanceListener);
@ -731,7 +731,7 @@ public class ConsumerCoordinatorTest {
assertFalse(coordinator.needRejoin());
// a new partition is added to the topic
metadata.update(TestUtils.singletonCluster(topic1, 2), time.milliseconds());
metadata.update(TestUtils.singletonCluster(topic1, 2), Collections.<String>emptySet(), time.milliseconds());
// we should detect the change and ask for reassignment
assertTrue(coordinator.needRejoin());
@ -751,7 +751,7 @@ public class ConsumerCoordinatorTest {
metadata.setTopics(topics);
// we only have metadata for one topic initially
metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds());
metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds());
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady();
@ -772,7 +772,7 @@ public class ConsumerCoordinatorTest {
Map<String, Integer> topicPartitionCounts = new HashMap<>();
topicPartitionCounts.put(topic1, 1);
topicPartitionCounts.put(topic2, 1);
metadata.update(TestUtils.singletonCluster(topicPartitionCounts), time.milliseconds());
metadata.update(TestUtils.singletonCluster(topicPartitionCounts), Collections.<String>emptySet(), time.milliseconds());
return true;
}
return false;
@ -789,12 +789,72 @@ public class ConsumerCoordinatorTest {
assertEquals(new HashSet<>(Arrays.asList(tp1, tp2)), subscriptions.assignedPartitions());
}
@Test
public void testRebalanceAfterTopicUnavailableWithSubscribe() {
unavailableTopicTest(false, false, Collections.<String>emptySet());
}
@Test
public void testRebalanceAfterTopicUnavailableWithPatternSubscribe() {
unavailableTopicTest(true, false, Collections.<String>emptySet());
}
@Test
public void testRebalanceAfterNotMatchingTopicUnavailableWithPatternSSubscribe() {
unavailableTopicTest(true, false, Collections.<String>singleton("notmatching"));
}
@Test
public void testAssignWithTopicUnavailable() {
unavailableTopicTest(true, false, Collections.<String>emptySet());
}
private void unavailableTopicTest(boolean patternSubscribe, boolean assign, Set<String> unavailableTopicsInLastMetadata) {
final String consumerId = "consumer";
metadata.setTopics(singletonList(topic1));
client.prepareMetadataUpdate(Cluster.empty(), Collections.singleton("test1"));
if (assign)
subscriptions.assignFromUser(singleton(t1p));
else if (patternSubscribe)
subscriptions.subscribe(Pattern.compile("test.*"), rebalanceListener);
else
subscriptions.subscribe(singleton(topic1), rebalanceListener);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady();
Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, singletonList(topic1));
partitionAssignor.prepare(Collections.<String, List<TopicPartition>>emptyMap());
client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.NONE));
coordinator.poll(time.milliseconds());
if (!assign) {
assertFalse(coordinator.needRejoin());
assertEquals(Collections.<TopicPartition>emptySet(), rebalanceListener.assigned);
}
assertTrue("Metadata refresh not requested for unavailable partitions", metadata.updateRequested());
client.prepareMetadataUpdate(cluster, unavailableTopicsInLastMetadata);
client.poll(0, time.milliseconds());
client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE));
client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
coordinator.poll(time.milliseconds());
assertFalse("Metadata refresh requested unnecessarily", metadata.updateRequested());
if (!assign) {
assertFalse(coordinator.needRejoin());
assertEquals(singleton(t1p), rebalanceListener.assigned);
}
}
@Test
public void testExcludeInternalTopicsConfigOption() {
subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds());
metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), Collections.<String>emptySet(), time.milliseconds());
assertFalse(subscriptions.subscription().contains(TestUtils.GROUP_METADATA_TOPIC_NAME));
}
@ -804,7 +864,7 @@ public class ConsumerCoordinatorTest {
coordinator = buildCoordinator(new Metrics(), assignors, false, false);
subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds());
metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), Collections.<String>emptySet(), time.milliseconds());
assertTrue(subscriptions.subscription().contains(TestUtils.GROUP_METADATA_TOPIC_NAME));
}
@ -1030,7 +1090,7 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
client.prepareMetadataUpdate(cluster);
client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
coordinator.joinGroupIfNeeded();

View File

@ -109,7 +109,7 @@ public class FetcherTest {
@Before
public void setup() throws Exception {
metadata.update(cluster, time.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
client.setNode(node);
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
@ -838,7 +838,7 @@ public class FetcherTest {
TopicPartition tp1 = new TopicPartition(topicName, 1);
// Ensure metadata has both partition.
Cluster cluster = TestUtils.clusterWith(2, topicName, 2);
metadata.update(cluster, time.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
// First try should fail due to metadata error.
client.prepareResponseFrom(listOffsetResponse(tp0, errorForTp0, offsetForTp0, offsetForTp0), cluster.leaderFor(tp0));

View File

@ -88,7 +88,7 @@ public class SenderTest {
time,
REQUEST_TIMEOUT);
metadata.update(cluster, time.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
}
@After
@ -201,7 +201,7 @@ public class SenderTest {
// Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
metadata.update(cluster1, time.milliseconds());
metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds());
// Send the first message.
TopicPartition tp2 = new TopicPartition("test", 1);
@ -220,7 +220,7 @@ public class SenderTest {
// Update metadata before sender receives response from broker 0. Now partition 2 moves to broker 0
Cluster cluster2 = TestUtils.singletonCluster("test", 2);
metadata.update(cluster2, time.milliseconds());
metadata.update(cluster2, Collections.<String>emptySet(), time.milliseconds());
// Sender should not send the second message to node 0.
sender.run(time.milliseconds());
assertEquals(1, client.inFlightRequestCount());
@ -236,12 +236,12 @@ public class SenderTest {
@Test
public void testMetadataTopicExpiry() throws Exception {
long offset = 0;
metadata.update(Cluster.empty(), time.milliseconds());
metadata.update(Cluster.empty(), Collections.<String>emptySet(), time.milliseconds());
Future<RecordMetadata> future = accumulator.append(tp, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds());
assertTrue("Topic not added to metadata", metadata.containsTopic(tp.topic()));
metadata.update(cluster, time.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
sender.run(time.milliseconds()); // send produce request
client.respond(produceResponse(tp, offset++, Errors.NONE, 0));
sender.run(time.milliseconds());
@ -251,12 +251,12 @@ public class SenderTest {
assertTrue("Topic not retained in metadata list", metadata.containsTopic(tp.topic()));
time.sleep(Metadata.TOPIC_EXPIRY_MS);
metadata.update(Cluster.empty(), time.milliseconds());
metadata.update(Cluster.empty(), Collections.<String>emptySet(), time.milliseconds());
assertFalse("Unused topic has not been expired", metadata.containsTopic(tp.topic()));
future = accumulator.append(tp, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds());
assertTrue("Topic not added to metadata", metadata.containsTopic(tp.topic()));
metadata.update(cluster, time.milliseconds());
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
sender.run(time.milliseconds()); // send produce request
client.respond(produceResponse(tp, offset++, Errors.NONE, 0));
sender.run(time.milliseconds());

View File

@ -37,6 +37,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -87,7 +88,7 @@ public class WorkerGroupMember {
this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG));
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), 0);
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
String metricGrpPrefix = "connect";
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
NetworkClient netClient = new NetworkClient(

View File

@ -91,7 +91,7 @@ public class WorkerCoordinatorTest {
this.time = new MockTime();
this.client = new MockClient(time);
this.metadata = new Metadata(0, Long.MAX_VALUE);
this.metadata.update(cluster, time.milliseconds());
this.metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
this.metrics = new Metrics(time);
this.rebalanceListener = new MockRebalanceListener();

View File

@ -243,7 +243,7 @@ object AdminClient {
val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls)
val bootstrapCluster = Cluster.bootstrap(brokerAddresses)
metadata.update(bootstrapCluster, 0)
metadata.update(bootstrapCluster, Collections.emptySet(), 0)
val selector = new Selector(
DefaultConnectionMaxIdleMs,

View File

@ -43,13 +43,14 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
// Time to process commit and leave group requests in tests when brokers are available
val gracefulCloseTimeMs = 1000
val executor = Executors.newFixedThreadPool(2)
val executor = Executors.newScheduledThreadPool(2)
// configure the servers and clients
this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "10") // set small enough session timeout
this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, "false")
this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
@ -161,6 +162,52 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
}
}
@Test
def testSubscribeWhenTopicUnavailable() {
val numRecords = 1000
val newtopic = "newtopic"
val consumer = this.consumers.head
consumer.subscribe(Collections.singleton(newtopic))
executor.schedule(new Runnable {
def run() = TestUtils.createTopic(zkUtils, newtopic, serverCount, serverCount, servers)
}, 2, TimeUnit.SECONDS)
consumer.poll(0)
def sendRecords(numRecords: Int, topic: String = this.topic) {
var remainingRecords = numRecords
val endTimeMs = System.currentTimeMillis + 20000
while (remainingRecords > 0 && System.currentTimeMillis < endTimeMs) {
val futures = (0 until remainingRecords).map { i =>
this.producers.head.send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes))
}
futures.map { future =>
try {
future.get
remainingRecords -= 1
} catch {
case _: Exception =>
}
}
}
assertEquals(0, remainingRecords)
}
sendRecords(numRecords, newtopic)
receiveRecords(consumer, numRecords, newtopic, 10000)
servers.foreach(server => killBroker(server.config.brokerId))
Thread.sleep(500)
restartDeadBrokers()
val future = executor.submit(new Runnable {
def run() = receiveRecords(consumer, numRecords, newtopic, 10000)
})
sendRecords(numRecords, newtopic)
future.get
}
@Test
def testClose() {
val numRecords = 10
@ -312,10 +359,12 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
consumer
}
private def receiveRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int) {
var received = 0
while (received < numRecords)
private def receiveRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int, topic: String = this.topic, timeoutMs: Long = 60000) {
var received = 0L
val endTimeMs = System.currentTimeMillis + timeoutMs
while (received < numRecords && System.currentTimeMillis < endTimeMs)
received += consumer.poll(1000).count()
assertEquals(numRecords, received)
}
private def submitCloseAndValidate(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
@ -373,7 +422,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
}
}
private def sendRecords(numRecords: Int) {
private def sendRecords(numRecords: Int, topic: String = this.topic) {
val futures = (0 until numRecords).map { i =>
this.producers.head.send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes))
}

View File

@ -98,7 +98,7 @@ public class StreamsKafkaClient {
streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG)
);
final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
final MetricConfig metricConfig = new MetricConfig().samples(streamsConfig.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(streamsConfig.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
@ -226,7 +226,7 @@ public class StreamsKafkaClient {
streamsConfig.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG),
streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG));
final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
metadata.update(Cluster.bootstrap(addresses), Time.SYSTEM.milliseconds());
metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), Time.SYSTEM.milliseconds());
final List<Node> nodes = metadata.fetch().nodes();
return ensureOneNodeIsReady(nodes);