KAFKA-16368: Update defaults for LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT and NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG (#18106)

Reviewers: Divij Vaidya <diviv@amazon.com>
This commit is contained in:
Jason Taylor 2025-01-16 14:30:00 +00:00 committed by Divij Vaidya
parent 43ea0b2f54
commit caf0b67fbb
8 changed files with 34 additions and 8 deletions

View File

@ -1918,7 +1918,10 @@ class LogCleanerTest extends Logging {
@Test
def testCleanTombstone(): Unit = {
val logConfig = new LogConfig(new Properties())
val properties = new Properties()
// This test uses future timestamps beyond the default of 1 hour.
properties.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, Long.MaxValue.toString)
val logConfig = new LogConfig(properties)
val log = makeLog(config = logConfig)
val cleaner = makeCleaner(10)

View File

@ -87,6 +87,8 @@ class DumpLogSegmentsTest {
private def createTestLog = {
val props = new Properties
props.setProperty(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "128")
// This test uses future timestamps beyond the default of 1 hour.
props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, Long.MaxValue.toString)
log = UnifiedLog(
dir = logDir,
config = new LogConfig(props),

View File

@ -318,6 +318,15 @@
KIP-714 is now enabled for Kafka Streams via <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1076%3A++Metrics+for+client+applications+KIP-714+extension">KIP-1076</a>.
This makes it possible to collect not only the metrics of the internally used clients of a Kafka Streams application via a broker-side plugin,
but also the <a href="/{{version}}/documentation/#kafka_streams_monitoring">metrics</a> of the Kafka Streams runtime itself.
</li>
<li>
The default value of 'num.recovery.threads.per.data.dir' has been changed from 1 to 2. The impact of this is faster
recovery after an unclean shutdown at the expense of extra IO cycles.
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a>.
</li>
<li>
The default value of 'message.timestamp.after.max.ms' has been changed from Long.MAX_VALUE to 3600000 (1 hour). The impact of this is that messages with a
timestamp of more than 1 hour in the future will be rejected when message.timestamp.type=CreateTime is set.
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a>.
</li>
</ul>
</li>

View File

@ -118,7 +118,7 @@ public class ServerLogConfigs {
"If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " +
"this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime.";
public static final String LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG);
public static final long LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT = Long.MAX_VALUE;
public static final long LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT = 3600000; // 1 hour
public static final String LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DOC = "This configuration sets the allowable timestamp difference between the " +
"message timestamp and the broker's timestamp. The message timestamp can be later than or equal to the broker's " +
"timestamp, with the maximum allowable difference determined by the value set in this configuration. " +
@ -126,7 +126,7 @@ public class ServerLogConfigs {
"this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime.";
public static final String NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG = "num.recovery.threads.per.data.dir";
public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR_DEFAULT = 1;
public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR_DEFAULT = 2;
public static final String NUM_RECOVERY_THREADS_PER_DATA_DIR_DOC = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown";
public static final String AUTO_CREATE_TOPICS_ENABLE_CONFIG = "auto.create.topics.enable";

View File

@ -23,7 +23,6 @@ import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
@ -56,7 +55,7 @@ public abstract class BaseDeleteSegmentsTest extends TieredStorageTestHarness {
.expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new KeyValueSpec("k2", "v2"))
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
.produceWithTimestamp(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3", System.currentTimeMillis() + TimeUnit.DAYS.toMillis(1)))
new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3", System.currentTimeMillis()))
// update the topic config such that it triggers the deletion of segments
.updateTopicConfig(topicA, configsToBeAdded(), Collections.emptyList())
// expect that the three offloaded remote log segments are deleted

View File

@ -58,6 +58,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
@ -76,7 +77,10 @@ public class FineGrainedAutoResetIntegrationTest {
private static final String OUTPUT_TOPIC_4 = "outputTopic_4";
private static final String OUTPUT_TOPIC_5 = "outputTopic_5";
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
NUM_BROKERS,
mkProperties(
Collections.singletonMap("log.message.timestamp.after.max.ms", String.valueOf(Long.MAX_VALUE))));
@BeforeAll
public static void startCluster() throws IOException, InterruptedException {

View File

@ -90,6 +90,7 @@ import java.util.concurrent.TimeUnit;
import static java.time.Duration.ofMillis;
import static java.time.Duration.ofMinutes;
import static java.time.Instant.ofEpochMilli;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@ -102,7 +103,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class KStreamAggregationIntegrationTest {
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
NUM_BROKERS,
mkProperties(
Collections.singletonMap("log.message.timestamp.after.max.ms", String.valueOf(Long.MAX_VALUE))));
@BeforeAll
public static void startCluster() throws Exception {

View File

@ -35,10 +35,12 @@ import org.junit.jupiter.params.provider.CsvSource;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
import static org.apache.kafka.streams.tests.SmokeTestDriver.verify;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -46,7 +48,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(600)
@Tag("integration")
public class SmokeTestDriverIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
3,
mkProperties(
Collections.singletonMap("log.message.timestamp.after.max.ms", String.valueOf(Long.MAX_VALUE))));
@BeforeAll
public static void startCluster() throws IOException {