KAFKA-16246: Cleanups in ConsoleConsumer (#15457)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Omnia Ibrahim <o.g.h.ibrahim@gmail.com>
This commit is contained in:
Dmitry Werner 2024-03-07 13:39:16 +05:00 committed by GitHub
parent 5f4806fd1c
commit ba0db81e53
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 141 additions and 101 deletions

View File

@ -22,8 +22,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.CountDownLatch;
import java.util.regex.Pattern;
import java.util.Collections;
@ -68,11 +66,8 @@ public class ConsoleConsumer {
public static void run(ConsoleConsumerOptions opts) {
messageCount = 0;
long timeoutMs = opts.timeoutMs() >= 0 ? opts.timeoutMs() : Long.MAX_VALUE;
Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(opts.consumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
ConsumerWrapper consumerWrapper = opts.partitionArg().isPresent()
? new ConsumerWrapper(Optional.of(opts.topicArg()), opts.partitionArg(), OptionalLong.of(opts.offsetArg()), Optional.empty(), consumer, timeoutMs)
: new ConsumerWrapper(Optional.of(opts.topicArg()), OptionalInt.empty(), OptionalLong.empty(), Optional.ofNullable(opts.includedTopicsArg()), consumer, timeoutMs);
ConsumerWrapper consumerWrapper = new ConsumerWrapper(opts, consumer);
addShutdownHook(consumerWrapper, opts);
@ -148,43 +143,25 @@ public class ConsoleConsumer {
}
public static class ConsumerWrapper {
final Optional<String> topic;
final OptionalInt partitionId;
final OptionalLong offset;
final Optional<String> includedTopics;
final Consumer<byte[], byte[]> consumer;
final long timeoutMs;
final Time time = Time.SYSTEM;
final long timeoutMs;
final Consumer<byte[], byte[]> consumer;
Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();
public ConsumerWrapper(Optional<String> topic,
OptionalInt partitionId,
OptionalLong offset,
Optional<String> includedTopics,
Consumer<byte[], byte[]> consumer,
long timeoutMs) {
this.topic = topic;
this.partitionId = partitionId;
this.offset = offset;
this.includedTopics = includedTopics;
public ConsumerWrapper(ConsoleConsumerOptions opts, Consumer<byte[], byte[]> consumer) {
this.consumer = consumer;
this.timeoutMs = timeoutMs;
timeoutMs = opts.timeoutMs();
Optional<String> topic = opts.topicArg();
if (topic.isPresent() && partitionId.isPresent() && offset.isPresent() && !includedTopics.isPresent()) {
seek(topic.get(), partitionId.getAsInt(), offset.getAsLong());
} else if (topic.isPresent() && partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) {
// default to latest if no offset is provided
seek(topic.get(), partitionId.getAsInt(), ListOffsetsRequest.LATEST_TIMESTAMP);
} else if (topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) {
consumer.subscribe(Collections.singletonList(topic.get()));
} else if (!topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && includedTopics.isPresent()) {
consumer.subscribe(Pattern.compile(includedTopics.get()));
if (topic.isPresent()) {
if (opts.partitionArg().isPresent()) {
seek(topic.get(), opts.partitionArg().getAsInt(), opts.offsetArg());
} else {
consumer.subscribe(Collections.singletonList(topic.get()));
}
} else {
throw new IllegalArgumentException("An invalid combination of arguments is provided. " +
"Exactly one of 'topic' or 'include' must be provided. " +
"If 'topic' is provided, an optional 'partition' may also be provided. " +
"If 'partition' is provided, an optional 'offset' may also be provided, otherwise, consumption starts from the end of the partition.");
opts.includedTopicsArg().ifPresent(topics -> consumer.subscribe(Pattern.compile(topics)));
}
}

View File

@ -34,7 +34,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.Random;
@ -55,7 +55,7 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions {
private final OptionSpec<String> messageFormatterConfigOpt;
private final OptionSpec<?> resetBeginningOpt;
private final OptionSpec<Integer> maxMessagesOpt;
private final OptionSpec<Integer> timeoutMsOpt;
private final OptionSpec<Long> timeoutMsOpt;
private final OptionSpec<?> skipMessageOnErrorOpt;
private final OptionSpec<String> bootstrapServerOpt;
private final OptionSpec<String> keyDeserializerOpt;
@ -66,6 +66,7 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions {
private final Properties consumerProps;
private final long offset;
private final long timeoutMs;
private final MessageFormatter formatter;
public ConsoleConsumerOptions(String[] args) throws IOException {
@ -139,7 +140,7 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions {
timeoutMsOpt = parser.accepts("timeout-ms", "If specified, exit if no message is available for consumption for the specified interval.")
.withRequiredArg()
.describedAs("timeout_ms")
.ofType(Integer.class);
.ofType(Long.class);
skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
"skip it instead of halt.");
bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The server(s) to connect to.")
@ -184,12 +185,13 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions {
Set<String> groupIdsProvided = checkConsumerGroup(consumerPropsFromFile, extraConsumerProps);
consumerProps = buildConsumerProps(consumerPropsFromFile, extraConsumerProps, groupIdsProvided);
offset = parseOffset();
timeoutMs = parseTimeoutMs();
formatter = buildFormatter();
}
private void checkRequiredArgs() {
List<String> topicOrFilterArgs = new ArrayList<>(Arrays.asList(topicArg(), includedTopicsArg()));
topicOrFilterArgs.removeIf(Objects::isNull);
List<Optional<String>> topicOrFilterArgs = new ArrayList<>(Arrays.asList(topicArg(), includedTopicsArg()));
topicOrFilterArgs.removeIf(arg -> !arg.isPresent());
// user need to specify value for either --topic or one of the include filters options (--include or --whitelist)
if (topicOrFilterArgs.size() != 1) {
CommandLineUtils.printUsageAndExit(parser, "Exactly one of --include/--topic is required. " +
@ -322,6 +324,11 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions {
"'earliest', 'latest', or a non-negative long.");
}
private long parseTimeoutMs() {
long timeout = options.has(timeoutMsOpt) ? options.valueOf(timeoutMsOpt) : -1;
return timeout >= 0 ? timeout : Long.MAX_VALUE;
}
private MessageFormatter buildFormatter() {
MessageFormatter formatter = null;
try {
@ -365,16 +372,16 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions {
return OptionalInt.empty();
}
String topicArg() {
return options.valueOf(topicOpt);
Optional<String> topicArg() {
return options.has(topicOpt) ? Optional.of(options.valueOf(topicOpt)) : Optional.empty();
}
int maxMessages() {
return options.has(maxMessagesOpt) ? options.valueOf(maxMessagesOpt) : -1;
}
int timeoutMs() {
return options.has(timeoutMsOpt) ? options.valueOf(timeoutMsOpt) : -1;
long timeoutMs() {
return timeoutMs;
}
boolean enableSystestEventsLogging() {
@ -385,10 +392,10 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions {
return options.valueOf(bootstrapServerOpt);
}
String includedTopicsArg() {
Optional<String> includedTopicsArg() {
return options.has(includeOpt)
? options.valueOf(includeOpt)
: options.valueOf(whitelistOpt);
? Optional.of(options.valueOf(includeOpt))
: Optional.ofNullable(options.valueOf(whitelistOpt));
}
Properties formatterArgs() throws IOException {

View File

@ -48,12 +48,12 @@ public class ConsoleConsumerOptionsTest {
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("test", config.topicArg());
assertEquals("test", config.topicArg().orElse(""));
assertTrue(config.fromBeginning());
assertFalse(config.enableSystestEventsLogging());
assertFalse(config.skipMessageOnError());
assertEquals(-1, config.maxMessages());
assertEquals(-1, config.timeoutMs());
assertEquals(Long.MAX_VALUE, config.timeoutMs());
}
@Test
@ -67,7 +67,7 @@ public class ConsoleConsumerOptionsTest {
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("includeTest*", config.includedTopicsArg());
assertEquals("includeTest*", config.includedTopicsArg().orElse(""));
assertTrue(config.fromBeginning());
}
@ -82,7 +82,7 @@ public class ConsoleConsumerOptionsTest {
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("whitelistTest*", config.includedTopicsArg());
assertEquals("whitelistTest*", config.includedTopicsArg().orElse(""));
assertTrue(config.fromBeginning());
}
@ -96,7 +96,7 @@ public class ConsoleConsumerOptionsTest {
};
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("includeTest*", config.includedTopicsArg());
assertEquals("includeTest*", config.includedTopicsArg().orElse(""));
assertTrue(config.fromBeginning());
}
@ -112,7 +112,7 @@ public class ConsoleConsumerOptionsTest {
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("test", config.topicArg());
assertEquals("test", config.topicArg().orElse(""));
assertTrue(config.partitionArg().isPresent());
assertEquals(0, config.partitionArg().getAsInt());
assertEquals(3, config.offsetArg());
@ -191,7 +191,7 @@ public class ConsoleConsumerOptionsTest {
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("test", config.topicArg());
assertEquals("test", config.topicArg().orElse(""));
assertTrue(config.partitionArg().isPresent());
assertEquals(0, config.partitionArg().getAsInt());
assertEquals(-1, config.offsetArg());
@ -211,7 +211,7 @@ public class ConsoleConsumerOptionsTest {
Properties consumerProperties = config.consumerProps();
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("test", config.topicArg());
assertEquals("test", config.topicArg().orElse(""));
assertFalse(config.fromBeginning());
assertEquals("latest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
}
@ -228,7 +228,7 @@ public class ConsoleConsumerOptionsTest {
Properties consumerProperties = config.consumerProps();
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("test", config.topicArg());
assertEquals("test", config.topicArg().orElse(""));
assertFalse(config.fromBeginning());
assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
}
@ -246,7 +246,7 @@ public class ConsoleConsumerOptionsTest {
Properties consumerProperties = config.consumerProps();
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("test", config.topicArg());
assertEquals("test", config.topicArg().orElse(""));
assertTrue(config.fromBeginning());
assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
}
@ -262,7 +262,7 @@ public class ConsoleConsumerOptionsTest {
Properties consumerProperties = config.consumerProps();
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("test", config.topicArg());
assertEquals("test", config.topicArg().orElse(""));
assertFalse(config.fromBeginning());
assertEquals("latest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
}
@ -442,7 +442,7 @@ public class ConsoleConsumerOptionsTest {
ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("test", config.topicArg());
assertEquals("test", config.topicArg().orElse(""));
assertEquals(-2, config.offsetArg());
assertTrue(config.fromBeginning());
@ -455,7 +455,7 @@ public class ConsoleConsumerOptionsTest {
config = new ConsoleConsumerOptions(args);
assertEquals("localhost:9092", config.bootstrapServer());
assertEquals("test", config.topicArg());
assertEquals("test", config.topicArg().orElse(""));
assertEquals(-1, config.offsetArg());
assertFalse(config.fromBeginning());
}
@ -618,4 +618,30 @@ public class ConsoleConsumerOptionsTest {
Exit.resetExitProcedure();
}
}
@Test
public void testParseTimeoutMs() throws Exception {
String[] withoutTimeoutMs = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--partition", "0"
};
assertEquals(Long.MAX_VALUE, new ConsoleConsumerOptions(withoutTimeoutMs).timeoutMs());
String[] negativeTimeoutMs = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--partition", "0",
"--timeout-ms", "-100"
};
assertEquals(Long.MAX_VALUE, new ConsoleConsumerOptions(negativeTimeoutMs).timeoutMs());
String[] validTimeoutMs = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--partition", "0",
"--timeout-ms", "100"
};
assertEquals(100, new ConsoleConsumerOptions(validTimeoutMs).timeoutMs());
}
}

View File

@ -24,21 +24,19 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.MockTime;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.PrintStream;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.regex.Pattern;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -58,8 +56,7 @@ public class ConsoleConsumerTest {
}
@Test
public void shouldThrowTimeoutExceptionWhenTimeoutIsReached() {
String topic = "test";
public void shouldThrowTimeoutExceptionWhenTimeoutIsReached() throws IOException {
final Time time = new MockTime();
final int timeoutMs = 1000;
@ -71,20 +68,22 @@ public class ConsoleConsumerTest {
return ConsumerRecords.EMPTY;
});
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--timeout-ms", String.valueOf(timeoutMs)
};
ConsoleConsumer.ConsumerWrapper consumer = new ConsoleConsumer.ConsumerWrapper(
Optional.of(topic),
OptionalInt.empty(),
OptionalLong.empty(),
Optional.empty(),
mockConsumer,
timeoutMs
new ConsoleConsumerOptions(args),
mockConsumer
);
assertThrows(TimeoutException.class, consumer::receive);
}
@Test
public void shouldResetUnConsumedOffsetsBeforeExit() {
public void shouldResetUnConsumedOffsetsBeforeExit() throws IOException {
String topic = "test";
int maxMessages = 123;
int totalMessages = 700;
@ -94,13 +93,16 @@ public class ConsoleConsumerTest {
TopicPartition tp1 = new TopicPartition(topic, 0);
TopicPartition tp2 = new TopicPartition(topic, 1);
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", topic,
"--timeout-ms", "1000"
};
ConsoleConsumer.ConsumerWrapper consumer = new ConsoleConsumer.ConsumerWrapper(
Optional.of(topic),
OptionalInt.empty(),
OptionalLong.empty(),
Optional.empty(),
mockConsumer,
1000L);
new ConsoleConsumerOptions(args),
mockConsumer
);
mockConsumer.rebalance(Arrays.asList(tp1, tp2));
Map<TopicPartition, Long> offsets = new HashMap<>();
@ -165,47 +167,75 @@ public class ConsoleConsumerTest {
@Test
@SuppressWarnings("unchecked")
public void shouldSeekWhenOffsetIsSet() {
public void shouldSeekWhenOffsetIsSet() throws IOException {
Consumer<byte[], byte[]> mockConsumer = mock(Consumer.class);
TopicPartition tp0 = new TopicPartition("test", 0);
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", tp0.topic(),
"--partition", String.valueOf(tp0.partition()),
"--timeout-ms", "1000"
};
ConsoleConsumer.ConsumerWrapper consumer = new ConsoleConsumer.ConsumerWrapper(
Optional.of(tp0.topic()),
OptionalInt.of(tp0.partition()),
OptionalLong.empty(),
Optional.empty(),
mockConsumer,
1000L);
new ConsoleConsumerOptions(args),
mockConsumer
);
verify(mockConsumer).assign(eq(Collections.singletonList(tp0)));
verify(mockConsumer).seekToEnd(eq(Collections.singletonList(tp0)));
consumer.cleanup();
reset(mockConsumer);
consumer = new ConsoleConsumer.ConsumerWrapper(
Optional.of(tp0.topic()),
OptionalInt.of(tp0.partition()),
OptionalLong.of(123L),
Optional.empty(),
mockConsumer,
1000L);
args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", tp0.topic(),
"--partition", String.valueOf(tp0.partition()),
"--offset", "123",
"--timeout-ms", "1000"
};
consumer = new ConsoleConsumer.ConsumerWrapper(new ConsoleConsumerOptions(args), mockConsumer);
verify(mockConsumer).assign(eq(Collections.singletonList(tp0)));
verify(mockConsumer).seek(eq(tp0), eq(123L));
consumer.cleanup();
reset(mockConsumer);
consumer = new ConsoleConsumer.ConsumerWrapper(
Optional.of(tp0.topic()),
OptionalInt.of(tp0.partition()),
OptionalLong.of(ListOffsetsRequest.EARLIEST_TIMESTAMP),
Optional.empty(),
mockConsumer,
1000L);
args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", tp0.topic(),
"--partition", String.valueOf(tp0.partition()),
"--offset", "earliest",
"--timeout-ms", "1000"
};
consumer = new ConsoleConsumer.ConsumerWrapper(new ConsoleConsumerOptions(args), mockConsumer);
verify(mockConsumer).assign(eq(Collections.singletonList(tp0)));
verify(mockConsumer).seekToBeginning(eq(Collections.singletonList(tp0)));
consumer.cleanup();
reset(mockConsumer);
}
@Test
@SuppressWarnings("unchecked")
public void shouldWorkWithoutTopicOption() throws IOException {
Consumer<byte[], byte[]> mockConsumer = mock(Consumer.class);
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--include", "includeTest*",
"--from-beginning"
};
ConsoleConsumer.ConsumerWrapper consumer = new ConsoleConsumer.ConsumerWrapper(
new ConsoleConsumerOptions(args),
mockConsumer
);
verify(mockConsumer).subscribe(any(Pattern.class));
consumer.cleanup();
}
}