mirror of https://github.com/apache/kafka.git
				
				
				
			KAFKA-9573: Fix VerifiableProducer and VerifiableConsumer to work with older Kafka versions (#8197)
These classes are used by `upgrade_test.py` with old Kafka versions so they can only use functionality that exists in all Kafka versions. This change fixes the test for Kafka versions older than 0.11.0. Reviewers: Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
		
							parent
							
								
									c3e453428c
								
							
						
					
					
						commit
						dd9f70e3d1
					
				| 
						 | 
				
			
			@ -43,7 +43,6 @@ import org.apache.kafka.common.TopicPartition;
 | 
			
		|||
import org.apache.kafka.common.errors.FencedInstanceIdException;
 | 
			
		||||
import org.apache.kafka.common.errors.WakeupException;
 | 
			
		||||
import org.apache.kafka.common.serialization.StringDeserializer;
 | 
			
		||||
import org.apache.kafka.common.utils.Exit;
 | 
			
		||||
import org.apache.kafka.common.utils.Utils;
 | 
			
		||||
import org.slf4j.Logger;
 | 
			
		||||
import org.slf4j.LoggerFactory;
 | 
			
		||||
| 
						 | 
				
			
			@ -633,7 +632,8 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
 | 
			
		|||
            brokerHostandPort = res.getString("brokerList");
 | 
			
		||||
        } else {
 | 
			
		||||
            parser.printHelp();
 | 
			
		||||
            Exit.exit(0);
 | 
			
		||||
            // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
 | 
			
		||||
            System.exit(0);
 | 
			
		||||
        }
 | 
			
		||||
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostandPort);
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -663,16 +663,19 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
 | 
			
		|||
        ArgumentParser parser = argParser();
 | 
			
		||||
        if (args.length == 0) {
 | 
			
		||||
            parser.printHelp();
 | 
			
		||||
            Exit.exit(0);
 | 
			
		||||
            // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
 | 
			
		||||
            System.exit(0);
 | 
			
		||||
        }
 | 
			
		||||
        try {
 | 
			
		||||
            final VerifiableConsumer consumer = createFromArgs(parser, args);
 | 
			
		||||
            Exit.addShutdownHook("verifiable-consumer-shutdown-hook", () -> consumer.close());
 | 
			
		||||
            // Can't use `Exit.addShutdownHook` here because it didn't exist until 2.5.0.
 | 
			
		||||
            Runtime.getRuntime().addShutdownHook(new Thread(consumer::close, "verifiable-consumer-shutdown-hook"));
 | 
			
		||||
 | 
			
		||||
            consumer.run();
 | 
			
		||||
        } catch (ArgumentParserException e) {
 | 
			
		||||
            parser.handleError(e);
 | 
			
		||||
            Exit.exit(1);
 | 
			
		||||
            // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
 | 
			
		||||
            System.exit(1);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -34,7 +34,6 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 | 
			
		|||
import org.apache.kafka.clients.producer.ProducerRecord;
 | 
			
		||||
import org.apache.kafka.clients.producer.RecordMetadata;
 | 
			
		||||
import org.apache.kafka.common.serialization.StringSerializer;
 | 
			
		||||
import org.apache.kafka.common.utils.Exit;
 | 
			
		||||
 | 
			
		||||
import java.io.IOException;
 | 
			
		||||
import java.io.InputStream;
 | 
			
		||||
| 
						 | 
				
			
			@ -240,7 +239,8 @@ public class VerifiableProducer implements AutoCloseable {
 | 
			
		|||
            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
 | 
			
		||||
        } else {
 | 
			
		||||
            parser.printHelp();
 | 
			
		||||
            Exit.exit(0);
 | 
			
		||||
            // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
 | 
			
		||||
            System.exit(0);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
 | 
			
		||||
| 
						 | 
				
			
			@ -528,7 +528,8 @@ public class VerifiableProducer implements AutoCloseable {
 | 
			
		|||
        ArgumentParser parser = argParser();
 | 
			
		||||
        if (args.length == 0) {
 | 
			
		||||
            parser.printHelp();
 | 
			
		||||
            Exit.exit(0);
 | 
			
		||||
            // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
 | 
			
		||||
            System.exit(0);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        try {
 | 
			
		||||
| 
						 | 
				
			
			@ -537,7 +538,8 @@ public class VerifiableProducer implements AutoCloseable {
 | 
			
		|||
            final long startMs = System.currentTimeMillis();
 | 
			
		||||
            ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
 | 
			
		||||
 | 
			
		||||
            Exit.addShutdownHook("verifiable-producer-shutdown-hook", () -> {
 | 
			
		||||
            // Can't use `Exit.addShutdownHook` here because it didn't exist until 2.5.0.
 | 
			
		||||
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
 | 
			
		||||
                // Trigger main thread to stop producing messages
 | 
			
		||||
                producer.stopProducing = true;
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -549,12 +551,13 @@ public class VerifiableProducer implements AutoCloseable {
 | 
			
		|||
                double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
 | 
			
		||||
 | 
			
		||||
                producer.printJson(new ToolData(producer.numSent, producer.numAcked, producer.throughput, avgThroughput));
 | 
			
		||||
            });
 | 
			
		||||
            }, "verifiable-producer-shutdown-hook"));
 | 
			
		||||
 | 
			
		||||
            producer.run(throttler);
 | 
			
		||||
        } catch (ArgumentParserException e) {
 | 
			
		||||
            parser.handleError(e);
 | 
			
		||||
            Exit.exit(1);
 | 
			
		||||
            // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
 | 
			
		||||
            System.exit(1);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue