From 133f2b0f311ba1fd5a999f477bad38370c1772ca Mon Sep 17 00:00:00 2001
From: dujian0068 <1426703092@qq.com>
Date: Fri, 14 Jun 2024 08:49:19 +0800
Subject: [PATCH] KAFKA-16879 SystemTime should use singleton mode (#16266)

Reviewers: Greg Harris, Chia-Ping Tsai
---
 .../apache/kafka/common/utils/SystemTime.java | 10 ++++++-
 .../org/apache/kafka/common/utils/Time.java   |  2 +-
 .../internals/ConsumerCoordinatorTest.java    |  6 ++--
 .../kafka/common/metrics/SensorTest.java      | 19 ++++++-------
 .../runtime/errors/ErrorHandlingMetrics.java  |  3 +-
 .../RetryWithToleranceOperatorTest.java       |  6 ++--
 .../unit/kafka/cluster/PartitionTest.scala    |  4 +--
 .../server/ReplicaFetcherThreadTest.scala     | 28 +++++++++----------
 .../epoch/LeaderEpochIntegrationTest.scala    |  4 +--
 .../epoch/util/MockBlockingSender.scala       |  4 +--
 .../metadata/storage/ConsumerTaskTest.java    |  4 +--
 .../storage/RemoteLogMetadataSerdeTest.java   |  3 +-
 .../RemoteLogMetadataTransformTest.java       |  3 +-
 .../RemoteLogSegmentLifecycleTest.java        |  3 +-
 ...adataManagerMultipleSubscriptionsTest.java |  3 +-
 ...edRemoteLogMetadataManagerRestartTest.java |  3 +-
 ...opicBasedRemoteLogMetadataManagerTest.java |  3 +-
 .../storage/RemoteLogMetadataManagerTest.java |  3 +-
 .../KTableSuppressProcessorMetricsTest.java   |  3 +-
 ...lSchemaRocksDBSegmentedBytesStoreTest.java |  3 +-
 ...bstractRocksDBSegmentedBytesStoreTest.java |  3 +-
 .../AbstractSessionBytesStoreTest.java        |  3 +-
 .../AbstractWindowBytesStoreTest.java         |  3 +-
 .../kafka/streams/TopologyTestDriverTest.java | 10 +++----
 .../kafka/tools/PushHttpMetricsReporter.java  |  3 +-
 .../workload/ConnectionStressWorker.java      | 11 ++++----
 .../workload/SustainedConnectionWorker.java   | 11 ++++----
 27 files changed, 76 insertions(+), 85 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java b/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
index 31919a22155..524353507b7 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
@@ -24,7 +24,12 @@ import java.util.function.Supplier;
  * A time implementation that uses the system clock and sleep call. Use `Time.SYSTEM` instead of creating an instance
  * of this class.
  */
-public class SystemTime implements Time {
+class SystemTime implements Time {
+    private static final SystemTime SYSTEM_TIME = new SystemTime();
+
+    public static SystemTime getSystemTime() {
+        return SYSTEM_TIME;
+    }
 
     @Override
     public long milliseconds() {
@@ -57,4 +62,7 @@ public class SystemTime implements Time {
         }
     }
 
+    private SystemTime() {
+
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Time.java b/clients/src/main/java/org/apache/kafka/common/utils/Time.java
index e4172c2f611..d144ee3b152 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Time.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Time.java
@@ -30,7 +30,7 @@ import java.util.function.Supplier;
  */
 public interface Time {
 
-    Time SYSTEM = new SystemTime();
+    Time SYSTEM = SystemTime.getSystemTime();
 
     /**
      * Returns the current time in milliseconds.
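The two hunks above are the substantive change; everything that follows is call-site migration. Below is a minimal, self-contained sketch of the singleton pattern being applied — the names mirror the patch, but the method set is simplified from the real org.apache.kafka.common.utils.Time, which also declares nanoseconds(), sleep(), and other members:

    // Sketch of the pattern this patch applies (both types in one source
    // file, same package), simplified from the real Kafka interface.
    interface Time {
        // The one shared wall-clock instance, reachable through the interface.
        Time SYSTEM = SystemTime.getSystemTime();

        long milliseconds();
    }

    class SystemTime implements Time {
        // Eagerly initialized singleton; JVM class initialization makes this
        // thread-safe without explicit locking.
        private static final SystemTime SYSTEM_TIME = new SystemTime();

        public static SystemTime getSystemTime() {
            return SYSTEM_TIME;
        }

        // Package-private class plus private constructor: callers outside the
        // package can no longer write `new SystemTime()`.
        private SystemTime() { }

        @Override
        public long milliseconds() {
            return System.currentTimeMillis();
        }
    }

Because the class is now package-private with a private constructor, `new SystemTime()` no longer compiles outside org.apache.kafka.common.utils — which is what forces every call site below onto Time.SYSTEM.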
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index d2bf9b49cb9..b5be02d6a1b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -79,7 +79,7 @@ import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
@@ -3587,7 +3587,7 @@ public abstract class ConsumerCoordinatorTest {
     public void shouldLoseAllOwnedPartitionsBeforeRejoiningAfterDroppingOutOfTheGroup() {
         final List<TopicPartition> partitions = singletonList(t1p);
         try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.of("group-id"), true)) {
-            final SystemTime realTime = new SystemTime();
+            final Time realTime = Time.SYSTEM;
             coordinator.ensureActiveGroup();
 
             prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.REBALANCE_IN_PROGRESS);
@@ -3620,7 +3620,7 @@ public abstract class ConsumerCoordinatorTest {
     public void shouldLoseAllOwnedPartitionsBeforeRejoiningAfterResettingGenerationId() {
         final List<TopicPartition> partitions = singletonList(t1p);
         try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.of("group-id"), true)) {
-            final SystemTime realTime = new SystemTime();
+            final Time realTime = Time.SYSTEM;
             coordinator.ensureActiveGroup();
 
             prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.REBALANCE_IN_PROGRESS);
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
index df3eedd176a..681e108d1d8 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.TokenBucket;
 import org.apache.kafka.common.metrics.stats.WindowedSum;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.junit.jupiter.api.Test;
 
@@ -80,45 +79,45 @@ public class SensorTest {
 
     @Test
     public void testShouldRecordForInfoLevelSensor() {
-        Sensor infoSensor = new Sensor(null, "infoSensor", null, INFO_CONFIG, new SystemTime(),
+        Sensor infoSensor = new Sensor(null, "infoSensor", null, INFO_CONFIG, Time.SYSTEM,
             0, Sensor.RecordingLevel.INFO);
         assertTrue(infoSensor.shouldRecord());
 
-        infoSensor = new Sensor(null, "infoSensor", null, DEBUG_CONFIG, new SystemTime(),
+        infoSensor = new Sensor(null, "infoSensor", null, DEBUG_CONFIG, Time.SYSTEM,
             0, Sensor.RecordingLevel.INFO);
         assertTrue(infoSensor.shouldRecord());
 
-        infoSensor = new Sensor(null, "infoSensor", null, TRACE_CONFIG, new SystemTime(),
+        infoSensor = new Sensor(null, "infoSensor", null, TRACE_CONFIG, Time.SYSTEM,
            0, Sensor.RecordingLevel.INFO);
         assertTrue(infoSensor.shouldRecord());
     }
 
     @Test
     public void testShouldRecordForDebugLevelSensor() {
-        Sensor debugSensor = new Sensor(null, "debugSensor", null, INFO_CONFIG, new SystemTime(),
+        Sensor debugSensor = new Sensor(null, "debugSensor", null, INFO_CONFIG, Time.SYSTEM,
            0, Sensor.RecordingLevel.DEBUG);
         assertFalse(debugSensor.shouldRecord());
 
-        debugSensor = new Sensor(null, "debugSensor", null, DEBUG_CONFIG, new SystemTime(),
+        debugSensor = new Sensor(null, "debugSensor", null, DEBUG_CONFIG, Time.SYSTEM,
            0, Sensor.RecordingLevel.DEBUG);
         assertTrue(debugSensor.shouldRecord());
 
-        debugSensor = new Sensor(null, "debugSensor", null, TRACE_CONFIG, new SystemTime(),
+        debugSensor = new Sensor(null, "debugSensor", null, TRACE_CONFIG, Time.SYSTEM,
            0, Sensor.RecordingLevel.DEBUG);
         assertTrue(debugSensor.shouldRecord());
     }
 
     @Test
     public void testShouldRecordForTraceLevelSensor() {
-        Sensor traceSensor = new Sensor(null, "traceSensor", null, INFO_CONFIG, new SystemTime(),
+        Sensor traceSensor = new Sensor(null, "traceSensor", null, INFO_CONFIG, Time.SYSTEM,
            0, Sensor.RecordingLevel.TRACE);
         assertFalse(traceSensor.shouldRecord());
 
-        traceSensor = new Sensor(null, "traceSensor", null, DEBUG_CONFIG, new SystemTime(),
+        traceSensor = new Sensor(null, "traceSensor", null, DEBUG_CONFIG, Time.SYSTEM,
            0, Sensor.RecordingLevel.TRACE);
         assertFalse(traceSensor.shouldRecord());
 
-        traceSensor = new Sensor(null, "traceSensor", null, TRACE_CONFIG, new SystemTime(),
+        traceSensor = new Sensor(null, "traceSensor", null, TRACE_CONFIG, Time.SYSTEM,
            0, Sensor.RecordingLevel.TRACE);
         assertTrue(traceSensor.shouldRecord());
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java
index e96832e3403..5052bd80f03 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.runtime.errors;
 
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.CumulativeSum;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.runtime.ConnectMetrics;
 import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
@@ -31,7 +30,7 @@ import org.slf4j.LoggerFactory;
 */
 public class ErrorHandlingMetrics implements AutoCloseable {
 
-    private final Time time = new SystemTime();
+    private final Time time = Time.SYSTEM;
 
     private final ConnectMetrics.MetricGroup metricGroup;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
index 8360e9a6c36..66d2e335599 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
@@ -22,7 +22,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.runtime.ConnectMetrics;
@@ -97,14 +97,14 @@ public class RetryWithToleranceOperatorTest {
         return genericOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, NONE, new ErrorHandlingMetrics(
                 new ConnectorTaskId("noop-connector", -1),
                 new ConnectMetrics("noop-worker", new TestableWorkerConfig(PROPERTIES),
-                        new SystemTime(), "test-cluster")));
+                        Time.SYSTEM, "test-cluster")));
     }
 
     public static RetryWithToleranceOperator allOperator() {
         return genericOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, ALL, new ErrorHandlingMetrics(
                 new ConnectorTaskId("errors-all-tolerate-connector", -1),
                 new ConnectMetrics("errors-all-tolerate-worker", new TestableWorkerConfig(PROPERTIES),
-                        new SystemTime(), "test-cluster")));
+                        Time.SYSTEM, "test-cluster")));
     }
 
     private static RetryWithToleranceOperator genericOperator(int retryTimeout, ToleranceType toleranceType, ErrorHandlingMetrics metrics) {
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 6dc6cc2a3c1..d0a630b4431 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{AlterPartitionResponse, FetchRequest, ListOffsetsRequest, RequestHeader}
-import org.apache.kafka.common.utils.SystemTime
+import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicPartition, Uuid}
 import org.apache.kafka.server.config.ReplicationConfigs
 import org.apache.kafka.metadata.LeaderRecoveryState
@@ -2914,7 +2914,7 @@ class PartitionTest extends AbstractPartitionTest {
     val topicPartition = new TopicPartition("test", 1)
     val partition = new Partition(
       topicPartition, 1000, MetadataVersion.latestTesting, 0, () => defaultBrokerEpoch(0),
-      new SystemTime(), mock(classOf[AlterPartitionListener]), mock(classOf[DelayedOperations]),
+      Time.SYSTEM, mock(classOf[AlterPartitionListener]), mock(classOf[DelayedOperations]),
       mock(classOf[MetadataCache]), mock(classOf[LogManager]), mock(classOf[AlterPartitionManager]))
 
     val replicas = Seq(0, 1, 2, 3)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 7d2e1997087..ce502ea1e22 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, RecordValidationStats, SimpleRecord}
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, UpdateMetadataRequest}
-import org.apache.kafka.common.utils.{LogContext, SystemTime}
+import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.server.config.ReplicationConfigs
 import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
@@ -168,7 +168,7 @@ class ReplicaFetcherThreadTest {
       t1p1 -> newOffsetForLeaderPartitionResult(t1p1, leaderEpoch, 1)).asJava
 
     //Create the fetcher thread
-    val mockNetwork = new MockBlockingSender(offsets, brokerEndPoint, new SystemTime())
+    val mockNetwork = new MockBlockingSender(offsets, brokerEndPoint, Time.SYSTEM)
 
     val thread = createReplicaFetcherThread(
       "bob",
@@ -317,7 +317,7 @@ class ReplicaFetcherThreadTest {
       t1p1 -> newOffsetForLeaderPartitionResult(t1p1, leaderEpoch, 1)).asJava
 
     //Create the fetcher thread
-    val mockNetwork = new MockBlockingSender(offsets, brokerEndPoint, new SystemTime())
+    val mockNetwork = new MockBlockingSender(offsets, brokerEndPoint, Time.SYSTEM)
     val thread = createReplicaFetcherThread(
       "bob",
       0,
@@ -383,7 +383,7 @@ class ReplicaFetcherThreadTest {
       t2p1 -> newOffsetForLeaderPartitionResult(t2p1, leaderEpoch, 172)).asJava
 
     //Create the thread
-    val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, new SystemTime())
+    val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, Time.SYSTEM)
     val thread = createReplicaFetcherThread(
       "bob",
       0,
@@ -443,7 +443,7 @@ class ReplicaFetcherThreadTest {
       t2p1 -> newOffsetForLeaderPartitionResult(t2p1, leaderEpochAtLeader, 202)).asJava
 
     //Create the thread
-    val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, new SystemTime())
+    val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, Time.SYSTEM)
     val thread = createReplicaFetcherThread(
      "bob",
       0,
@@ -506,7 +506,7 @@ class ReplicaFetcherThreadTest {
       t1p1 -> newOffsetForLeaderPartitionResult(t1p1, 4, 143)).asJava
 
     // Create the fetcher thread
-    val mockNetwork = new MockBlockingSender(offsets, brokerEndPoint, new SystemTime())
+    val mockNetwork = new MockBlockingSender(offsets, brokerEndPoint, Time.SYSTEM)
     val thread = createReplicaFetcherThread(
       "bob",
       0,
@@ -584,7 +584,7 @@ class ReplicaFetcherThreadTest {
     stub(partition, replicaManager, log)
 
     // Create the fetcher thread
-    val mockNetwork = new MockBlockingSender(Collections.emptyMap(), brokerEndPoint, new SystemTime())
+    val mockNetwork = new MockBlockingSender(Collections.emptyMap(), brokerEndPoint, Time.SYSTEM)
     val logContext = new LogContext(s"[ReplicaFetcher replicaId=${config.brokerId}, leaderId=${brokerEndPoint.id}, fetcherId=0] ")
     val fetchSessionHandler = new FetchSessionHandler(logContext, brokerEndPoint.id)
     val leader = new RemoteLeaderEndPoint(logContext.logPrefix, mockNetwork, fetchSessionHandler, config,
@@ -694,7 +694,7 @@ class ReplicaFetcherThreadTest {
     val mockNetwork = new MockBlockingSender(
       Collections.emptyMap(),
       brokerEndPoint,
-      new SystemTime()
+      Time.SYSTEM
     )
 
     val leader = new RemoteLeaderEndPoint(
@@ -787,7 +787,7 @@ class ReplicaFetcherThreadTest {
     val mockNetwork = new MockBlockingSender(
       Collections.emptyMap(),
       brokerEndPoint,
-      new SystemTime()
+      Time.SYSTEM
     )
 
     val leader = new RemoteLeaderEndPoint(
@@ -883,7 +883,7 @@ class ReplicaFetcherThreadTest {
       t1p1 -> newOffsetForLeaderPartitionResult(t1p1, UNDEFINED_EPOCH, 143)).asJava
 
     // Create the fetcher thread
-    val mockNetwork = new MockBlockingSender(offsets, brokerEndPoint, new SystemTime())
+    val mockNetwork = new MockBlockingSender(offsets, brokerEndPoint, Time.SYSTEM)
     val thread = createReplicaFetcherThread(
       "bob",
       0,
@@ -947,7 +947,7 @@ class ReplicaFetcherThreadTest {
       t1p0 -> newOffsetForLeaderPartitionResult(t1p0, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)).asJava
 
     //Create the thread
-    val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, new SystemTime())
+    val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, Time.SYSTEM)
     val thread = createReplicaFetcherThread(
       "bob",
       0,
@@ -1008,7 +1008,7 @@ class ReplicaFetcherThreadTest {
     ).asJava
 
     //Create the thread
-    val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, new SystemTime())
+    val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, Time.SYSTEM)
     val thread = createReplicaFetcherThread(
       "bob",
       0,
@@ -1070,7 +1070,7 @@ class ReplicaFetcherThreadTest {
     ).asJava
 
     //Create the fetcher thread
-    val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, new SystemTime())
+    val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, Time.SYSTEM)
     val thread = createReplicaFetcherThread(
       "bob",
       0,
@@ -1130,7 +1130,7 @@ class ReplicaFetcherThreadTest {
     ).asJava
 
     //Create the fetcher thread
-    val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, new SystemTime())
+    val mockNetwork = new MockBlockingSender(offsetsReply, brokerEndPoint, Time.SYSTEM)
     val thread = createReplicaFetcherThread(
       "bob",
       0,
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index 13291faff61..cb73a53c3d2 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_
 import org.apache.kafka.common.requests.{OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.StringSerializer
-import org.apache.kafka.common.utils.{LogContext, SystemTime}
+import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.AfterEach
 import org.junit.jupiter.api.Assertions._
@@ -246,7 +246,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
     val node = from.metadataCache.getAliveBrokerNode(to.config.brokerId, from.config.interBrokerListenerName).get
     val endPoint = new BrokerEndPoint(node.id(), node.host(), node.port())
-    new BrokerBlockingSender(endPoint, from.config, new Metrics(), new SystemTime(), 42, "TestFetcher", new LogContext())
+    new BrokerBlockingSender(endPoint, from.config, new Metrics(), Time.SYSTEM, 42, "TestFetcher", new LogContext())
   }
 
   private def waitForEpochChangeTo(topic: String, partition: Int, epoch: Int): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/epoch/util/MockBlockingSender.scala b/core/src/test/scala/unit/kafka/server/epoch/util/MockBlockingSender.scala
index 4dc598fdce8..10caea4a100 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/util/MockBlockingSender.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/util/MockBlockingSender.scala
@@ -25,7 +25,7 @@ import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsParti
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.AbstractRequest.Builder
 import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetsResponse, OffsetsForLeaderEpochResponse, FetchMetadata => JFetchMetadata}
-import org.apache.kafka.common.utils.{SystemTime, Time}
+import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
 
 import java.net.SocketTimeoutException
@@ -46,7 +46,7 @@ class MockBlockingSender(offsets: java.util.Map[TopicPartition, EpochEndOffset],
                          time: Time)
   extends BlockingSend {
 
-  private val client = new MockClient(new SystemTime)
+  private val client = new MockClient(Time.SYSTEM)
   var fetchCount = 0
   var epochFetchCount = 0
   var listOffsetsCount = 0
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
index cef1d335b93..ba9d1b1e6ce 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
 import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
@@ -92,7 +92,7 @@ public class ConsumerTaskTest {
             .collect(Collectors.toMap(Function.identity(), e -> 0L));
         consumer = spy(new MockConsumer<>(OffsetResetStrategy.EARLIEST));
         consumer.updateBeginningOffsets(offsets);
-        consumerTask = new ConsumerTask(handler, partitioner, consumer, 10L, 300_000L, new SystemTime());
+        consumerTask = new ConsumerTask(handler, partitioner, consumer, 10L, 300_000L, Time.SYSTEM);
         thread = new Thread(consumerTask);
     }
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
index b1b91dacf23..e6b473d1086 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.server.log.remote.metadata.storage;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
@@ -44,7 +43,7 @@ public class RemoteLogMetadataSerdeTest {
 
     public static final String TOPIC = "foo";
     private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(TOPIC, 0));
-    private final Time time = new SystemTime();
+    private final Time time = Time.SYSTEM;
 
     @Test
     public void testRemoteLogSegmentMetadataSerde() {
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
index 70770542eb9..8512dbaa5c0 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.server.log.remote.metadata.storage;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogSegmentMetadataTransform;
@@ -41,7 +40,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 public class RemoteLogMetadataTransformTest {
     private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
 
-    private final Time time = new SystemTime();
+    private final Time time = Time.SYSTEM;
 
     @Test
     public void testRemoteLogSegmentMetadataTransform() {
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
index 7dbf924653e..7c7e917f2da 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
@@ -23,7 +23,6 @@ import kafka.test.junit.ClusterTestExtensions;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
@@ -66,7 +65,7 @@ public class RemoteLogSegmentLifecycleTest {
     private final Uuid topicId = Uuid.randomUuid();
     private final TopicPartition tp = new TopicPartition("foo", 0);
     private final TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, tp);
-    private final Time time = new SystemTime();
+    private final Time time = Time.SYSTEM;
     private final RemotePartitionMetadataStore spyRemotePartitionMetadataStore = spy(new RemotePartitionMetadataStore());
     private final ClusterInstance clusterInstance;
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
index d02da098040..1788a90d06a 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
@@ -58,7 +57,7 @@ import static org.mockito.Mockito.verify;
 @ClusterTestDefaults(brokers = 3)
 public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
     private final ClusterInstance clusterInstance;
-    private final Time time = new SystemTime();
+    private final Time time = Time.SYSTEM;
 
     TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest(ClusterInstance clusterInstance) {
         this.clusterInstance = clusterInstance;
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
index a08f12d9ae2..e5516821a22 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
@@ -42,7 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @Tag("integration")
 public class TopicBasedRemoteLogMetadataManagerRestartTest {
 
-    private final Time time = new SystemTime();
+    private final Time time = Time.SYSTEM;
     private final String logDir = TestUtils.tempDirectory("_rlmm_segs_").getAbsolutePath();
     private final ClusterInstance clusterInstance;
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
index 8b9cfd0700c..25bfd39538d 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
@@ -56,7 +55,7 @@ public class TopicBasedRemoteLogMetadataManagerTest {
     private static final int SEG_SIZE = 1048576;
     private final ClusterInstance clusterInstance;
     private final RemotePartitionMetadataStore spyRemotePartitionMetadataEventHandler = spy(new RemotePartitionMetadataStore());
-    private final Time time = new SystemTime();
+    private final Time time = Time.SYSTEM;
     private TopicBasedRemoteLogMetadataManager remoteLogMetadataManager;
 
     TopicBasedRemoteLogMetadataManagerTest(ClusterInstance clusterInstance) {
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
index d970dd8b68c..8e179022334 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
@@ -23,7 +23,6 @@ import kafka.test.junit.ClusterTestExtensions;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataManagerTestUtils;
 import org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore;
@@ -55,7 +54,7 @@ public class RemoteLogMetadataManagerTest {
     private static final int SEG_SIZE = 1048576;
     private static final int BROKER_ID_0 = 0;
     private static final int BROKER_ID_1 = 1;
-    private final Time time = new SystemTime();
+    private final Time time = Time.SYSTEM;
 
     RemoteLogMetadataManagerTest(ClusterInstance clusterInstance) {
         this.clusterInstance = clusterInstance;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
index 9fdecd920d7..4ad3700e73f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.kstream.internals.suppress;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Suppressed;
@@ -148,7 +147,7 @@ public class KTableSuppressProcessorMetricsTest {
         streamsConfig.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, StreamsConfig.METRICS_LATEST);
         final MockInternalNewProcessorContext<String, Change<Long>> context = new MockInternalNewProcessorContext<>(streamsConfig, TASK_ID, TestUtils.tempDirectory());
 
-        final Time time = new SystemTime();
+        final Time time = Time.SYSTEM;
         context.setCurrentNode(new ProcessorNode("testNode"));
         context.setSystemTimeMs(time.milliseconds());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index c7af25726a4..7dfdd857fe8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
@@ -1584,7 +1583,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest
             TestUtils.tempDirectory(),
             new StreamsConfig(streamsConfig)
         );
-        final Time time = new SystemTime();
+        final Time time = Time.SYSTEM;
         context.setSystemTimeMs(time.milliseconds());
 
         bytesStore.init((StateStoreContext) context, bytesStore);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
index 2d75f751344..6a35de13b1d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -801,7 +800,7 @@ public abstract class AbstractSessionBytesStoreTest {
             new StreamsConfig(streamsConfig),
             recordCollector
         );
-        final Time time = new SystemTime();
+        final Time time = Time.SYSTEM;
         context.setTime(1L);
         context.setSystemTimeMs(time.milliseconds());
         sessionStore.init((StateStoreContext) context, sessionStore);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
index 688efab857d..a8b29d068c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
@@ -993,7 +992,7 @@ public abstract class AbstractWindowBytesStoreTest {
             new StreamsConfig(streamsConfig),
             recordCollector
         );
-        final Time time = new SystemTime();
+        final Time time = Time.SYSTEM;
         context.setSystemTimeMs(time.milliseconds());
         context.setTime(1L);
         windowStore.init((StateStoreContext) context, windowStore);
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 922619b4562..f3186e99ac2 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -30,7 +30,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
@@ -871,14 +871,14 @@ public abstract class TopologyTestDriverTest {
                 Stores.inMemoryKeyValueStore("store"),
                 Serdes.ByteArray(),
                 Serdes.ByteArray(),
-                new SystemTime()),
+                Time.SYSTEM),
             "processor");
         topology.addGlobalStore(
             new KeyValueStoreBuilder<>(
                 Stores.inMemoryKeyValueStore("globalStore"),
                 Serdes.ByteArray(),
                 Serdes.ByteArray(),
-                new SystemTime()).withLoggingDisabled(),
+                Time.SYSTEM).withLoggingDisabled(),
             "sourceProcessorName",
             Serdes.ByteArray().deserializer(),
             Serdes.ByteArray().deserializer(),
@@ -1263,13 +1263,13 @@ public abstract class TopologyTestDriverTest {
                 Stores.inMemoryKeyValueStore("store"),
                 Serdes.ByteArray(),
                 Serdes.ByteArray(),
-                new SystemTime()));
+                Time.SYSTEM));
         topology.addGlobalStore(
             new KeyValueStoreBuilder<>(
                 Stores.inMemoryKeyValueStore("globalStore"),
                 Serdes.ByteArray(),
                 Serdes.ByteArray(),
-                new SystemTime()).withLoggingDisabled(),
+                Time.SYSTEM).withLoggingDisabled(),
             "sourceProcessorName",
             Serdes.ByteArray().deserializer(),
             Serdes.ByteArray().deserializer(),
diff --git a/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
index 3b85e6b7f08..d031baac2b8 100644
--- a/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -97,7 +96,7 @@ public class PushHttpMetricsReporter implements MetricsReporter {
             "producer/consumer/streams/connect instance");
 
     public PushHttpMetricsReporter() {
-        time = new SystemTime();
+        time = Time.SYSTEM;
         executor = Executors.newSingleThreadScheduledExecutor();
     }
diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
index 77f93c5e31a..af861d38195 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
@@ -64,7 +64,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class ConnectionStressWorker implements TaskWorker {
     private static final Logger log = LoggerFactory.getLogger(ConnectionStressWorker.class);
 
-    private static final Time TIME = Time.SYSTEM;
 
     private static final int THROTTLE_PERIOD_MS = 100;
 
@@ -109,7 +108,7 @@ public class ConnectionStressWorker implements TaskWorker {
             synchronized (ConnectionStressWorker.this) {
                 this.totalConnections = 0;
                 this.totalFailedConnections = 0;
-                this.startTimeMs = TIME.milliseconds();
+                this.startTimeMs = Time.SYSTEM.milliseconds();
             }
             this.statusUpdaterExecutor = Executors.newScheduledThreadPool(1,
                 ThreadUtils.createThreadFactory("StatusUpdaterWorkerThread%d", false));
@@ -164,10 +163,10 @@ public class ConnectionStressWorker implements TaskWorker {
             List<Node> nodes = updater.fetchNodes();
             Node targetNode = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
             // channelBuilder will be closed as part of Selector.close()
-            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(conf, TIME, logContext);
+            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(conf, Time.SYSTEM, logContext);
             try (Metrics metrics = new Metrics()) {
                 try (Selector selector = new Selector(conf.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
-                        metrics, TIME, "", channelBuilder, logContext)) {
+                        metrics, Time.SYSTEM, "", channelBuilder, logContext)) {
                     try (NetworkClient client = new NetworkClient(selector,
                             updater,
                             "ConnectionStressWorker",
@@ -179,12 +178,12 @@ public class ConnectionStressWorker implements TaskWorker {
                             1000,
                             10 * 1000,
                             127 * 1000,
-                            TIME,
+                            Time.SYSTEM,
                             false,
                             new ApiVersions(),
                             logContext,
                             MetadataRecoveryStrategy.NONE)) {
-                        NetworkClientUtils.awaitReady(client, targetNode, TIME, 500);
+                        NetworkClientUtils.awaitReady(client, targetNode, Time.SYSTEM, 500);
                     }
                 }
             }
diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionWorker.java b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionWorker.java
index 9ac4ac68b35..559a1885850 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionWorker.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionWorker.java
@@ -27,8 +27,8 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.trogdor.common.JsonUtil;
 import org.apache.kafka.trogdor.common.Platform;
@@ -62,7 +62,6 @@ import java.util.stream.Collectors;
 public class SustainedConnectionWorker implements TaskWorker {
     private static final Logger log = LoggerFactory.getLogger(SustainedConnectionWorker.class);
 
-    private static final SystemTime SYSTEM_TIME = new SystemTime();
 
     // This is the metadata for the test itself.
     private final String id;
@@ -166,7 +165,7 @@ public class SustainedConnectionWorker implements TaskWorker {
         }
 
         protected void completeRefresh() {
-            this.nextUpdate = SustainedConnectionWorker.SYSTEM_TIME.milliseconds() + this.refreshRate;
+            this.nextUpdate = Time.SYSTEM.milliseconds() + this.refreshRate;
             this.inUse = false;
         }
 
@@ -390,7 +389,7 @@ public class SustainedConnectionWorker implements TaskWorker {
                     if (currentConnection.isPresent()) {
                         currentConnection.get().refresh();
                     } else {
-                        SustainedConnectionWorker.SYSTEM_TIME.sleep(SustainedConnectionWorker.BACKOFF_PERIOD_MS);
+                        Time.SYSTEM.sleep(SustainedConnectionWorker.BACKOFF_PERIOD_MS);
                     }
                 }
             } catch (Exception e) {
@@ -401,7 +400,7 @@ public class SustainedConnectionWorker implements TaskWorker {
         }
 
         private synchronized Optional<SustainedConnection> findConnectionToMaintain() {
-            final long milliseconds = SustainedConnectionWorker.SYSTEM_TIME.milliseconds();
+            final long milliseconds = Time.SYSTEM.milliseconds();
             for (SustainedConnection connection : this.connections) {
                 if (connection.needsRefresh(milliseconds)) {
                     connection.claim();
@@ -424,7 +423,7 @@ public class SustainedConnectionWorker implements TaskWorker {
                     SustainedConnectionWorker.this.totalMetadataConnections.get(),
                     SustainedConnectionWorker.this.totalMetadataFailedConnections.get(),
                     SustainedConnectionWorker.this.totalAbortedThreads.get(),
-                    SustainedConnectionWorker.SYSTEM_TIME.milliseconds()));
+                    Time.SYSTEM.milliseconds()));
                 status.update(node);
             } catch (Exception e) {
                 SustainedConnectionWorker.log.error("Aborted test while running StatusUpdater", e);
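
Every call-site hunk above follows one mechanical rule: `new SystemTime()` becomes `Time.SYSTEM`, and static TIME/SYSTEM_TIME fields are dropped in favor of the shared constant. The sketch below illustrates why the Time indirection is worth keeping rather than calling System.currentTimeMillis() directly — the Stopwatch class is hypothetical, not part of this patch, and MockTime ships in Kafka's clients test utilities rather than the main jar:

    import org.apache.kafka.common.utils.MockTime;
    import org.apache.kafka.common.utils.Time;

    public class TimeInjectionExample {
        // Hypothetical component: it accepts a Time instead of reading the
        // system clock directly, so tests can substitute a fake clock.
        static class Stopwatch {
            private final Time time;
            private final long startMs;

            Stopwatch(Time time) {
                this.time = time;
                this.startMs = time.milliseconds();
            }

            long elapsedMs() {
                return time.milliseconds() - startMs;
            }
        }

        public static void main(String[] args) {
            // Production wiring: the shared singleton wall clock.
            Stopwatch production = new Stopwatch(Time.SYSTEM);
            System.out.println(production.elapsedMs() >= 0); // true

            // Test wiring: MockTime advances only when told to, so timing
            // assertions are deterministic.
            MockTime mock = new MockTime();
            Stopwatch test = new Stopwatch(mock);
            mock.sleep(250);
            System.out.println(test.elapsedMs()); // 250
        }
    }

Since callers only ever see the Time interface, collapsing SystemTime to a singleton changes no behavior: the class is stateless, so one shared instance serves every caller and the old per-call-site allocations were pure overhead.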