From c231f07798bbdcfe0cefaada2c13a728aaf9ccef Mon Sep 17 00:00:00 2001 From: m1a2st Date: Tue, 7 Oct 2025 21:32:03 +0800 Subject: [PATCH 1/2] fix null value and error message --- .../main/java/org/apache/kafka/streams/CloseOptions.java | 6 +++++- .../main/java/org/apache/kafka/streams/KafkaStreams.java | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java b/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java index 25be8530a4c..73b0ea4132f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java +++ b/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java @@ -81,7 +81,11 @@ public class CloseOptions { * @return this {@code CloseOptions} instance. */ public CloseOptions withTimeout(final Duration timeout) { - this.timeout = Optional.ofNullable(timeout); + if (timeout == null) { + this.timeout = Optional.of(Duration.ofMillis(Long.MAX_VALUE)); + } else { + this.timeout = Optional.of(timeout); + } return this; } diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 999befa3495..dfcfbaf94e2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -1635,7 +1635,7 @@ public class KafkaStreams implements AutoCloseable { public synchronized boolean close(final org.apache.kafka.streams.CloseOptions options) throws IllegalArgumentException { Objects.requireNonNull(options, "options cannot be null"); final CloseOptionsInternal optionsInternal = new CloseOptionsInternal(options); - final String msgPrefix = prepareMillisCheckFailMsgPrefix(optionsInternal.timeout(), "timeout"); + final String msgPrefix = prepareMillisCheckFailMsgPrefix(optionsInternal.timeout().get(), "timeout"); final long timeoutMs = validateMillisecondDuration(optionsInternal.timeout().get(), msgPrefix); if (timeoutMs < 0) { throw new IllegalArgumentException("Timeout can't be negative."); From 0b6f7ba0a974b2e18c7a21c18cc0a000c02bb578 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Tue, 7 Oct 2025 22:15:58 +0800 Subject: [PATCH 2/2] addressed by comments --- .../main/java/org/apache/kafka/streams/CloseOptions.java | 6 +----- .../main/java/org/apache/kafka/streams/KafkaStreams.java | 5 +++-- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java b/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java index 73b0ea4132f..25be8530a4c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java +++ b/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java @@ -81,11 +81,7 @@ public class CloseOptions { * @return this {@code CloseOptions} instance. */ public CloseOptions withTimeout(final Duration timeout) { - if (timeout == null) { - this.timeout = Optional.of(Duration.ofMillis(Long.MAX_VALUE)); - } else { - this.timeout = Optional.of(timeout); - } + this.timeout = Optional.ofNullable(timeout); return this; } diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index dfcfbaf94e2..fad08e243b7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -1635,8 +1635,9 @@ public class KafkaStreams implements AutoCloseable { public synchronized boolean close(final org.apache.kafka.streams.CloseOptions options) throws IllegalArgumentException { Objects.requireNonNull(options, "options cannot be null"); final CloseOptionsInternal optionsInternal = new CloseOptionsInternal(options); - final String msgPrefix = prepareMillisCheckFailMsgPrefix(optionsInternal.timeout().get(), "timeout"); - final long timeoutMs = validateMillisecondDuration(optionsInternal.timeout().get(), msgPrefix); + final Duration timeout = optionsInternal.timeout().orElse(Duration.ofMillis(Long.MAX_VALUE)); + final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout"); + final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix); if (timeoutMs < 0) { throw new IllegalArgumentException("Timeout can't be negative."); }