diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java index 8f9e5eccdb9..1b467eb2e72 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java @@ -180,7 +180,7 @@ public class PageViewTypedDemo { props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JSONSerde.class); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JSONSerde.class); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 33244c5870f..66281eb61fb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -58,6 +58,7 @@ import static org.apache.kafka.common.IsolationLevel.READ_COMMITTED; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.Range.between; import static org.apache.kafka.common.config.ConfigDef.ValidString.in; +import static org.apache.kafka.common.config.ConfigDef.parseType; /** * Configuration for a {@link KafkaStreams} instance. @@ -144,6 +145,7 @@ public class StreamsConfig extends AbstractConfig { private final boolean eosEnabled; private static final long DEFAULT_COMMIT_INTERVAL_MS = 30000L; private static final long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L; + private static final int DEFAULT_TRANSACTION_TIMEOUT = 10000; public static final int DUMMY_THREAD_INDEX = 1; public static final long MAX_TASK_IDLE_MS_DISABLED = -1; @@ -907,7 +909,7 @@ public class StreamsConfig extends AbstractConfig { tempProducerDefaultOverrides.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // Reduce the transaction timeout for quicker pending offset expiration on broker side. - tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10000); + tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_TRANSACTION_TIMEOUT); PRODUCER_EOS_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides); } @@ -1078,6 +1080,26 @@ public class StreamsConfig extends AbstractConfig { if (props.containsKey(RETRIES_CONFIG)) { log.warn("Configuration parameter `{}` is deprecated and will be removed in the 4.0.0 release.", RETRIES_CONFIG); } + + if (eosEnabled) { + verifyEOSTransactionTimeoutCompatibility(); + } + } + + private void verifyEOSTransactionTimeoutCompatibility() { + final long commitInterval = getLong(COMMIT_INTERVAL_MS_CONFIG); + final String transactionTimeoutConfigKey = producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG); + final int transactionTimeout = originals().containsKey(transactionTimeoutConfigKey) ? (int) parseType( + transactionTimeoutConfigKey, originals().get(transactionTimeoutConfigKey), Type.INT) : DEFAULT_TRANSACTION_TIMEOUT; + + if (transactionTimeout < commitInterval) { + throw new IllegalArgumentException(String.format("Transaction timeout %d was set lower than " + + "streams commit interval %d. This will cause ongoing transaction always timeout due to inactivity " + + "caused by long commit interval. Consider reconfiguring commit interval to match " + + "transaction timeout by tuning 'commit.interval.ms' config, or increase the transaction timeout to match " + + "commit interval by tuning `producer.transaction.timeout.ms` config.", + transactionTimeout, commitInterval)); + } } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 684b60861f3..6397da794d2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -47,6 +47,7 @@ import java.util.Properties; import static org.apache.kafka.common.IsolationLevel.READ_COMMITTED; import static org.apache.kafka.common.IsolationLevel.READ_UNCOMMITTED; +import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE; import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_BETA; import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2; @@ -398,6 +399,32 @@ public class StreamsConfigTest { assertEquals("30000", producerConfigs.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)); } + @SuppressWarnings("deprecation") + @Test + public void shouldThrowIfTransactionTimeoutSmallerThanCommitIntervalForEOSAlpha() { + assertThrows(IllegalArgumentException.class, + () -> testTransactionTimeoutSmallerThanCommitInterval(EXACTLY_ONCE)); + } + + @SuppressWarnings("deprecation") + @Test + public void shouldThrowIfTransactionTimeoutSmallerThanCommitIntervalForEOSBeta() { + assertThrows(IllegalArgumentException.class, + () -> testTransactionTimeoutSmallerThanCommitInterval(EXACTLY_ONCE_BETA)); + } + + @Test + public void shouldNotThrowIfTransactionTimeoutSmallerThanCommitIntervalForAtLeastOnce() { + testTransactionTimeoutSmallerThanCommitInterval(AT_LEAST_ONCE); + } + + private void testTransactionTimeoutSmallerThanCommitInterval(final String processingGuarantee) { + props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee); + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000L); + props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 3000); + new StreamsConfig(props); + } + @Test public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() { props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10"); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java index e672ca1f8f3..3750222a93a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java @@ -148,7 +148,7 @@ public abstract class AbstractResetIntegrationTest { streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT)); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java index 1d2465e232d..7f12c563abe 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java @@ -90,7 +90,7 @@ public class EmitOnChangeIntegrationTest { mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()), mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1), mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0), - mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000), + mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L), mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class), mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class) ) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index 41febbc2147..a182bf84d60 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -236,7 +236,7 @@ public class EosIntegrationTest { final Properties properties = new Properties(); properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig); properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000"); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); @@ -325,7 +325,7 @@ public class EosIntegrationTest { final Properties properties = new Properties(); properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig); properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -874,9 +874,14 @@ public class EosIntegrationTest { .to(SINGLE_PARTITION_OUTPUT_TOPIC); final Properties properties = new Properties(); + // Set commit interval to a larger value to avoid affection of controlled stream commit, + // but not too large as we need to have a relatively low transaction timeout such + // that it should help trigger the timed out transaction in time. + final long commitIntervalMs = 20_000L; properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig); properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numberOfStreamsThreads); - properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE); + properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitIntervalMs); + properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), (int) commitIntervalMs); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000"); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 1000); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java index 6a3f27fabbd..4543b99b0da 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java @@ -936,13 +936,14 @@ public class EosV2UpgradeIntegrationTest { final Properties properties = new Properties(); properties.put(StreamsConfig.CLIENT_ID_CONFIG, appDir); properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee); - properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE); + final long commitInterval = Duration.ofMinutes(1L).toMillis(); + properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitInterval); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), Duration.ofSeconds(1L).toMillis()); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), (int) Duration.ofSeconds(5L).toMillis()); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), (int) Duration.ofSeconds(5L).minusMillis(1L).toMillis()); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS); - properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), (int) Duration.ofMinutes(1L).toMillis()); + properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), (int) commitInterval); properties.put(StreamsConfig.producerPrefix(ProducerConfig.PARTITIONER_CLASS_CONFIG), KeyPartitioner.class); properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java index 1cf08003a8e..baaf06c573a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java @@ -145,7 +145,7 @@ public class FineGrainedAutoResetIntegrationTest { final Properties props = new Properties(); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -284,7 +284,7 @@ public class FineGrainedAutoResetIntegrationTest { public void shouldThrowStreamsExceptionNoResetSpecified() throws InterruptedException { final Properties props = new Properties(); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java index 2316d75547b..065e175d179 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java @@ -126,7 +126,7 @@ public class GlobalKTableEOSIntegrationTest { streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java index f7f71bd7bce..447e0a2e34a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java @@ -110,7 +110,7 @@ public class GlobalKTableIntegrationTest { streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()), Materialized.>as(globalStore) .withKeySerde(Serdes.Long()) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java index 3bc99ac42c4..5ef4345959d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java @@ -119,7 +119,7 @@ public class GlobalThreadShutDownOrderTest { streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); final Consumed stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java index 6e8ac281e94..b40d8c1dc92 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java @@ -297,7 +297,7 @@ public class HighAvailabilityTaskAssignorIntegrationTest { mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "2"), mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "60000"), mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener), - mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100"), + mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L), mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, HighAvailabilityTaskAssignor.class.getName()), // Increasing the number of threads to ensure that a rebalance happens each time a consumer sends a rejoin (KAFKA-10455) mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 40) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java index f70cbef51b8..eb241c566f6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -102,7 +102,7 @@ public class InternalTopicIntegrationTest { streamsProp.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsProp.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + streamsProp.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); streamsProp.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index b29e0725661..b7b1f4edfad 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -143,7 +143,7 @@ public class KStreamAggregationIntegrationTest { streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java index 50079010afb..4ae4fdba850 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java @@ -138,7 +138,7 @@ public class KStreamRepartitionIntegrationTest { streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, topologyOptimization); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java index 24bb1313d54..ad7514e3794 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java @@ -210,7 +210,7 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest { streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); // increase the session timeout value, to avoid unnecessary rebalance streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000); return streamsConfig; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java index dd3873c7ee6..6d50ea99cc4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java @@ -70,7 +70,7 @@ public class KTableSourceTopicRestartIntegrationTest { STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5); + STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5L); STREAMS_CONFIG.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); STREAMS_CONFIG.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); STREAMS_CONFIG.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 300); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java index f1e529f545e..6a024966d9f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java @@ -116,7 +116,7 @@ public class LagFetchIntegrationTest { streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); consumerConfiguration = new Properties(); consumerConfiguration.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java index e92b2c157f9..7f7eabb1e1d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java @@ -197,7 +197,7 @@ public class OptimizedKTableIntegrationTest { config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index cd1dd0dca61..137c05487de 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -226,7 +226,7 @@ public class QueryableStateIntegrationTest { streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); stringComparator = Comparator.comparing((KeyValue o) -> o.key).thenComparing(o -> o.value); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index 01d4205a5ee..c2091df53ec 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -130,7 +130,7 @@ public class RegexSourceIntegrationTest { outputTopic = createTopic(topicSuffixGenerator.incrementAndGet()); final Properties properties = new Properties(); properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); properties.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 0L); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java index 435ce9fd1d4..c3e7c283d0c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java @@ -125,7 +125,7 @@ public class ResetPartitionTimeIntegrationTest { streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfig.put(StreamsConfig.POLL_MS_CONFIG, Integer.toString(DEFAULT_TIMEOUT)); - streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(DEFAULT_TIMEOUT)); + streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, (long) DEFAULT_TIMEOUT); streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee); streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java index 90890686af4..df0645ed9e2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java @@ -126,7 +126,7 @@ public class RestoreIntegrationTest { streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath()); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return streamsConfiguration; } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index d762370489c..16cefd4ccaf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -406,7 +406,7 @@ public class StandbyTaskEOSIntegrationTest { streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); // need to set to zero to get predictable active/standby task assignments streamsConfiguration.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 0); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java index b403024f734..4f2ba21d0d3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java @@ -462,7 +462,7 @@ public class StoreQueryIntegrationTest { config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200); config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); return config; } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java index 12dfdec7662..b332e143346 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java @@ -97,7 +97,7 @@ public class StoreUpgradeIntegrationTest { streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return streamsConfiguration; } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java index 541ee1373d8..c0fc7963df1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java @@ -122,7 +122,7 @@ public class StreamTableJoinTopologyOptimizationIntegrationTest { streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, topologyOptimization); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java index b6e9676abcc..64985927a3b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java @@ -109,7 +109,7 @@ public class SuppressionDurabilityIntegrationTest { private static final StringSerializer STRING_SERIALIZER = new StringSerializer(); private static final Serde STRING_SERDE = Serdes.String(); private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer(); - private static final int COMMIT_INTERVAL = 100; + private static final long COMMIT_INTERVAL = 100L; @SuppressWarnings("deprecation") @Parameterized.Parameters(name = "{0}") @@ -167,12 +167,13 @@ public class SuppressionDurabilityIntegrationTest { final Properties streamsConfig = mkProperties(mkMap( mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId), mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), - mkEntry(StreamsConfig.POLL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)), - mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)), + mkEntry(StreamsConfig.POLL_MS_CONFIG, Long.toString(COMMIT_INTERVAL)), mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuaranteee), mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()) )); + streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL); + KafkaStreams driver = getStartedStreams(streamsConfig, builder, true); try { // start by putting some stuff in the buffer diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java index 68a624f9ed2..71ef0e36909 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java @@ -512,7 +512,7 @@ public class SuppressionIntegrationTest { mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId), mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), mkEntry(StreamsConfig.POLL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)), - mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)), + mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(COMMIT_INTERVAL)), mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE), mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()) )); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java index d82f6c9d1a8..bd4829989a9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java @@ -109,7 +109,7 @@ public class RepartitionOptimizingTest { public void setUp() { streamsConfiguration = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); streamsConfiguration.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, Integer.toString(1024 * 10)); - streamsConfiguration.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(5000)); + streamsConfiguration.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(5000)); processorValueCollector.clear(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java index 03fa03a74d8..dbc855de1de 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java @@ -87,7 +87,7 @@ public class RepartitionWithMergeOptimizingTest { public void setUp() { streamsConfiguration = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); streamsConfiguration.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, Integer.toString(1024 * 10)); - streamsConfiguration.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(5000)); + streamsConfiguration.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(5000)); } @After diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index ef535628c11..a1379f8b8a8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -1388,7 +1388,7 @@ public class StreamThreadTest { consumer.assign(new HashSet<>(assignedPartitions)); consumer.addRecord(new ConsumerRecord<>(topic1, 1, 0, new byte[0], new byte[0])); - mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1); + mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1L); thread.runOnce(); assertThat(producer.history().size(), equalTo(1)); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java index 13f1f2b395b..26b5e417f32 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java @@ -201,7 +201,7 @@ public class CachingPersistentWindowStoreTest { streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000L); final Instant initialWallClockTime = Instant.ofEpochMilli(0L); final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), streamsConfiguration, initialWallClockTime); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java index 90a6b94b511..8a402ab9abf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java @@ -71,7 +71,7 @@ public class BrokerCompatibilityTest { streamsProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); streamsProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsProperties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingMode); final int timeout = 6000; diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java index aca41d3effb..eb81a71800b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java @@ -105,7 +105,7 @@ public class EosTestClient extends SmokeTestUtil { props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2); props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000); // increase commit interval to make sure a client is killed having an open transaction + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000L); // increase commit interval to make sure a client is killed having an open transaction props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java index 8688cf2d2b3..8802109be8d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java @@ -635,17 +635,19 @@ public class RelationalSmokeTest extends SmokeTestUtil { final String id, final String processingGuarantee, final String stateDir) { - return mkProperties( - mkMap( - mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker), - mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, application), - mkEntry(StreamsConfig.CLIENT_ID_CONFIG, id), - mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee), - mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1000"), - mkEntry(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"), - mkEntry(StreamsConfig.STATE_DIR_CONFIG, stateDir) - ) - ); + final Properties properties = + mkProperties( + mkMap( + mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker), + mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, application), + mkEntry(StreamsConfig.CLIENT_ID_CONFIG, id), + mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee), + mkEntry(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"), + mkEntry(StreamsConfig.STATE_DIR_CONFIG, stateDir) + ) + ); + properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); + return properties; } public static KafkaStreams startSync(final String broker, diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java index 8c7c5cd5647..dafc6ef6ece 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java @@ -57,7 +57,7 @@ public class StaticMemberTestClient { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, testName); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java index 400a5fc992a..90c2bb94ece 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java @@ -68,7 +68,7 @@ public class StreamsBrokerDownResilienceTest { streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-resilience"); streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); // it is expected that max.poll.interval, retries, request.timeout and max.block.ms set diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java index dc689264a09..a7e72e3fbf3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java @@ -65,7 +65,7 @@ public class StreamsStandByReplicaTest { } streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-standby-tasks"); - streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); streamsProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); streamsProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 82b6ccce759..2ad07f2fa3e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -97,7 +97,7 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); final KafkaClientSupplier kafkaClientSupplier; if (streamsProperties.containsKey("test.future.metadata")) { diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java index c5f6e6b3b0d..bd5752aa30d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java @@ -53,7 +53,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest { config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade"); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); final String sourceTopic = streamsProperties.getProperty("source.topic", "source"); diff --git a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 3c1d99fde25..27712cc5ace 100644 --- a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -53,7 +53,7 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); final KafkaStreams streams = new KafkaStreams(builder, config); diff --git a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java index 15f9ac19dd0..ee15b1dfbc4 100644 --- a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java +++ b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java @@ -50,7 +50,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest { config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); final String sourceTopic = config.getProperty("source.topic", "source"); diff --git a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 53ee0dcc42b..379720b9562 100644 --- a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -56,7 +56,7 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); final KafkaStreams streams = new KafkaStreams(builder, config); diff --git a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java index f82c84fa8e4..6b339b64f40 100644 --- a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java +++ b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java @@ -49,7 +49,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest { config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); final String sourceTopic = config.getProperty("source.topic", "source"); diff --git a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index d710c054d85..75e548439ce 100644 --- a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -49,7 +49,7 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); final KafkaStreams streams = new KafkaStreams(builder, config); diff --git a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java index f79aee1708d..32ef2ebe50b 100644 --- a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java +++ b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java @@ -45,7 +45,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest { config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade"); config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); final String sourceTopic = config.getProperty("source.topic", "source"); diff --git a/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index b56423334b9..e029161853a 100644 --- a/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -49,7 +49,7 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); final KafkaStreams streams = new KafkaStreams(builder, config); diff --git a/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java index 6c248b46454..a2ffa9d14a5 100644 --- a/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java +++ b/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java @@ -44,7 +44,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest { config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade"); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); final String sourceTopic = config.getProperty("source.topic", "source"); diff --git a/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 8b2649f85e7..fc7cdd0e2f9 100644 --- a/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -49,7 +49,7 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); final KafkaStreams streams = new KafkaStreams(builder.build(), config); diff --git a/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java index b0f46dfd0b9..bda6ac45833 100644 --- a/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java +++ b/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java @@ -52,7 +52,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest { config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade"); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); final String sourceTopic = streamsProperties.getProperty("source.topic", "source"); diff --git a/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 62e4f747655..ebe59ab9fce 100644 --- a/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -49,7 +49,7 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); final KafkaStreams streams = new KafkaStreams(builder.build(), config); diff --git a/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java index 9a63b282fe7..6643d29fad8 100644 --- a/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java +++ b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java @@ -52,7 +52,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest { config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade"); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); final String sourceTopic = streamsProperties.getProperty("source.topic", "source"); diff --git a/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index f1980d86746..5becd29282b 100644 --- a/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -48,7 +48,7 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); final KafkaStreams streams = new KafkaStreams(builder.build(), config); diff --git a/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java index cfb827bf086..0c697f6b4cf 100644 --- a/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java +++ b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java @@ -52,7 +52,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest { config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade"); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); final String sourceTopic = streamsProperties.getProperty("source.topic", "source"); diff --git a/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index d467df099b6..1f0a17d81d4 100644 --- a/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -48,7 +48,7 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); final KafkaStreams streams = new KafkaStreams(builder.build(), config); diff --git a/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java index 7f15003c2bd..299fffacaaf 100644 --- a/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java +++ b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java @@ -52,7 +52,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest { config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade"); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); final String sourceTopic = streamsProperties.getProperty("source.topic", "source"); diff --git a/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 844339db140..82d102d8ad7 100644 --- a/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -48,7 +48,7 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); final KafkaStreams streams = new KafkaStreams(builder.build(), config); diff --git a/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java index 7f15003c2bd..299fffacaaf 100644 --- a/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java +++ b/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java @@ -52,7 +52,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest { config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade"); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); final String sourceTopic = streamsProperties.getProperty("source.topic", "source"); diff --git a/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 18e2c378542..45e0628a979 100644 --- a/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -48,7 +48,7 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); final KafkaStreams streams = new KafkaStreams(builder.build(), config); diff --git a/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java index 1a82f0954ce..0a7a48fac1c 100644 --- a/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java +++ b/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java @@ -50,7 +50,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest { config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade"); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); final String sourceTopic = streamsProperties.getProperty("source.topic", "source"); diff --git a/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 346124a24c0..c0c8c72c599 100644 --- a/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -48,7 +48,7 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); final KafkaStreams streams = new KafkaStreams(builder.build(), config); diff --git a/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index a24a6315b77..0fea040bcb4 100644 --- a/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -48,7 +48,7 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); final KafkaStreams streams = new KafkaStreams(builder.build(), config); diff --git a/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 72f8e2770da..e1b294ff15b 100644 --- a/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -48,7 +48,7 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); final KafkaStreams streams = new KafkaStreams(builder.build(), config); diff --git a/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 11eb2196f94..621b8c873cb 100644 --- a/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -48,7 +48,7 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); final KafkaStreams streams = new KafkaStreams(builder.build(), config);