KAFKA-12499: add transaction timeout verification (#10482)

This PR tries to add the check for transaction timeout for a comparison against commit interval of streams. If transaction timeout is smaller than commit interval, stream should crash and inform user to update their commit interval to be larger or equal to the given transaction timeout, or vise versa.

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, A. Sophie Blee-Goldman <sophie@confluent.io>
This commit is contained in:
Boyang Chen 2021-05-21 15:05:39 -07:00 committed by GitHub
parent 72d108274c
commit ae8b784537
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
65 changed files with 138 additions and 80 deletions

View File

@ -180,7 +180,7 @@ public class PageViewTypedDemo {
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JSONSerde.class); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JSONSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JSONSerde.class); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JSONSerde.class);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

View File

@ -58,6 +58,7 @@ import static org.apache.kafka.common.IsolationLevel.READ_COMMITTED;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between; import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in; import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
import static org.apache.kafka.common.config.ConfigDef.parseType;
/** /**
* Configuration for a {@link KafkaStreams} instance. * Configuration for a {@link KafkaStreams} instance.
@ -144,6 +145,7 @@ public class StreamsConfig extends AbstractConfig {
private final boolean eosEnabled; private final boolean eosEnabled;
private static final long DEFAULT_COMMIT_INTERVAL_MS = 30000L; private static final long DEFAULT_COMMIT_INTERVAL_MS = 30000L;
private static final long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L; private static final long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L;
private static final int DEFAULT_TRANSACTION_TIMEOUT = 10000;
public static final int DUMMY_THREAD_INDEX = 1; public static final int DUMMY_THREAD_INDEX = 1;
public static final long MAX_TASK_IDLE_MS_DISABLED = -1; public static final long MAX_TASK_IDLE_MS_DISABLED = -1;
@ -907,7 +909,7 @@ public class StreamsConfig extends AbstractConfig {
tempProducerDefaultOverrides.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); tempProducerDefaultOverrides.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Reduce the transaction timeout for quicker pending offset expiration on broker side. // Reduce the transaction timeout for quicker pending offset expiration on broker side.
tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10000); tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_TRANSACTION_TIMEOUT);
PRODUCER_EOS_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides); PRODUCER_EOS_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides);
} }
@ -1078,6 +1080,26 @@ public class StreamsConfig extends AbstractConfig {
if (props.containsKey(RETRIES_CONFIG)) { if (props.containsKey(RETRIES_CONFIG)) {
log.warn("Configuration parameter `{}` is deprecated and will be removed in the 4.0.0 release.", RETRIES_CONFIG); log.warn("Configuration parameter `{}` is deprecated and will be removed in the 4.0.0 release.", RETRIES_CONFIG);
} }
if (eosEnabled) {
verifyEOSTransactionTimeoutCompatibility();
}
}
private void verifyEOSTransactionTimeoutCompatibility() {
final long commitInterval = getLong(COMMIT_INTERVAL_MS_CONFIG);
final String transactionTimeoutConfigKey = producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
final int transactionTimeout = originals().containsKey(transactionTimeoutConfigKey) ? (int) parseType(
transactionTimeoutConfigKey, originals().get(transactionTimeoutConfigKey), Type.INT) : DEFAULT_TRANSACTION_TIMEOUT;
if (transactionTimeout < commitInterval) {
throw new IllegalArgumentException(String.format("Transaction timeout %d was set lower than " +
"streams commit interval %d. This will cause ongoing transaction always timeout due to inactivity " +
"caused by long commit interval. Consider reconfiguring commit interval to match " +
"transaction timeout by tuning 'commit.interval.ms' config, or increase the transaction timeout to match " +
"commit interval by tuning `producer.transaction.timeout.ms` config.",
transactionTimeout, commitInterval));
}
} }
@Override @Override

View File

@ -47,6 +47,7 @@ import java.util.Properties;
import static org.apache.kafka.common.IsolationLevel.READ_COMMITTED; import static org.apache.kafka.common.IsolationLevel.READ_COMMITTED;
import static org.apache.kafka.common.IsolationLevel.READ_UNCOMMITTED; import static org.apache.kafka.common.IsolationLevel.READ_UNCOMMITTED;
import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_BETA; import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_BETA;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2; import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2;
@ -398,6 +399,32 @@ public class StreamsConfigTest {
assertEquals("30000", producerConfigs.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)); assertEquals("30000", producerConfigs.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG));
} }
@SuppressWarnings("deprecation")
@Test
public void shouldThrowIfTransactionTimeoutSmallerThanCommitIntervalForEOSAlpha() {
assertThrows(IllegalArgumentException.class,
() -> testTransactionTimeoutSmallerThanCommitInterval(EXACTLY_ONCE));
}
@SuppressWarnings("deprecation")
@Test
public void shouldThrowIfTransactionTimeoutSmallerThanCommitIntervalForEOSBeta() {
assertThrows(IllegalArgumentException.class,
() -> testTransactionTimeoutSmallerThanCommitInterval(EXACTLY_ONCE_BETA));
}
@Test
public void shouldNotThrowIfTransactionTimeoutSmallerThanCommitIntervalForAtLeastOnce() {
testTransactionTimeoutSmallerThanCommitInterval(AT_LEAST_ONCE);
}
private void testTransactionTimeoutSmallerThanCommitInterval(final String processingGuarantee) {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000L);
props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 3000);
new StreamsConfig(props);
}
@Test @Test
public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() { public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10"); props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");

View File

@ -148,7 +148,7 @@ public abstract class AbstractResetIntegrationTest {
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT)); streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT));

View File

@ -90,7 +90,7 @@ public class EmitOnChangeIntegrationTest {
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()), mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1), mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),
mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0), mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000), mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L),
mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class), mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class) mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class)
) )

View File

@ -236,7 +236,7 @@ public class EosIntegrationTest {
final Properties properties = new Properties(); final Properties properties = new Properties();
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig); properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000"); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
@ -325,7 +325,7 @@ public class EosIntegrationTest {
final Properties properties = new Properties(); final Properties properties = new Properties();
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig); properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@ -874,9 +874,14 @@ public class EosIntegrationTest {
.to(SINGLE_PARTITION_OUTPUT_TOPIC); .to(SINGLE_PARTITION_OUTPUT_TOPIC);
final Properties properties = new Properties(); final Properties properties = new Properties();
// Set commit interval to a larger value to avoid affection of controlled stream commit,
// but not too large as we need to have a relatively low transaction timeout such
// that it should help trigger the timed out transaction in time.
final long commitIntervalMs = 20_000L;
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig); properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numberOfStreamsThreads); properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numberOfStreamsThreads);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE); properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitIntervalMs);
properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), (int) commitIntervalMs);
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000"); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 1000); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 1000);

View File

@ -936,13 +936,14 @@ public class EosV2UpgradeIntegrationTest {
final Properties properties = new Properties(); final Properties properties = new Properties();
properties.put(StreamsConfig.CLIENT_ID_CONFIG, appDir); properties.put(StreamsConfig.CLIENT_ID_CONFIG, appDir);
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee); properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE); final long commitInterval = Duration.ofMinutes(1L).toMillis();
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitInterval);
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), Duration.ofSeconds(1L).toMillis()); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), Duration.ofSeconds(1L).toMillis());
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), (int) Duration.ofSeconds(5L).toMillis()); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), (int) Duration.ofSeconds(5L).toMillis());
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), (int) Duration.ofSeconds(5L).minusMillis(1L).toMillis()); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), (int) Duration.ofSeconds(5L).minusMillis(1L).toMillis());
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), (int) Duration.ofMinutes(1L).toMillis()); properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), (int) commitInterval);
properties.put(StreamsConfig.producerPrefix(ProducerConfig.PARTITIONER_CLASS_CONFIG), KeyPartitioner.class); properties.put(StreamsConfig.producerPrefix(ProducerConfig.PARTITIONER_CLASS_CONFIG), KeyPartitioner.class);
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir); properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir);

View File

@ -145,7 +145,7 @@ public class FineGrainedAutoResetIntegrationTest {
final Properties props = new Properties(); final Properties props = new Properties();
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@ -284,7 +284,7 @@ public class FineGrainedAutoResetIntegrationTest {
public void shouldThrowStreamsExceptionNoResetSpecified() throws InterruptedException { public void shouldThrowStreamsExceptionNoResetSpecified() throws InterruptedException {
final Properties props = new Properties(); final Properties props = new Properties();
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

View File

@ -126,7 +126,7 @@ public class GlobalKTableEOSIntegrationTest {
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig); streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);

View File

@ -110,7 +110,7 @@ public class GlobalKTableIntegrationTest {
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()), globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()),
Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(globalStore) Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(globalStore)
.withKeySerde(Serdes.Long()) .withKeySerde(Serdes.Long())

View File

@ -119,7 +119,7 @@ public class GlobalThreadShutDownOrderTest {
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long()); final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());

View File

@ -297,7 +297,7 @@ public class HighAvailabilityTaskAssignorIntegrationTest {
mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "2"), mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "2"),
mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "60000"), mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "60000"),
mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener), mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener),
mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100"), mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L),
mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, HighAvailabilityTaskAssignor.class.getName()), mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, HighAvailabilityTaskAssignor.class.getName()),
// Increasing the number of threads to ensure that a rebalance happens each time a consumer sends a rejoin (KAFKA-10455) // Increasing the number of threads to ensure that a rebalance happens each time a consumer sends a rejoin (KAFKA-10455)
mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 40) mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 40)

View File

@ -102,7 +102,7 @@ public class InternalTopicIntegrationTest {
streamsProp.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsProp.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsProp.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsProp.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsProp.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsProp.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
} }

View File

@ -143,7 +143,7 @@ public class KStreamAggregationIntegrationTest {
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());

View File

@ -138,7 +138,7 @@ public class KStreamRepartitionIntegrationTest {
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, topologyOptimization); streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, topologyOptimization);

View File

@ -210,7 +210,7 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
// increase the session timeout value, to avoid unnecessary rebalance // increase the session timeout value, to avoid unnecessary rebalance
streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000); streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000);
return streamsConfig; return streamsConfig;

View File

@ -70,7 +70,7 @@ public class KTableSourceTopicRestartIntegrationTest {
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5); STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5L);
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); STREAMS_CONFIG.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
STREAMS_CONFIG.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); STREAMS_CONFIG.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
STREAMS_CONFIG.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 300); STREAMS_CONFIG.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 300);

View File

@ -116,7 +116,7 @@ public class LagFetchIntegrationTest {
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
consumerConfiguration = new Properties(); consumerConfiguration = new Properties();
consumerConfiguration.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); consumerConfiguration.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

View File

@ -197,7 +197,7 @@ public class OptimizedKTableIntegrationTest {
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200); config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);

View File

@ -226,7 +226,7 @@ public class QueryableStateIntegrationTest {
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
stringComparator = Comparator.comparing((KeyValue<String, String> o) -> o.key).thenComparing(o -> o.value); stringComparator = Comparator.comparing((KeyValue<String, String> o) -> o.key).thenComparing(o -> o.value);

View File

@ -130,7 +130,7 @@ public class RegexSourceIntegrationTest {
outputTopic = createTopic(topicSuffixGenerator.incrementAndGet()); outputTopic = createTopic(topicSuffixGenerator.incrementAndGet());
final Properties properties = new Properties(); final Properties properties = new Properties();
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 0L); properties.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 0L);

View File

@ -125,7 +125,7 @@ public class ResetPartitionTimeIntegrationTest {
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfig.put(StreamsConfig.POLL_MS_CONFIG, Integer.toString(DEFAULT_TIMEOUT)); streamsConfig.put(StreamsConfig.POLL_MS_CONFIG, Integer.toString(DEFAULT_TIMEOUT));
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(DEFAULT_TIMEOUT)); streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, (long) DEFAULT_TIMEOUT);
streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee); streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());

View File

@ -126,7 +126,7 @@ public class RestoreIntegrationTest {
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return streamsConfiguration; return streamsConfiguration;
} }

View File

@ -406,7 +406,7 @@ public class StandbyTaskEOSIntegrationTest {
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig); streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
// need to set to zero to get predictable active/standby task assignments // need to set to zero to get predictable active/standby task assignments
streamsConfiguration.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 0); streamsConfiguration.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 0);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

View File

@ -462,7 +462,7 @@ public class StoreQueryIntegrationTest {
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200); config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
return config; return config;
} }
} }

View File

@ -97,7 +97,7 @@ public class StoreUpgradeIntegrationTest {
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return streamsConfiguration; return streamsConfiguration;
} }

View File

@ -122,7 +122,7 @@ public class StreamTableJoinTopologyOptimizationIntegrationTest {
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, topologyOptimization); streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, topologyOptimization);

View File

@ -109,7 +109,7 @@ public class SuppressionDurabilityIntegrationTest {
private static final StringSerializer STRING_SERIALIZER = new StringSerializer(); private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
private static final Serde<String> STRING_SERDE = Serdes.String(); private static final Serde<String> STRING_SERDE = Serdes.String();
private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer(); private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
private static final int COMMIT_INTERVAL = 100; private static final long COMMIT_INTERVAL = 100L;
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@Parameterized.Parameters(name = "{0}") @Parameterized.Parameters(name = "{0}")
@ -167,12 +167,13 @@ public class SuppressionDurabilityIntegrationTest {
final Properties streamsConfig = mkProperties(mkMap( final Properties streamsConfig = mkProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId), mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
mkEntry(StreamsConfig.POLL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)), mkEntry(StreamsConfig.POLL_MS_CONFIG, Long.toString(COMMIT_INTERVAL)),
mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)),
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuaranteee), mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuaranteee),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()) mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
)); ));
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL);
KafkaStreams driver = getStartedStreams(streamsConfig, builder, true); KafkaStreams driver = getStartedStreams(streamsConfig, builder, true);
try { try {
// start by putting some stuff in the buffer // start by putting some stuff in the buffer

View File

@ -512,7 +512,7 @@ public class SuppressionIntegrationTest {
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId), mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
mkEntry(StreamsConfig.POLL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)), mkEntry(StreamsConfig.POLL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)),
mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)), mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(COMMIT_INTERVAL)),
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE), mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()) mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
)); ));

View File

@ -109,7 +109,7 @@ public class RepartitionOptimizingTest {
public void setUp() { public void setUp() {
streamsConfiguration = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); streamsConfiguration = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
streamsConfiguration.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, Integer.toString(1024 * 10)); streamsConfiguration.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, Integer.toString(1024 * 10));
streamsConfiguration.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(5000)); streamsConfiguration.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(5000));
processorValueCollector.clear(); processorValueCollector.clear();
} }

View File

@ -87,7 +87,7 @@ public class RepartitionWithMergeOptimizingTest {
public void setUp() { public void setUp() {
streamsConfiguration = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); streamsConfiguration = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
streamsConfiguration.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, Integer.toString(1024 * 10)); streamsConfiguration.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, Integer.toString(1024 * 10));
streamsConfiguration.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(5000)); streamsConfiguration.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(5000));
} }
@After @After

View File

@ -1388,7 +1388,7 @@ public class StreamThreadTest {
consumer.assign(new HashSet<>(assignedPartitions)); consumer.assign(new HashSet<>(assignedPartitions));
consumer.addRecord(new ConsumerRecord<>(topic1, 1, 0, new byte[0], new byte[0])); consumer.addRecord(new ConsumerRecord<>(topic1, 1, 0, new byte[0], new byte[0]));
mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1); mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1L);
thread.runOnce(); thread.runOnce();
assertThat(producer.history().size(), equalTo(1)); assertThat(producer.history().size(), equalTo(1));

View File

@ -201,7 +201,7 @@ public class CachingPersistentWindowStoreTest {
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000L);
final Instant initialWallClockTime = Instant.ofEpochMilli(0L); final Instant initialWallClockTime = Instant.ofEpochMilli(0L);
final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), streamsConfiguration, initialWallClockTime); final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), streamsConfiguration, initialWallClockTime);

View File

@ -71,7 +71,7 @@ public class BrokerCompatibilityTest {
streamsProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsProperties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingMode); streamsProperties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingMode);
final int timeout = 6000; final int timeout = 6000;

View File

@ -105,7 +105,7 @@ public class EosTestClient extends SmokeTestUtil {
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2); props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000); // increase commit interval to make sure a client is killed having an open transaction props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000L); // increase commit interval to make sure a client is killed having an open transaction
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());

View File

@ -635,17 +635,19 @@ public class RelationalSmokeTest extends SmokeTestUtil {
final String id, final String id,
final String processingGuarantee, final String processingGuarantee,
final String stateDir) { final String stateDir) {
return mkProperties( final Properties properties =
mkMap( mkProperties(
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker), mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, application), mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker),
mkEntry(StreamsConfig.CLIENT_ID_CONFIG, id), mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, application),
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee), mkEntry(StreamsConfig.CLIENT_ID_CONFIG, id),
mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1000"), mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee),
mkEntry(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"), mkEntry(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, stateDir) mkEntry(StreamsConfig.STATE_DIR_CONFIG, stateDir)
) )
); );
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
return properties;
} }
public static KafkaStreams startSync(final String broker, public static KafkaStreams startSync(final String broker,

View File

@ -57,7 +57,7 @@ public class StaticMemberTestClient {
final Properties config = new Properties(); final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, testName); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, testName);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);

View File

@ -68,7 +68,7 @@ public class StreamsBrokerDownResilienceTest {
streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-resilience"); streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-resilience");
streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
// it is expected that max.poll.interval, retries, request.timeout and max.block.ms set // it is expected that max.poll.interval, retries, request.timeout and max.block.ms set

View File

@ -65,7 +65,7 @@ public class StreamsStandByReplicaTest {
} }
streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-standby-tasks"); streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-standby-tasks");
streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); streamsProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
streamsProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

View File

@ -97,7 +97,7 @@ public class StreamsUpgradeTest {
final Properties config = new Properties(); final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
final KafkaClientSupplier kafkaClientSupplier; final KafkaClientSupplier kafkaClientSupplier;
if (streamsProperties.containsKey("test.future.metadata")) { if (streamsProperties.containsKey("test.future.metadata")) {

View File

@ -53,7 +53,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade"); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
final String sourceTopic = streamsProperties.getProperty("source.topic", "source"); final String sourceTopic = streamsProperties.getProperty("source.topic", "source");

View File

@ -53,7 +53,7 @@ public class StreamsUpgradeTest {
final Properties config = new Properties(); final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper); config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
final KafkaStreams streams = new KafkaStreams(builder, config); final KafkaStreams streams = new KafkaStreams(builder, config);

View File

@ -50,7 +50,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper); config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
final String sourceTopic = config.getProperty("source.topic", "source"); final String sourceTopic = config.getProperty("source.topic", "source");

View File

@ -56,7 +56,7 @@ public class StreamsUpgradeTest {
final Properties config = new Properties(); final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper); config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
final KafkaStreams streams = new KafkaStreams(builder, config); final KafkaStreams streams = new KafkaStreams(builder, config);

View File

@ -49,7 +49,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper); config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
final String sourceTopic = config.getProperty("source.topic", "source"); final String sourceTopic = config.getProperty("source.topic", "source");

View File

@ -49,7 +49,7 @@ public class StreamsUpgradeTest {
final Properties config = new Properties(); final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
final KafkaStreams streams = new KafkaStreams(builder, config); final KafkaStreams streams = new KafkaStreams(builder, config);

View File

@ -45,7 +45,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade"); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
final String sourceTopic = config.getProperty("source.topic", "source"); final String sourceTopic = config.getProperty("source.topic", "source");

View File

@ -49,7 +49,7 @@ public class StreamsUpgradeTest {
final Properties config = new Properties(); final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
final KafkaStreams streams = new KafkaStreams(builder, config); final KafkaStreams streams = new KafkaStreams(builder, config);

View File

@ -44,7 +44,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade"); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
final String sourceTopic = config.getProperty("source.topic", "source"); final String sourceTopic = config.getProperty("source.topic", "source");

View File

@ -49,7 +49,7 @@ public class StreamsUpgradeTest {
final Properties config = new Properties(); final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
final KafkaStreams streams = new KafkaStreams(builder.build(), config); final KafkaStreams streams = new KafkaStreams(builder.build(), config);

View File

@ -52,7 +52,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade"); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
final String sourceTopic = streamsProperties.getProperty("source.topic", "source"); final String sourceTopic = streamsProperties.getProperty("source.topic", "source");

View File

@ -49,7 +49,7 @@ public class StreamsUpgradeTest {
final Properties config = new Properties(); final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
final KafkaStreams streams = new KafkaStreams(builder.build(), config); final KafkaStreams streams = new KafkaStreams(builder.build(), config);

View File

@ -52,7 +52,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade"); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
final String sourceTopic = streamsProperties.getProperty("source.topic", "source"); final String sourceTopic = streamsProperties.getProperty("source.topic", "source");

View File

@ -48,7 +48,7 @@ public class StreamsUpgradeTest {
final Properties config = new Properties(); final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
final KafkaStreams streams = new KafkaStreams(builder.build(), config); final KafkaStreams streams = new KafkaStreams(builder.build(), config);

View File

@ -52,7 +52,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade"); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
final String sourceTopic = streamsProperties.getProperty("source.topic", "source"); final String sourceTopic = streamsProperties.getProperty("source.topic", "source");

View File

@ -48,7 +48,7 @@ public class StreamsUpgradeTest {
final Properties config = new Properties(); final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
final KafkaStreams streams = new KafkaStreams(builder.build(), config); final KafkaStreams streams = new KafkaStreams(builder.build(), config);

View File

@ -52,7 +52,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade"); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
final String sourceTopic = streamsProperties.getProperty("source.topic", "source"); final String sourceTopic = streamsProperties.getProperty("source.topic", "source");

View File

@ -48,7 +48,7 @@ public class StreamsUpgradeTest {
final Properties config = new Properties(); final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
final KafkaStreams streams = new KafkaStreams(builder.build(), config); final KafkaStreams streams = new KafkaStreams(builder.build(), config);

View File

@ -52,7 +52,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade"); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
final String sourceTopic = streamsProperties.getProperty("source.topic", "source"); final String sourceTopic = streamsProperties.getProperty("source.topic", "source");

View File

@ -48,7 +48,7 @@ public class StreamsUpgradeTest {
final Properties config = new Properties(); final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
final KafkaStreams streams = new KafkaStreams(builder.build(), config); final KafkaStreams streams = new KafkaStreams(builder.build(), config);

View File

@ -50,7 +50,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade"); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
final String sourceTopic = streamsProperties.getProperty("source.topic", "source"); final String sourceTopic = streamsProperties.getProperty("source.topic", "source");

View File

@ -48,7 +48,7 @@ public class StreamsUpgradeTest {
final Properties config = new Properties(); final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
final KafkaStreams streams = new KafkaStreams(builder.build(), config); final KafkaStreams streams = new KafkaStreams(builder.build(), config);

View File

@ -48,7 +48,7 @@ public class StreamsUpgradeTest {
final Properties config = new Properties(); final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
final KafkaStreams streams = new KafkaStreams(builder.build(), config); final KafkaStreams streams = new KafkaStreams(builder.build(), config);

View File

@ -48,7 +48,7 @@ public class StreamsUpgradeTest {
final Properties config = new Properties(); final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
final KafkaStreams streams = new KafkaStreams(builder.build(), config); final KafkaStreams streams = new KafkaStreams(builder.build(), config);

View File

@ -48,7 +48,7 @@ public class StreamsUpgradeTest {
final Properties config = new Properties(); final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
final KafkaStreams streams = new KafkaStreams(builder.build(), config); final KafkaStreams streams = new KafkaStreams(builder.build(), config);