diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 9100cc7af21..303db7442d1 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1918,7 +1918,10 @@ class LogCleanerTest extends Logging {
@Test
def testCleanTombstone(): Unit = {
- val logConfig = new LogConfig(new Properties())
+ val properties = new Properties()
+ // This test uses future timestamps beyond the default of 1 hour.
+ properties.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, Long.MaxValue.toString)
+ val logConfig = new LogConfig(properties)
val log = makeLog(config = logConfig)
val cleaner = makeCleaner(10)
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 9bbfa7242c3..b86a5608c3d 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -87,6 +87,8 @@ class DumpLogSegmentsTest {
private def createTestLog = {
val props = new Properties
props.setProperty(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "128")
+ // This test uses future timestamps beyond the default of 1 hour.
+ props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, Long.MaxValue.toString)
log = UnifiedLog(
dir = logDir,
config = new LogConfig(props),
diff --git a/docs/upgrade.html b/docs/upgrade.html
index aa9820cab5e..b8816fee95b 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -318,6 +318,15 @@
KIP-714 is now enabled for Kafka Streams via KIP-1076.
This allows to not only collect the metric of the internally used clients of a Kafka Streams appliction via a broker-side plugin,
but also to collect the metrics of the Kafka Streams runtime itself.
+
+ The default value of 'num.recovery.threads.per.data.dir' has been changed from 1 to 2. The impact of this is faster
+ recovery post unclean shutdown at the expense of extra IO cycles.
+ See KIP-1030
+
+
+ The default value of 'message.timestamp.after.max.ms' has been changed from Long.Max to 1 hour. The impact of this messages with a
+ timestamp of more than 1 hour in the future will be rejected when message.timestamp.type=CreateTime is set.
+ See KIP-1030
diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
index 4cd83ef1acf..fa7ed93850f 100644
--- a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
+++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
@@ -118,7 +118,7 @@ public class ServerLogConfigs {
"If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " +
"this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime.";
public static final String LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG);
- public static final long LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT = Long.MAX_VALUE;
+ public static final long LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT = 3600000; // 1 hour
public static final String LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DOC = "This configuration sets the allowable timestamp difference between the " +
"message timestamp and the broker's timestamp. The message timestamp can be later than or equal to the broker's " +
"timestamp, with the maximum allowable difference determined by the value set in this configuration. " +
@@ -126,7 +126,7 @@ public class ServerLogConfigs {
"this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime.";
public static final String NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG = "num.recovery.threads.per.data.dir";
- public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR_DEFAULT = 1;
+ public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR_DEFAULT = 2;
public static final String NUM_RECOVERY_THREADS_PER_DATA_DIR_DOC = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown";
public static final String AUTO_CREATE_TOPICS_ENABLE_CONFIG = "auto.create.topics.enable";
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
index 11c3ca7ad09..d94bb571cbe 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
@@ -56,7 +55,7 @@ public abstract class BaseDeleteSegmentsTest extends TieredStorageTestHarness {
.expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new KeyValueSpec("k2", "v2"))
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
.produceWithTimestamp(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
- new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3", System.currentTimeMillis() + TimeUnit.DAYS.toMillis(1)))
+ new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3", System.currentTimeMillis()))
// update the topic config such that it triggers the deletion of segments
.updateTopicConfig(topicA, configsToBeAdded(), Collections.emptyList())
// expect that the three offloaded remote log segments are deleted
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
index 3ea17e1a098..7f2459cab31 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
@@ -58,6 +58,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
@@ -76,7 +77,10 @@ public class FineGrainedAutoResetIntegrationTest {
private static final String OUTPUT_TOPIC_4 = "outputTopic_4";
private static final String OUTPUT_TOPIC_5 = "outputTopic_5";
- public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
+ NUM_BROKERS,
+ mkProperties(
+ Collections.singletonMap("log.message.timestamp.after.max.ms", String.valueOf(Long.MAX_VALUE))));
@BeforeAll
public static void startCluster() throws IOException, InterruptedException {
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 97840cd4a8c..bf0d54bc5c0 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -90,6 +90,7 @@ import java.util.concurrent.TimeUnit;
import static java.time.Duration.ofMillis;
import static java.time.Duration.ofMinutes;
import static java.time.Instant.ofEpochMilli;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -102,7 +103,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class KStreamAggregationIntegrationTest {
private static final int NUM_BROKERS = 1;
- public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
+ NUM_BROKERS,
+ mkProperties(
+ Collections.singletonMap("log.message.timestamp.after.max.ms", String.valueOf(Long.MAX_VALUE))));
@BeforeAll
public static void startCluster() throws Exception {
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
index eb35666675e..b95e11df4c6 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -35,10 +35,12 @@ import org.junit.jupiter.params.provider.CsvSource;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
import static org.apache.kafka.streams.tests.SmokeTestDriver.verify;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -46,7 +48,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(600)
@Tag("integration")
public class SmokeTestDriverIntegrationTest {
- public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
+ 3,
+ mkProperties(
+ Collections.singletonMap("log.message.timestamp.after.max.ms", String.valueOf(Long.MAX_VALUE))));
@BeforeAll
public static void startCluster() throws IOException {