mirror of https://github.com/apache/kafka.git
Remove busy waiting in testTopicNotExistingInMetadata
This commit is contained in:
parent
9922d9dacd
commit
2b68e44a35
|
@ -1005,8 +1005,9 @@ public class KafkaProducerTest {
|
||||||
@Test
|
@Test
|
||||||
public void testTopicNotExistingInMetadata() throws InterruptedException {
|
public void testTopicNotExistingInMetadata() throws InterruptedException {
|
||||||
Map<String, Object> configs = new HashMap<>();
|
Map<String, Object> configs = new HashMap<>();
|
||||||
|
final String maxBlockMs = "300000";
|
||||||
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
|
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
|
||||||
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "30000");
|
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
|
||||||
long refreshBackoffMs = 500L;
|
long refreshBackoffMs = 500L;
|
||||||
long refreshBackoffMaxMs = 5000L;
|
long refreshBackoffMaxMs = 5000L;
|
||||||
long metadataExpireMs = 60000L;
|
long metadataExpireMs = 60000L;
|
||||||
|
@ -1015,8 +1016,10 @@ public class KafkaProducerTest {
|
||||||
final ProducerMetadata metadata = new ProducerMetadata(refreshBackoffMs, refreshBackoffMaxMs, metadataExpireMs, metadataIdleMs,
|
final ProducerMetadata metadata = new ProducerMetadata(refreshBackoffMs, refreshBackoffMaxMs, metadataExpireMs, metadataIdleMs,
|
||||||
new LogContext(), new ClusterResourceListeners(), time);
|
new LogContext(), new ClusterResourceListeners(), time);
|
||||||
final String topic = "topic";
|
final String topic = "topic";
|
||||||
|
MockClient client = new MockClient(time, metadata);
|
||||||
|
client.setShouldUpdateWithCurrentMetadata(false);
|
||||||
try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
|
try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
|
||||||
new StringSerializer(), metadata, new MockClient(time, metadata), null, time)) {
|
new StringSerializer(), metadata, client, null, time)) {
|
||||||
|
|
||||||
Exchanger<Void> exchanger = new Exchanger<>();
|
Exchanger<Void> exchanger = new Exchanger<>();
|
||||||
|
|
||||||
|
@ -1025,11 +1028,10 @@ public class KafkaProducerTest {
|
||||||
// Update the metadata with non-existing topic.
|
// Update the metadata with non-existing topic.
|
||||||
MetadataResponse updateResponse = RequestTestUtils.metadataUpdateWith("kafka-cluster", 1,
|
MetadataResponse updateResponse = RequestTestUtils.metadataUpdateWith("kafka-cluster", 1,
|
||||||
singletonMap(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION), emptyMap());
|
singletonMap(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION), emptyMap());
|
||||||
metadata.updateWithCurrentRequestVersion(updateResponse, false, time.milliseconds());
|
client.updateMetadata(updateResponse);
|
||||||
exchanger.exchange(null);
|
exchanger.exchange(null);
|
||||||
while (!metadata.updateRequested())
|
assertTrue(client.awaitMetadataUpdateRequest(Long.parseLong(maxBlockMs)));
|
||||||
Thread.sleep(100);
|
time.sleep(Long.parseLong(maxBlockMs));
|
||||||
time.sleep(30 * 1000L);
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue