mirror of https://github.com/apache/kafka.git
KAFKA-16368: Update defaults for LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT and NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG (#18106)
Reviewers: Divij Vaidya <diviv@amazon.com>
This commit is contained in:
parent
43ea0b2f54
commit
caf0b67fbb
|
@ -1918,7 +1918,10 @@ class LogCleanerTest extends Logging {
|
|||
|
||||
@Test
|
||||
def testCleanTombstone(): Unit = {
|
||||
val logConfig = new LogConfig(new Properties())
|
||||
val properties = new Properties()
|
||||
// This test uses future timestamps beyond the default of 1 hour.
|
||||
properties.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, Long.MaxValue.toString)
|
||||
val logConfig = new LogConfig(properties)
|
||||
|
||||
val log = makeLog(config = logConfig)
|
||||
val cleaner = makeCleaner(10)
|
||||
|
|
|
@ -87,6 +87,8 @@ class DumpLogSegmentsTest {
|
|||
private def createTestLog = {
|
||||
val props = new Properties
|
||||
props.setProperty(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "128")
|
||||
// This test uses future timestamps beyond the default of 1 hour.
|
||||
props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, Long.MaxValue.toString)
|
||||
log = UnifiedLog(
|
||||
dir = logDir,
|
||||
config = new LogConfig(props),
|
||||
|
|
|
@ -318,6 +318,15 @@
|
|||
KIP-714 is now enabled for Kafka Streams via <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1076%3A++Metrics+for+client+applications+KIP-714+extension">KIP-1076</a>.
|
||||
This allows to not only collect the metric of the internally used clients of a Kafka Streams appliction via a broker-side plugin,
|
||||
but also to collect the <a href="/{{version}}/documentation/#kafka_streams_monitoring">metrics</a> of the Kafka Streams runtime itself.
|
||||
</li>
|
||||
The default value of 'num.recovery.threads.per.data.dir' has been changed from 1 to 2. The impact of this is faster
|
||||
recovery post unclean shutdown at the expense of extra IO cycles.
|
||||
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a>
|
||||
</li>
|
||||
<li>
|
||||
The default value of 'message.timestamp.after.max.ms' has been changed from Long.Max to 1 hour. The impact of this messages with a
|
||||
timestamp of more than 1 hour in the future will be rejected when message.timestamp.type=CreateTime is set.
|
||||
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a>
|
||||
</li>
|
||||
</ul>
|
||||
</li>
|
||||
|
|
|
@ -118,7 +118,7 @@ public class ServerLogConfigs {
|
|||
"If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " +
|
||||
"this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime.";
|
||||
public static final String LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG);
|
||||
public static final long LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT = Long.MAX_VALUE;
|
||||
public static final long LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT = 3600000; // 1 hour
|
||||
public static final String LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DOC = "This configuration sets the allowable timestamp difference between the " +
|
||||
"message timestamp and the broker's timestamp. The message timestamp can be later than or equal to the broker's " +
|
||||
"timestamp, with the maximum allowable difference determined by the value set in this configuration. " +
|
||||
|
@ -126,7 +126,7 @@ public class ServerLogConfigs {
|
|||
"this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime.";
|
||||
|
||||
public static final String NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG = "num.recovery.threads.per.data.dir";
|
||||
public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR_DEFAULT = 1;
|
||||
public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR_DEFAULT = 2;
|
||||
public static final String NUM_RECOVERY_THREADS_PER_DATA_DIR_DOC = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown";
|
||||
|
||||
public static final String AUTO_CREATE_TOPICS_ENABLE_CONFIG = "auto.create.topics.enable";
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
|
|||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
|
||||
|
||||
|
@ -56,7 +55,7 @@ public abstract class BaseDeleteSegmentsTest extends TieredStorageTestHarness {
|
|||
.expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new KeyValueSpec("k2", "v2"))
|
||||
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
|
||||
.produceWithTimestamp(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
|
||||
new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3", System.currentTimeMillis() + TimeUnit.DAYS.toMillis(1)))
|
||||
new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3", System.currentTimeMillis()))
|
||||
// update the topic config such that it triggers the deletion of segments
|
||||
.updateTopicConfig(topicA, configsToBeAdded(), Collections.emptyList())
|
||||
// expect that the three offloaded remote log segments are deleted
|
||||
|
|
|
@ -58,6 +58,7 @@ import java.util.Map;
|
|||
import java.util.Properties;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import static org.apache.kafka.common.utils.Utils.mkProperties;
|
||||
import static org.apache.kafka.test.TestUtils.waitForCondition;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
|
@ -76,7 +77,10 @@ public class FineGrainedAutoResetIntegrationTest {
|
|||
private static final String OUTPUT_TOPIC_4 = "outputTopic_4";
|
||||
private static final String OUTPUT_TOPIC_5 = "outputTopic_5";
|
||||
|
||||
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
|
||||
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
|
||||
NUM_BROKERS,
|
||||
mkProperties(
|
||||
Collections.singletonMap("log.message.timestamp.after.max.ms", String.valueOf(Long.MAX_VALUE))));
|
||||
|
||||
@BeforeAll
|
||||
public static void startCluster() throws IOException, InterruptedException {
|
||||
|
|
|
@ -90,6 +90,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import static java.time.Duration.ofMillis;
|
||||
import static java.time.Duration.ofMinutes;
|
||||
import static java.time.Instant.ofEpochMilli;
|
||||
import static org.apache.kafka.common.utils.Utils.mkProperties;
|
||||
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
|
@ -102,7 +103,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
|||
public class KStreamAggregationIntegrationTest {
|
||||
private static final int NUM_BROKERS = 1;
|
||||
|
||||
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
|
||||
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
|
||||
NUM_BROKERS,
|
||||
mkProperties(
|
||||
Collections.singletonMap("log.message.timestamp.after.max.ms", String.valueOf(Long.MAX_VALUE))));
|
||||
|
||||
@BeforeAll
|
||||
public static void startCluster() throws Exception {
|
||||
|
|
|
@ -35,10 +35,12 @@ import org.junit.jupiter.params.provider.CsvSource;
|
|||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.apache.kafka.common.utils.Utils.mkProperties;
|
||||
import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
|
||||
import static org.apache.kafka.streams.tests.SmokeTestDriver.verify;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
@ -46,7 +48,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
|||
@Timeout(600)
|
||||
@Tag("integration")
|
||||
public class SmokeTestDriverIntegrationTest {
|
||||
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
|
||||
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
|
||||
3,
|
||||
mkProperties(
|
||||
Collections.singletonMap("log.message.timestamp.after.max.ms", String.valueOf(Long.MAX_VALUE))));
|
||||
|
||||
@BeforeAll
|
||||
public static void startCluster() throws IOException {
|
||||
|
|
Loading…
Reference in New Issue