KAFKA-16879 SystemTime should use singleton mode (#16266)

Reviewers: Greg Harris <gharris1727@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Author: dujian0068, 2024-06-14 08:49:19 +08:00, committed by GitHub
parent 6af937639d
commit 133f2b0f31
27 changed files with 76 additions and 85 deletions

View File

@ -24,7 +24,12 @@ import java.util.function.Supplier;
* A time implementation that uses the system clock and sleep call. Use `Time.SYSTEM` instead of creating an instance
* of this class.
*/
public class SystemTime implements Time {
class SystemTime implements Time {
    private static final SystemTime SYSTEM_TIME = new SystemTime();

    public static SystemTime getSystemTime() {
        return SYSTEM_TIME;
    }

    @Override
    public long milliseconds() {
@ -57,4 +62,7 @@ public class SystemTime implements Time {
        }
    }

    private SystemTime() {
    }
}
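Taken together, the two hunks above turn SystemTime into an eagerly initialized singleton: the class becomes package-private, its constructor becomes private, and the single instance is handed out through getSystemTime(). An abridged sketch of the resulting shape (not the exact file contents; the Supplier-based waitObject helper is omitted):

// Abridged sketch of SystemTime after this commit (same package as Time and Utils).
class SystemTime implements Time {

    // The one shared instance, created eagerly when the class is loaded.
    private static final SystemTime SYSTEM_TIME = new SystemTime();

    public static SystemTime getSystemTime() {
        return SYSTEM_TIME;
    }

    @Override
    public long milliseconds() {
        return System.currentTimeMillis();
    }

    @Override
    public long nanoseconds() {
        return System.nanoTime();
    }

    @Override
    public void sleep(long ms) {
        Utils.sleep(ms);   // Utils is the sleep helper in the same utils package
    }

    // Private constructor: nothing outside this class can create another instance.
    private SystemTime() {
    }
}

Eager initialization keeps the accessor trivially thread-safe, and because the class holds no mutable state there is no cost to sharing one instance.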

View File

@ -30,7 +30,7 @@ import java.util.function.Supplier;
*/
public interface Time {
Time SYSTEM = new SystemTime();
Time SYSTEM = SystemTime.getSystemTime();
/**
* Returns the current time in milliseconds.
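With Time.SYSTEM now delegating to SystemTime.getSystemTime(), every user of the constant shares one clock object, and new SystemTime() no longer compiles outside the org.apache.kafka.common.utils package. A minimal hypothetical caller (not part of the commit) looks like this:

import org.apache.kafka.common.utils.Time;

// Hypothetical example: obtain the shared system clock through Time.SYSTEM
// instead of constructing SystemTime directly.
public class TimeSystemExample {
    public static void main(String[] args) {
        Time time = Time.SYSTEM;            // the singleton SystemTime
        long start = time.milliseconds();   // wall-clock milliseconds
        time.sleep(100);                    // real sleep backed by the system clock
        System.out.println("elapsed ms: " + (time.milliseconds() - start));
    }
}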

View File

@ -79,7 +79,7 @@ import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
@ -3587,7 +3587,7 @@ public abstract class ConsumerCoordinatorTest {
public void shouldLoseAllOwnedPartitionsBeforeRejoiningAfterDroppingOutOfTheGroup() {
final List<TopicPartition> partitions = singletonList(t1p);
try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.of("group-id"), true)) {
final SystemTime realTime = new SystemTime();
final Time realTime = Time.SYSTEM;
coordinator.ensureActiveGroup();
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.REBALANCE_IN_PROGRESS);
@ -3620,7 +3620,7 @@ public abstract class ConsumerCoordinatorTest {
public void shouldLoseAllOwnedPartitionsBeforeRejoiningAfterResettingGenerationId() {
final List<TopicPartition> partitions = singletonList(t1p);
try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.of("group-id"), true)) {
final SystemTime realTime = new SystemTime();
final Time realTime = Time.SYSTEM;
coordinator.ensureActiveGroup();
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.REBALANCE_IN_PROGRESS);
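In ConsumerCoordinatorTest a real clock is still wanted for these close tests, so new SystemTime() simply becomes Time.SYSTEM, while the rest of the test keeps MockTime. That is the injection idiom the whole refactor leans on: components take a Time, production code passes Time.SYSTEM, and tests that need a controllable clock pass MockTime. A hypothetical illustration (ExpiringToken is invented for this sketch, not part of the commit):

import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;

// Hypothetical component: it only sees the Time it is given, so the same code
// runs against the real clock in production and against MockTime in tests.
class ExpiringToken {
    private final Time time;
    private final long expiresAtMs;

    ExpiringToken(Time time, long ttlMs) {
        this.time = time;
        this.expiresAtMs = time.milliseconds() + ttlMs;
    }

    boolean expired() {
        return time.milliseconds() >= expiresAtMs;
    }
}

class ExpiringTokenExample {
    public static void main(String[] args) {
        ExpiringToken real = new ExpiringToken(Time.SYSTEM, 60_000L); // production wiring

        MockTime mockTime = new MockTime();
        ExpiringToken fake = new ExpiringToken(mockTime, 60_000L);    // test wiring
        mockTime.sleep(60_001L);             // advances the mock clock without waiting
        System.out.println(real.expired()); // false: almost no real time has passed
        System.out.println(fake.expired()); // true
    }
}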

View File

@ -24,7 +24,6 @@ import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.TokenBucket;
import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Test;
@ -80,45 +79,45 @@ public class SensorTest {
@Test
public void testShouldRecordForInfoLevelSensor() {
Sensor infoSensor = new Sensor(null, "infoSensor", null, INFO_CONFIG, new SystemTime(),
Sensor infoSensor = new Sensor(null, "infoSensor", null, INFO_CONFIG, Time.SYSTEM,
0, Sensor.RecordingLevel.INFO);
assertTrue(infoSensor.shouldRecord());
infoSensor = new Sensor(null, "infoSensor", null, DEBUG_CONFIG, new SystemTime(),
infoSensor = new Sensor(null, "infoSensor", null, DEBUG_CONFIG, Time.SYSTEM,
0, Sensor.RecordingLevel.INFO);
assertTrue(infoSensor.shouldRecord());
infoSensor = new Sensor(null, "infoSensor", null, TRACE_CONFIG, new SystemTime(),
infoSensor = new Sensor(null, "infoSensor", null, TRACE_CONFIG, Time.SYSTEM,
0, Sensor.RecordingLevel.INFO);
assertTrue(infoSensor.shouldRecord());
}
@Test
public void testShouldRecordForDebugLevelSensor() {
Sensor debugSensor = new Sensor(null, "debugSensor", null, INFO_CONFIG, new SystemTime(),
Sensor debugSensor = new Sensor(null, "debugSensor", null, INFO_CONFIG, Time.SYSTEM,
0, Sensor.RecordingLevel.DEBUG);
assertFalse(debugSensor.shouldRecord());
debugSensor = new Sensor(null, "debugSensor", null, DEBUG_CONFIG, new SystemTime(),
debugSensor = new Sensor(null, "debugSensor", null, DEBUG_CONFIG, Time.SYSTEM,
0, Sensor.RecordingLevel.DEBUG);
assertTrue(debugSensor.shouldRecord());
debugSensor = new Sensor(null, "debugSensor", null, TRACE_CONFIG, new SystemTime(),
debugSensor = new Sensor(null, "debugSensor", null, TRACE_CONFIG, Time.SYSTEM,
0, Sensor.RecordingLevel.DEBUG);
assertTrue(debugSensor.shouldRecord());
}
@Test
public void testShouldRecordForTraceLevelSensor() {
Sensor traceSensor = new Sensor(null, "traceSensor", null, INFO_CONFIG, new SystemTime(),
Sensor traceSensor = new Sensor(null, "traceSensor", null, INFO_CONFIG, Time.SYSTEM,
0, Sensor.RecordingLevel.TRACE);
assertFalse(traceSensor.shouldRecord());
traceSensor = new Sensor(null, "traceSensor", null, DEBUG_CONFIG, new SystemTime(),
traceSensor = new Sensor(null, "traceSensor", null, DEBUG_CONFIG, Time.SYSTEM,
0, Sensor.RecordingLevel.TRACE);
assertFalse(traceSensor.shouldRecord());
traceSensor = new Sensor(null, "traceSensor", null, TRACE_CONFIG, new SystemTime(),
traceSensor = new Sensor(null, "traceSensor", null, TRACE_CONFIG, Time.SYSTEM,
0, Sensor.RecordingLevel.TRACE);
assertTrue(traceSensor.shouldRecord());
}
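The SensorTest changes construct sensors directly with Time.SYSTEM. Outside of tests, sensors usually come from a Metrics registry, which takes a Time as well; a hypothetical standalone sketch of that wiring (class name invented, not from the commit):

import java.util.ArrayList;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;

public class MetricsTimeExample {
    public static void main(String[] args) {
        // Hypothetical wiring: the Metrics registry gets the shared clock, and
        // every sensor it creates measures rates and windows against it.
        try (Metrics metrics = new Metrics(new MetricConfig(), new ArrayList<>(), Time.SYSTEM)) {
            Sensor sensor = metrics.sensor("request-latency");
            sensor.record(42.0);
        }
    }
}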

View File

@ -18,7 +18,6 @@ package org.apache.kafka.connect.runtime.errors;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
@ -31,7 +30,7 @@ import org.slf4j.LoggerFactory;
*/
public class ErrorHandlingMetrics implements AutoCloseable {
private final Time time = new SystemTime();
private final Time time = Time.SYSTEM;
private final ConnectMetrics.MetricGroup metricGroup;

View File

@ -22,7 +22,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.runtime.ConnectMetrics;
@ -97,14 +97,14 @@ public class RetryWithToleranceOperatorTest {
return genericOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, NONE, new ErrorHandlingMetrics(
new ConnectorTaskId("noop-connector", -1),
new ConnectMetrics("noop-worker", new TestableWorkerConfig(PROPERTIES),
new SystemTime(), "test-cluster")));
Time.SYSTEM, "test-cluster")));
}
public static <T> RetryWithToleranceOperator<T> allOperator() {
return genericOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, ALL, new ErrorHandlingMetrics(
new ConnectorTaskId("errors-all-tolerate-connector", -1),
new ConnectMetrics("errors-all-tolerate-worker", new TestableWorkerConfig(PROPERTIES),
new SystemTime(), "test-cluster")));
Time.SYSTEM, "test-cluster")));
}
private static <T> RetryWithToleranceOperator<T> genericOperator(int retryTimeout, ToleranceType toleranceType, ErrorHandlingMetrics metrics) {

View File

@ -31,7 +31,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{AlterPartitionResponse, FetchRequest, ListOffsetsRequest, RequestHeader}
import org.apache.kafka.common.utils.SystemTime
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicPartition, Uuid}
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.metadata.LeaderRecoveryState
@ -2914,7 +2914,7 @@ class PartitionTest extends AbstractPartitionTest {
val topicPartition = new TopicPartition("test", 1)
val partition = new Partition(
topicPartition, 1000, MetadataVersion.latestTesting, 0, () => defaultBrokerEpoch(0),
new SystemTime(), mock(classOf[AlterPartitionListener]), mock(classOf[DelayedOperations]),
Time.SYSTEM, mock(classOf[AlterPartitionListener]), mock(classOf[DelayedOperations]),
mock(classOf[MetadataCache]), mock(classOf[LogManager]), mock(classOf[AlterPartitionManager]))
val replicas = Seq(0, 1, 2, 3)

View File

@ -34,7 +34,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, RecordValidationStats, SimpleRecord}
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, UpdateMetadataRequest}
import org.apache.kafka.common.utils.{LogContext, SystemTime}
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
@ -168,7 +168,7 @@ class ReplicaFetcherThreadTest {
t1p1 -> newOffsetForLeaderPartitionResult(t1p1, leaderEpoch, 1)).asJava
//Create the fetcher thread
val mockNetwork = new MockBlockingSender(offsets, brokerEndPoint, new SystemTime())
val mockNetwork = new MockBlockingSender(offsets, brokerEndPoint, Time.SYSTEM)
val thread = createReplicaFetcherThread(
"bob",
@ -317,7 +317,7 @@ class ReplicaFetcherThreadTest {
t1p1 -> newOffsetForLeaderPartitionResult(t1p1, leaderEpoch, 1)).asJava
//Create the fetcher thread
val mockNetwork = new MockBlockingSender(offsets, brokerEndPoint, new SystemTime())
val mockNetwork = new MockBlockingSender(offsets, brokerEndPoint, Time.SYSTEM)
val thread = createReplicaFetcherThread(
"bob",
0,
@ -383,7 +383,7 @@ class ReplicaFetcherThreadTest {
t2p1 -> newOffsetForLeaderPartitionResult(t2p1, leaderEpoch, 172)).asJava
//Create the thread
val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, new SystemTime())
val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, Time.SYSTEM)
val thread = createReplicaFetcherThread(
"bob",
0,
@ -443,7 +443,7 @@ class ReplicaFetcherThreadTest {
t2p1 -> newOffsetForLeaderPartitionResult(t2p1, leaderEpochAtLeader, 202)).asJava
//Create the thread
val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, new SystemTime())
val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, Time.SYSTEM)
val thread = createReplicaFetcherThread(
"bob",
0,
@ -506,7 +506,7 @@ class ReplicaFetcherThreadTest {
t1p1 -> newOffsetForLeaderPartitionResult(t1p1, 4, 143)).asJava
// Create the fetcher thread
val mockNetwork = new MockBlockingSender(offsets, brokerEndPoint, new SystemTime())
val mockNetwork = new MockBlockingSender(offsets, brokerEndPoint, Time.SYSTEM)
val thread = createReplicaFetcherThread(
"bob",
0,
@ -584,7 +584,7 @@ class ReplicaFetcherThreadTest {
stub(partition, replicaManager, log)
// Create the fetcher thread
val mockNetwork = new MockBlockingSender(Collections.emptyMap(), brokerEndPoint, new SystemTime())
val mockNetwork = new MockBlockingSender(Collections.emptyMap(), brokerEndPoint, Time.SYSTEM)
val logContext = new LogContext(s"[ReplicaFetcher replicaId=${config.brokerId}, leaderId=${brokerEndPoint.id}, fetcherId=0] ")
val fetchSessionHandler = new FetchSessionHandler(logContext, brokerEndPoint.id)
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, mockNetwork, fetchSessionHandler, config,
@ -694,7 +694,7 @@ class ReplicaFetcherThreadTest {
val mockNetwork = new MockBlockingSender(
Collections.emptyMap(),
brokerEndPoint,
new SystemTime()
Time.SYSTEM
)
val leader = new RemoteLeaderEndPoint(
@ -787,7 +787,7 @@ class ReplicaFetcherThreadTest {
val mockNetwork = new MockBlockingSender(
Collections.emptyMap(),
brokerEndPoint,
new SystemTime()
Time.SYSTEM
)
val leader = new RemoteLeaderEndPoint(
@ -883,7 +883,7 @@ class ReplicaFetcherThreadTest {
t1p1 -> newOffsetForLeaderPartitionResult(t1p1, UNDEFINED_EPOCH, 143)).asJava
// Create the fetcher thread
val mockNetwork = new MockBlockingSender(offsets, brokerEndPoint, new SystemTime())
val mockNetwork = new MockBlockingSender(offsets, brokerEndPoint, Time.SYSTEM)
val thread = createReplicaFetcherThread(
"bob",
0,
@ -947,7 +947,7 @@ class ReplicaFetcherThreadTest {
t1p0 -> newOffsetForLeaderPartitionResult(t1p0, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)).asJava
//Create the thread
val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, new SystemTime())
val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, Time.SYSTEM)
val thread = createReplicaFetcherThread(
"bob",
0,
@ -1008,7 +1008,7 @@ class ReplicaFetcherThreadTest {
).asJava
//Create the thread
val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, new SystemTime())
val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, Time.SYSTEM)
val thread = createReplicaFetcherThread(
"bob",
0,
@ -1070,7 +1070,7 @@ class ReplicaFetcherThreadTest {
).asJava
//Create the fetcher thread
val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, new SystemTime())
val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, Time.SYSTEM)
val thread = createReplicaFetcherThread(
"bob",
0,
@ -1130,7 +1130,7 @@ class ReplicaFetcherThreadTest {
).asJava
//Create the fetcher thread
val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, new SystemTime())
val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, Time.SYSTEM)
val thread = createReplicaFetcherThread(
"bob",
0,

View File

@ -34,7 +34,7 @@ import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_
import org.apache.kafka.common.requests.{OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.utils.{LogContext, SystemTime}
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions._
@ -246,7 +246,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
val node = from.metadataCache.getAliveBrokerNode(to.config.brokerId,
from.config.interBrokerListenerName).get
val endPoint = new BrokerEndPoint(node.id(), node.host(), node.port())
new BrokerBlockingSender(endPoint, from.config, new Metrics(), new SystemTime(), 42, "TestFetcher", new LogContext())
new BrokerBlockingSender(endPoint, from.config, new Metrics(), Time.SYSTEM, 42, "TestFetcher", new LogContext())
}
private def waitForEpochChangeTo(topic: String, partition: Int, epoch: Int): Unit = {

View File

@ -25,7 +25,7 @@ import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsParti
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.AbstractRequest.Builder
import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetsResponse, OffsetsForLeaderEpochResponse, FetchMetadata => JFetchMetadata}
import org.apache.kafka.common.utils.{SystemTime, Time}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
import java.net.SocketTimeoutException
@ -46,7 +46,7 @@ class MockBlockingSender(offsets: java.util.Map[TopicPartition, EpochEndOffset],
time: Time)
extends BlockingSend {
private val client = new MockClient(new SystemTime)
private val client = new MockClient(Time.SYSTEM)
var fetchCount = 0
var epochFetchCount = 0
var listOffsetsCount = 0

View File

@ -26,7 +26,7 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
@ -92,7 +92,7 @@ public class ConsumerTaskTest {
.collect(Collectors.toMap(Function.identity(), e -> 0L));
consumer = spy(new MockConsumer<>(OffsetResetStrategy.EARLIEST));
consumer.updateBeginningOffsets(offsets);
consumerTask = new ConsumerTask(handler, partitioner, consumer, 10L, 300_000L, new SystemTime());
consumerTask = new ConsumerTask(handler, partitioner, consumer, 10L, 300_000L, Time.SYSTEM);
thread = new Thread(consumerTask);
}

View File

@ -19,7 +19,6 @@ package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
@ -44,7 +43,7 @@ public class RemoteLogMetadataSerdeTest {
public static final String TOPIC = "foo";
private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(TOPIC, 0));
private final Time time = new SystemTime();
private final Time time = Time.SYSTEM;
@Test
public void testRemoteLogSegmentMetadataSerde() {

View File

@ -19,7 +19,6 @@ package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogSegmentMetadataTransform;
@ -41,7 +40,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class RemoteLogMetadataTransformTest {
private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
private final Time time = new SystemTime();
private final Time time = Time.SYSTEM;
@Test
public void testRemoteLogSegmentMetadataTransform() {

View File

@ -23,7 +23,6 @@ import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
@ -66,7 +65,7 @@ public class RemoteLogSegmentLifecycleTest {
private final Uuid topicId = Uuid.randomUuid();
private final TopicPartition tp = new TopicPartition("foo", 0);
private final TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, tp);
private final Time time = new SystemTime();
private final Time time = Time.SYSTEM;
private final RemotePartitionMetadataStore spyRemotePartitionMetadataStore = spy(new RemotePartitionMetadataStore());
private final ClusterInstance clusterInstance;

View File

@ -27,7 +27,6 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
@ -58,7 +57,7 @@ import static org.mockito.Mockito.verify;
@ClusterTestDefaults(brokers = 3)
public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
private final ClusterInstance clusterInstance;
private final Time time = new SystemTime();
private final Time time = Time.SYSTEM;
TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest(ClusterInstance clusterInstance) {
this.clusterInstance = clusterInstance;

View File

@ -24,7 +24,6 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
@ -42,7 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("integration")
public class TopicBasedRemoteLogMetadataManagerRestartTest {
private final Time time = new SystemTime();
private final Time time = Time.SYSTEM;
private final String logDir = TestUtils.tempDirectory("_rlmm_segs_").getAbsolutePath();
private final ClusterInstance clusterInstance;

View File

@ -25,7 +25,6 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
@ -56,7 +55,7 @@ public class TopicBasedRemoteLogMetadataManagerTest {
private static final int SEG_SIZE = 1048576;
private final ClusterInstance clusterInstance;
private final RemotePartitionMetadataStore spyRemotePartitionMetadataEventHandler = spy(new RemotePartitionMetadataStore());
private final Time time = new SystemTime();
private final Time time = Time.SYSTEM;
private TopicBasedRemoteLogMetadataManager remoteLogMetadataManager;
TopicBasedRemoteLogMetadataManagerTest(ClusterInstance clusterInstance) {

View File

@ -23,7 +23,6 @@ import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataManagerTestUtils;
import org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore;
@ -55,7 +54,7 @@ public class RemoteLogMetadataManagerTest {
private static final int SEG_SIZE = 1048576;
private static final int BROKER_ID_0 = 0;
private static final int BROKER_ID_1 = 1;
private final Time time = new SystemTime();
private final Time time = Time.SYSTEM;
RemoteLogMetadataManagerTest(ClusterInstance clusterInstance) {
this.clusterInstance = clusterInstance;

View File

@ -19,7 +19,6 @@ package org.apache.kafka.streams.kstream.internals.suppress;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Suppressed;
@ -148,7 +147,7 @@ public class KTableSuppressProcessorMetricsTest {
streamsConfig.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, StreamsConfig.METRICS_LATEST);
final MockInternalNewProcessorContext<String, Change<Long>> context =
new MockInternalNewProcessorContext<>(streamsConfig, TASK_ID, TestUtils.tempDirectory());
final Time time = new SystemTime();
final Time time = Time.SYSTEM;
context.setCurrentNode(new ProcessorNode("testNode"));
context.setSystemTimeMs(time.milliseconds());

View File

@ -29,7 +29,6 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
@ -1584,7 +1583,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
TestUtils.tempDirectory(),
new StreamsConfig(streamsConfig)
);
final Time time = new SystemTime();
final Time time = Time.SYSTEM;
context.setSystemTimeMs(time.milliseconds());
bytesStore.init((StateStoreContext) context, bytesStore);

View File

@ -29,7 +29,6 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
@ -789,7 +788,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
TestUtils.tempDirectory(),
new StreamsConfig(streamsConfig)
);
final Time time = new SystemTime();
final Time time = Time.SYSTEM;
context.setSystemTimeMs(time.milliseconds());
bytesStore.init((StateStoreContext) context, bytesStore);

View File

@ -26,7 +26,6 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
@ -801,7 +800,7 @@ public abstract class AbstractSessionBytesStoreTest {
new StreamsConfig(streamsConfig),
recordCollector
);
final Time time = new SystemTime();
final Time time = Time.SYSTEM;
context.setTime(1L);
context.setSystemTimeMs(time.milliseconds());
sessionStore.init((StateStoreContext) context, sessionStore);

View File

@ -25,7 +25,6 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
@ -993,7 +992,7 @@ public abstract class AbstractWindowBytesStoreTest {
new StreamsConfig(streamsConfig),
recordCollector
);
final Time time = new SystemTime();
final Time time = Time.SYSTEM;
context.setSystemTimeMs(time.milliseconds());
context.setTime(1L);
windowStore.init((StateStoreContext) context, windowStore);

View File

@ -30,7 +30,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
@ -871,14 +871,14 @@ public abstract class TopologyTestDriverTest {
Stores.inMemoryKeyValueStore("store"),
Serdes.ByteArray(),
Serdes.ByteArray(),
new SystemTime()),
Time.SYSTEM),
"processor");
topology.addGlobalStore(
new KeyValueStoreBuilder<>(
Stores.inMemoryKeyValueStore("globalStore"),
Serdes.ByteArray(),
Serdes.ByteArray(),
new SystemTime()).withLoggingDisabled(),
Time.SYSTEM).withLoggingDisabled(),
"sourceProcessorName",
Serdes.ByteArray().deserializer(),
Serdes.ByteArray().deserializer(),
@ -1263,13 +1263,13 @@ public abstract class TopologyTestDriverTest {
Stores.inMemoryKeyValueStore("store"),
Serdes.ByteArray(),
Serdes.ByteArray(),
new SystemTime()));
Time.SYSTEM));
topology.addGlobalStore(
new KeyValueStoreBuilder<>(
Stores.inMemoryKeyValueStore("globalStore"),
Serdes.ByteArray(),
Serdes.ByteArray(),
new SystemTime()).withLoggingDisabled(),
Time.SYSTEM).withLoggingDisabled(),
"sourceProcessorName",
Serdes.ByteArray().deserializer(),
Serdes.ByteArray().deserializer(),

View File

@ -24,7 +24,6 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import com.fasterxml.jackson.annotation.JsonProperty;
@ -97,7 +96,7 @@ public class PushHttpMetricsReporter implements MetricsReporter {
"producer/consumer/streams/connect instance");
public PushHttpMetricsReporter() {
time = new SystemTime();
time = Time.SYSTEM;
executor = Executors.newSingleThreadScheduledExecutor();
}

View File

@ -64,7 +64,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class ConnectionStressWorker implements TaskWorker {
private static final Logger log = LoggerFactory.getLogger(ConnectionStressWorker.class);
private static final Time TIME = Time.SYSTEM;
private static final int THROTTLE_PERIOD_MS = 100;
@ -109,7 +108,7 @@ public class ConnectionStressWorker implements TaskWorker {
synchronized (ConnectionStressWorker.this) {
this.totalConnections = 0;
this.totalFailedConnections = 0;
this.startTimeMs = TIME.milliseconds();
this.startTimeMs = Time.SYSTEM.milliseconds();
}
this.statusUpdaterExecutor = Executors.newScheduledThreadPool(1,
ThreadUtils.createThreadFactory("StatusUpdaterWorkerThread%d", false));
@ -164,10 +163,10 @@ public class ConnectionStressWorker implements TaskWorker {
List<Node> nodes = updater.fetchNodes();
Node targetNode = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
// channelBuilder will be closed as part of Selector.close()
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(conf, TIME, logContext);
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(conf, Time.SYSTEM, logContext);
try (Metrics metrics = new Metrics()) {
try (Selector selector = new Selector(conf.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
metrics, TIME, "", channelBuilder, logContext)) {
metrics, Time.SYSTEM, "", channelBuilder, logContext)) {
try (NetworkClient client = new NetworkClient(selector,
updater,
"ConnectionStressWorker",
@ -179,12 +178,12 @@ public class ConnectionStressWorker implements TaskWorker {
1000,
10 * 1000,
127 * 1000,
TIME,
Time.SYSTEM,
false,
new ApiVersions(),
logContext,
MetadataRecoveryStrategy.NONE)) {
NetworkClientUtils.awaitReady(client, targetNode, TIME, 500);
NetworkClientUtils.awaitReady(client, targetNode, Time.SYSTEM, 500);
}
}
}

View File

@ -27,8 +27,8 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
@ -62,7 +62,6 @@ import java.util.stream.Collectors;
public class SustainedConnectionWorker implements TaskWorker {
private static final Logger log = LoggerFactory.getLogger(SustainedConnectionWorker.class);
private static final SystemTime SYSTEM_TIME = new SystemTime();
// This is the metadata for the test itself.
private final String id;
@ -166,7 +165,7 @@ public class SustainedConnectionWorker implements TaskWorker {
}
protected void completeRefresh() {
this.nextUpdate = SustainedConnectionWorker.SYSTEM_TIME.milliseconds() + this.refreshRate;
this.nextUpdate = Time.SYSTEM.milliseconds() + this.refreshRate;
this.inUse = false;
}
@ -390,7 +389,7 @@ public class SustainedConnectionWorker implements TaskWorker {
if (currentConnection.isPresent()) {
currentConnection.get().refresh();
} else {
SustainedConnectionWorker.SYSTEM_TIME.sleep(SustainedConnectionWorker.BACKOFF_PERIOD_MS);
Time.SYSTEM.sleep(SustainedConnectionWorker.BACKOFF_PERIOD_MS);
}
}
} catch (Exception e) {
@ -401,7 +400,7 @@ public class SustainedConnectionWorker implements TaskWorker {
}
private synchronized Optional<SustainedConnection> findConnectionToMaintain() {
final long milliseconds = SustainedConnectionWorker.SYSTEM_TIME.milliseconds();
final long milliseconds = Time.SYSTEM.milliseconds();
for (SustainedConnection connection : this.connections) {
if (connection.needsRefresh(milliseconds)) {
connection.claim();
@ -424,7 +423,7 @@ public class SustainedConnectionWorker implements TaskWorker {
SustainedConnectionWorker.this.totalMetadataConnections.get(),
SustainedConnectionWorker.this.totalMetadataFailedConnections.get(),
SustainedConnectionWorker.this.totalAbortedThreads.get(),
SustainedConnectionWorker.SYSTEM_TIME.milliseconds()));
Time.SYSTEM.milliseconds()));
status.update(node);
} catch (Exception e) {
SustainedConnectionWorker.log.error("Aborted test while running StatusUpdater", e);