KAFKA-16368: Update defaults for LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT and NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG (#18106)

Reviewers: Divij Vaidya <diviv@amazon.com>
Authored by Jason Taylor on 2025-01-16 14:30:00 +00:00, committed by Divij Vaidya
parent 43ea0b2f54
commit caf0b67fbb
8 changed files with 34 additions and 8 deletions


@@ -1918,7 +1918,10 @@ class LogCleanerTest extends Logging {
   @Test
   def testCleanTombstone(): Unit = {
-    val logConfig = new LogConfig(new Properties())
+    val properties = new Properties()
+    // This test uses future timestamps beyond the default of 1 hour.
+    properties.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, Long.MaxValue.toString)
+    val logConfig = new LogConfig(properties)
     val log = makeLog(config = logConfig)
     val cleaner = makeCleaner(10)
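For reference, the same per-topic opt-out can be applied outside of tests at topic creation time. The sketch below is not part of this commit; the topic name "future-ts-demo" and the localhost:9092 bootstrap address are placeholders.

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class FutureTimestampTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        try (Admin admin = Admin.create(props)) {
            // Opt one topic out of the new 1-hour default, mirroring the test override above.
            NewTopic topic = new NewTopic("future-ts-demo", 1, (short) 1)
                    .configs(Map.of(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG,
                            String.valueOf(Long.MAX_VALUE)));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}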


@@ -87,6 +87,8 @@ class DumpLogSegmentsTest {
   private def createTestLog = {
     val props = new Properties
     props.setProperty(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "128")
+    // This test uses future timestamps beyond the default of 1 hour.
+    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, Long.MaxValue.toString)
     log = UnifiedLog(
       dir = logDir,
       config = new LogConfig(props),


@@ -318,6 +318,15 @@
             KIP-714 is now enabled for Kafka Streams via <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1076%3A++Metrics+for+client+applications+KIP-714+extension">KIP-1076</a>.
             This allows collecting not only the metrics of the internally used clients of a Kafka Streams application via a broker-side plugin,
             but also the <a href="/{{version}}/documentation/#kafka_streams_monitoring">metrics</a> of the Kafka Streams runtime itself.
+        </li>
+        <li>
+            The default value of 'num.recovery.threads.per.data.dir' has been changed from 1 to 2. The impact of this is faster
+            recovery after an unclean shutdown, at the expense of extra IO cycles.
+            See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a>.
+        </li>
+        <li>
+            The default value of 'message.timestamp.after.max.ms' has been changed from Long.MAX_VALUE to 1 hour. The impact of this is that
+            messages with a timestamp more than 1 hour in the future will be rejected when message.timestamp.type=CreateTime is set.
+            See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a>.
         </li>
     </ul>
 </li>
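Operators who depend on the old behavior can restore the pre-KIP-1030 values. A minimal sketch follows, assuming both keys accept dynamic cluster-wide updates on the target broker version (if not, set them in server.properties and restart); it is not part of this commit, and the bootstrap address is a placeholder.

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class RestorePreKip1030Defaults {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        try (Admin admin = Admin.create(props)) {
            // An empty resource name targets the cluster-wide default for broker configs.
            ConfigResource cluster = new ConfigResource(ConfigResource.Type.BROKER, "");
            List<AlterConfigOp> ops = List.of(
                    new AlterConfigOp(new ConfigEntry("num.recovery.threads.per.data.dir", "1"),
                            AlterConfigOp.OpType.SET),
                    new AlterConfigOp(new ConfigEntry("log.message.timestamp.after.max.ms",
                            String.valueOf(Long.MAX_VALUE)), AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Map.of(cluster, ops)).all().get();
        }
    }
}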


@@ -118,7 +118,7 @@ public class ServerLogConfigs {
             "If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " +
             "this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime.";
     public static final String LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG);
-    public static final long LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT = Long.MAX_VALUE;
+    public static final long LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT = 3600000; // 1 hour
     public static final String LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DOC = "This configuration sets the allowable timestamp difference between the " +
             "message timestamp and the broker's timestamp. The message timestamp can be later than or equal to the broker's " +
             "timestamp, with the maximum allowable difference determined by the value set in this configuration. " +
@@ -126,7 +126,7 @@ public class ServerLogConfigs {
             "this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime.";
     public static final String NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG = "num.recovery.threads.per.data.dir";
-    public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR_DEFAULT = 1;
+    public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR_DEFAULT = 2;
     public static final String NUM_RECOVERY_THREADS_PER_DATA_DIR_DOC = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown";
     public static final String AUTO_CREATE_TOPICS_ENABLE_CONFIG = "auto.create.topics.enable";
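The observable effect of the new 3600000 ms default: under message.timestamp.type=CreateTime, a record stamped more than an hour ahead of the broker clock is rejected and the send fails with an InvalidTimestampException. The sketch below is not from this commit; it assumes an existing topic named "events" using the new defaults and a broker at the placeholder address localhost:9092.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class FutureTimestampRejectionDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            long now = System.currentTimeMillis();
            // One minute ahead: within the 1-hour window, accepted.
            producer.send(new ProducerRecord<>("events", null, now + 60_000L, "k", "near-future")).get();
            // Two hours ahead: beyond the window, rejected by the broker.
            try {
                producer.send(new ProducerRecord<>("events", null, now + 2 * 3_600_000L, "k", "far-future")).get();
            } catch (ExecutionException e) {
                System.out.println("Rejected as expected: " + e.getCause()); // InvalidTimestampException
            }
        }
    }
}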


@@ -23,7 +23,6 @@ import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
@@ -56,7 +55,7 @@ public abstract class BaseDeleteSegmentsTest extends TieredStorageTestHarness {
                 .expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new KeyValueSpec("k2", "v2"))
                 .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
                 .produceWithTimestamp(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
-                        new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3", System.currentTimeMillis() + TimeUnit.DAYS.toMillis(1)))
+                        new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3", System.currentTimeMillis()))
                 // update the topic config such that it triggers the deletion of segments
                 .updateTopicConfig(topicA, configsToBeAdded(), Collections.emptyList())
                 // expect that the three offloaded remote log segments are deleted


@@ -58,6 +58,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
@@ -76,7 +77,10 @@ public class FineGrainedAutoResetIntegrationTest {
     private static final String OUTPUT_TOPIC_4 = "outputTopic_4";
     private static final String OUTPUT_TOPIC_5 = "outputTopic_5";
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
+        NUM_BROKERS,
+        mkProperties(
+            Collections.singletonMap("log.message.timestamp.after.max.ms", String.valueOf(Long.MAX_VALUE))));
     @BeforeAll
     public static void startCluster() throws IOException, InterruptedException {
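The same broker-level override recurs in the two Streams test diffs below: these integration tests drive records with synthetic timestamps that can fall well beyond one hour in the future, so each embedded cluster pins log.message.timestamp.after.max.ms back to Long.MAX_VALUE rather than relaxing it topic by topic.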


@@ -90,6 +90,7 @@ import java.util.concurrent.TimeUnit;
 import static java.time.Duration.ofMillis;
 import static java.time.Duration.ofMinutes;
 import static java.time.Instant.ofEpochMilli;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -102,7 +103,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class KStreamAggregationIntegrationTest {
     private static final int NUM_BROKERS = 1;
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
+        NUM_BROKERS,
+        mkProperties(
+            Collections.singletonMap("log.message.timestamp.after.max.ms", String.valueOf(Long.MAX_VALUE))));
     @BeforeAll
     public static void startCluster() throws Exception {


@@ -35,10 +35,12 @@ import org.junit.jupiter.params.provider.CsvSource;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
 import static org.apache.kafka.streams.tests.SmokeTestDriver.verify;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -46,7 +48,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @Timeout(600)
 @Tag("integration")
 public class SmokeTestDriverIntegrationTest {
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
+        3,
+        mkProperties(
+            Collections.singletonMap("log.message.timestamp.after.max.ms", String.valueOf(Long.MAX_VALUE))));
     @BeforeAll
     public static void startCluster() throws IOException {