From 2b9729ba771ae139dd9931cb7cdc7247f80ee842 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Tue, 16 Apr 2024 09:20:49 +0200 Subject: [PATCH] MINOR: Various cleanups in server and server-common (#15710) Reviewers: Chia-Ping Tsai --- .../apache/kafka/deferred/DeferredEventQueue.java | 6 +----- .../kafka/server/metrics/KafkaMetricsGroup.java | 4 ++-- .../kafka/server/metrics/KafkaYammerMetrics.java | 2 +- .../java/org/apache/kafka/server/util/Csv.java | 2 +- .../java/org/apache/kafka/server/util/Json.java | 2 +- .../kafka/server/util/timer/TimingWheel.java | 2 -- .../apache/kafka/timeline/TimelineHashMap.java | 15 +++++---------- .../apache/kafka/timeline/TimelineHashSet.java | 7 +++---- .../kafka/deferred/DeferredEventQueueTest.java | 4 ---- .../apache/kafka/queue/KafkaEventQueueTest.java | 12 ++++++------ .../apache/kafka/server/util/DeadlineTest.java | 3 --- .../apache/kafka/server/util/FutureUtilsTest.java | 4 ++-- .../org/apache/kafka/server/util/JsonTest.java | 4 ++-- .../apache/kafka/server/util/MockScheduler.java | 6 ++---- .../server/util/timer/SystemTimerReaperTest.java | 5 +---- .../server/util/timer/TimerTaskListTest.java | 2 +- .../apache/kafka/timeline/BaseHashTableTest.java | 14 +++++++------- .../kafka/timeline/SnapshotRegistryTest.java | 6 ++---- .../timeline/SnapshottableHashTableTest.java | 7 ++----- .../apache/kafka/server/AssignmentsManager.java | 10 +++++----- .../server/metrics/ClientMetricsInstance.java | 3 +-- .../kafka/server/AssignmentsManagerTest.java | 3 +-- 22 files changed, 46 insertions(+), 77 deletions(-) diff --git a/server-common/src/main/java/org/apache/kafka/deferred/DeferredEventQueue.java b/server-common/src/main/java/org/apache/kafka/deferred/DeferredEventQueue.java index 32e183ce9b0..a59724dc5ec 100644 --- a/server-common/src/main/java/org/apache/kafka/deferred/DeferredEventQueue.java +++ b/server-common/src/main/java/org/apache/kafka/deferred/DeferredEventQueue.java @@ -102,11 +102,7 @@ public class DeferredEventQueue { offset + " which is lower than that."); } } - List events = pending.get(offset); - if (events == null) { - events = new ArrayList<>(); - pending.put(offset, events); - } + List events = pending.computeIfAbsent(offset, k -> new ArrayList<>()); events.add(event); if (log.isTraceEnabled()) { log.trace("Adding deferred event {} at offset {}", event, offset); diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java index 4560cd3f22c..9f25261e8a6 100644 --- a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java +++ b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java @@ -117,7 +117,7 @@ public class KafkaMetricsGroup { private static Optional toMBeanName(Map tags) { List> filteredTags = tags.entrySet().stream() - .filter(entry -> !entry.getValue().equals("")) + .filter(entry -> !entry.getValue().isEmpty()) .collect(Collectors.toList()); if (!filteredTags.isEmpty()) { String tagsString = filteredTags.stream() @@ -131,7 +131,7 @@ public class KafkaMetricsGroup { private static Optional toScope(Map tags) { List> filteredTags = tags.entrySet().stream() - .filter(entry -> !entry.getValue().equals("")) + .filter(entry -> !entry.getValue().isEmpty()) .collect(Collectors.toList()); if (!filteredTags.isEmpty()) { // convert dot to _ since reporters like Graphite typically use dot to represent hierarchy diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java index 6b2f0a1e38d..4cdb49ec223 100644 --- a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java +++ b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java @@ -106,7 +106,7 @@ public class KafkaYammerMetrics implements Reconfigurable { nameBuilder.append(":type="); nameBuilder.append(typeName); - if (name.length() > 0) { + if (!name.isEmpty()) { nameBuilder.append(",name="); nameBuilder.append(name); } diff --git a/server-common/src/main/java/org/apache/kafka/server/util/Csv.java b/server-common/src/main/java/org/apache/kafka/server/util/Csv.java index d09b45263c2..f164dd11414 100644 --- a/server-common/src/main/java/org/apache/kafka/server/util/Csv.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/Csv.java @@ -29,7 +29,7 @@ public class Csv { */ public static Map parseCsvMap(String str) { Map map = new HashMap<>(); - if (str == null || "".equals(str)) + if (str == null || str.isEmpty()) return map; String[] keyVals = str.split("\\s*,\\s*"); for (String s : keyVals) { diff --git a/server-common/src/main/java/org/apache/kafka/server/util/Json.java b/server-common/src/main/java/org/apache/kafka/server/util/Json.java index 620e841f6fe..dbb73c02204 100644 --- a/server-common/src/main/java/org/apache/kafka/server/util/Json.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/Json.java @@ -37,7 +37,7 @@ public final class Json { */ public static Optional parseFull(String input) { try { - return Optional.ofNullable(tryParseFull(input)); + return Optional.of(tryParseFull(input)); } catch (JsonProcessingException e) { return Optional.empty(); } diff --git a/server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java b/server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java index 43fae38f65e..156768fb412 100644 --- a/server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java @@ -96,7 +96,6 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class TimingWheel { private final long tickMs; - private final long startMs; private final int wheelSize; private final AtomicInteger taskCounter; private final DelayQueue queue; @@ -116,7 +115,6 @@ public class TimingWheel { DelayQueue queue ) { this.tickMs = tickMs; - this.startMs = startMs; this.wheelSize = wheelSize; this.taskCounter = taskCounter; this.queue = queue; diff --git a/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java b/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java index 8239b1410af..aaf9126adee 100644 --- a/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java +++ b/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java @@ -28,7 +28,7 @@ import java.util.Set; /** * This is a hash map which can be snapshotted. *
- * See {@SnapshottableHashTable} for more details about the implementation. + * See {@link SnapshottableHashTable} for more details about the implementation. *
* This class requires external synchronization. Null keys and values are not supported. * @@ -127,9 +127,7 @@ public class TimelineHashMap @Override public boolean containsValue(Object value) { - Iterator> iter = entrySet().iterator(); - while (iter.hasNext()) { - Entry e = iter.next(); + for (Entry e : entrySet()) { if (value.equals(e.getValue())) { return true; } @@ -378,9 +376,8 @@ public class TimelineHashMap @Override public int hashCode() { int hash = 0; - Iterator> iter = entrySet().iterator(); - while (iter.hasNext()) { - hash += iter.next().hashCode(); + for (Entry kvEntry : entrySet()) { + hash += kvEntry.hashCode(); } return hash; } @@ -395,9 +392,7 @@ public class TimelineHashMap if (m.size() != size()) return false; try { - Iterator> iter = entrySet().iterator(); - while (iter.hasNext()) { - Entry entry = iter.next(); + for (Entry entry : entrySet()) { if (!m.get(entry.getKey()).equals(entry.getValue())) { return false; } diff --git a/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java b/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java index 24705a4dffc..f4eab55ba3e 100644 --- a/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java +++ b/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java @@ -25,7 +25,7 @@ import java.util.Set; /** * This is a hash set which can be snapshotted. *
- * See {@SnapshottableHashTable} for more details about the implementation. + * See {@link SnapshottableHashTable} for more details about the implementation. *
* This class requires external synchronization. Null values are not supported. * @@ -231,9 +231,8 @@ public class TimelineHashSet @Override public int hashCode() { int hash = 0; - Iterator iter = iterator(); - while (iter.hasNext()) { - hash += iter.next().hashCode(); + for (T t : this) { + hash += t.hashCode(); } return hash; } diff --git a/server-common/src/test/java/org/apache/kafka/deferred/DeferredEventQueueTest.java b/server-common/src/test/java/org/apache/kafka/deferred/DeferredEventQueueTest.java index 7c4f0e62a95..525068cd794 100644 --- a/server-common/src/test/java/org/apache/kafka/deferred/DeferredEventQueueTest.java +++ b/server-common/src/test/java/org/apache/kafka/deferred/DeferredEventQueueTest.java @@ -44,10 +44,6 @@ public class DeferredEventQueueTest { future.complete(null); } } - - CompletableFuture future() { - return future; - } } @Test diff --git a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java index 6ac0816633c..1fdb9f40e68 100644 --- a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java +++ b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java @@ -54,7 +54,7 @@ public class KafkaEventQueueTest { } @Override - public void run() throws Exception { + public void run() { T value = supplier.get(); future.complete(value); } @@ -194,9 +194,9 @@ public class KafkaEventQueueTest { "testDeferredIsQueuedAfterTriggering"); AtomicInteger count = new AtomicInteger(0); List> futures = Arrays.asList( - new CompletableFuture(), - new CompletableFuture(), - new CompletableFuture()); + new CompletableFuture<>(), + new CompletableFuture<>(), + new CompletableFuture<>()); queue.scheduleDeferred("foo", __ -> OptionalLong.of(2L), new FutureEvent<>(futures.get(0), () -> count.getAndIncrement())); queue.append(new FutureEvent<>(futures.get(1), () -> count.getAndAdd(1))); @@ -231,7 +231,7 @@ public class KafkaEventQueueTest { CompletableFuture future = new CompletableFuture<>(); queue.append(new EventQueue.Event() { @Override - public void run() throws Exception { + public void run() { future.complete(null); } @@ -284,7 +284,7 @@ public class KafkaEventQueueTest { AtomicInteger counter = new AtomicInteger(0); queue.append(new EventQueue.Event() { @Override - public void run() throws Exception { + public void run() { counter.incrementAndGet(); throw new IllegalStateException("First exception"); } diff --git a/server-common/src/test/java/org/apache/kafka/server/util/DeadlineTest.java b/server-common/src/test/java/org/apache/kafka/server/util/DeadlineTest.java index 7a4969f1d17..d43fd3e750c 100644 --- a/server-common/src/test/java/org/apache/kafka/server/util/DeadlineTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/util/DeadlineTest.java @@ -21,8 +21,6 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static java.util.concurrent.TimeUnit.HOURS; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -33,7 +31,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; @Timeout(value = 120) public class DeadlineTest { - private static final Logger log = LoggerFactory.getLogger(FutureUtilsTest.class); private static Time monoTime(long monotonicTime) { return new MockTime(0, 0, monotonicTime); diff --git a/server-common/src/test/java/org/apache/kafka/server/util/FutureUtilsTest.java b/server-common/src/test/java/org/apache/kafka/server/util/FutureUtilsTest.java index f7f029a5426..53bb86a8b63 100644 --- a/server-common/src/test/java/org/apache/kafka/server/util/FutureUtilsTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/util/FutureUtilsTest.java @@ -108,11 +108,11 @@ public class FutureUtilsTest { assertFalse(sourceFuture.isCompletedExceptionally()); assertFalse(destinationFuture.isCompletedExceptionally()); sourceFuture.complete(123); - assertEquals(Integer.valueOf(123), destinationFuture.get()); + assertEquals(123, destinationFuture.get()); } @Test - public void testChainFutureExceptionally() throws Throwable { + public void testChainFutureExceptionally() { CompletableFuture sourceFuture = new CompletableFuture<>(); CompletableFuture destinationFuture = new CompletableFuture<>(); FutureUtils.chainFuture(sourceFuture, destinationFuture); diff --git a/server-common/src/test/java/org/apache/kafka/server/util/JsonTest.java b/server-common/src/test/java/org/apache/kafka/server/util/JsonTest.java index d4c25070768..8e7099655f4 100644 --- a/server-common/src/test/java/org/apache/kafka/server/util/JsonTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/util/JsonTest.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -121,8 +122,7 @@ public class JsonTest { List results = new ArrayList<>(); parse(JSON).asJsonObject().apply("array").asJsonArray().iterator().forEachRemaining(results::add); - List expected = Arrays.asList("4.0", "11.1", "44.5") - .stream() + List expected = Stream.of("4.0", "11.1", "44.5") .map(this::parse) .collect(Collectors.toList()); assertEquals(expected, results); diff --git a/server-common/src/test/java/org/apache/kafka/server/util/MockScheduler.java b/server-common/src/test/java/org/apache/kafka/server/util/MockScheduler.java index be011eecb60..eedfe606bc2 100644 --- a/server-common/src/test/java/org/apache/kafka/server/util/MockScheduler.java +++ b/server-common/src/test/java/org/apache/kafka/server/util/MockScheduler.java @@ -22,10 +22,8 @@ import java.util.Comparator; import java.util.Optional; import java.util.PriorityQueue; import java.util.concurrent.Delayed; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; @@ -105,12 +103,12 @@ public class MockScheduler implements Scheduler { } @Override - public Void get() throws InterruptedException, ExecutionException { + public Void get() { return null; } @Override - public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + public Void get(long timeout, TimeUnit unit) { return null; } diff --git a/server-common/src/test/java/org/apache/kafka/server/util/timer/SystemTimerReaperTest.java b/server-common/src/test/java/org/apache/kafka/server/util/timer/SystemTimerReaperTest.java index b14ff0d2312..d2074cc8611 100644 --- a/server-common/src/test/java/org/apache/kafka/server/util/timer/SystemTimerReaperTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/util/timer/SystemTimerReaperTest.java @@ -48,16 +48,13 @@ public class SystemTimerReaperTest { @Test public void testReaper() throws Exception { - Timer timer = new SystemTimerReaper("reaper", new SystemTimer("timer")); - try { + try (Timer timer = new SystemTimerReaper("reaper", new SystemTimer("timer"))) { CompletableFuture t1 = add(timer, 100L); CompletableFuture t2 = add(timer, 200L); CompletableFuture t3 = add(timer, 300L); TestUtils.assertFutureThrows(t1, TimeoutException.class); TestUtils.assertFutureThrows(t2, TimeoutException.class); TestUtils.assertFutureThrows(t3, TimeoutException.class); - } finally { - timer.close(); } } diff --git a/server-common/src/test/java/org/apache/kafka/server/util/timer/TimerTaskListTest.java b/server-common/src/test/java/org/apache/kafka/server/util/timer/TimerTaskListTest.java index 7d1cd4c4876..3ac4b1fcfbd 100644 --- a/server-common/src/test/java/org/apache/kafka/server/util/timer/TimerTaskListTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/util/timer/TimerTaskListTest.java @@ -29,7 +29,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class TimerTaskListTest { - private class TestTask extends TimerTask { + private static class TestTask extends TimerTask { TestTask(long delayMs) { super(delayMs); } diff --git a/server-common/src/test/java/org/apache/kafka/timeline/BaseHashTableTest.java b/server-common/src/test/java/org/apache/kafka/timeline/BaseHashTableTest.java index 1da32d4b3d9..54063db69fa 100644 --- a/server-common/src/test/java/org/apache/kafka/timeline/BaseHashTableTest.java +++ b/server-common/src/test/java/org/apache/kafka/timeline/BaseHashTableTest.java @@ -33,7 +33,7 @@ public class BaseHashTableTest { public void testEmptyTable() { BaseHashTable table = new BaseHashTable<>(0); assertEquals(0, table.baseSize()); - assertNull(table.baseGet(Integer.valueOf(1))); + assertNull(table.baseGet(1)); } @Test @@ -54,10 +54,10 @@ public class BaseHashTableTest { @Test public void testInsertAndRemove() { BaseHashTable table = new BaseHashTable<>(20); - Integer one = Integer.valueOf(1); - Integer two = Integer.valueOf(2); - Integer three = Integer.valueOf(3); - Integer four = Integer.valueOf(4); + Integer one = 1; + Integer two = 2; + Integer three = 3; + Integer four = 4; assertNull(table.baseAddOrReplace(one)); assertNull(table.baseAddOrReplace(two)); assertNull(table.baseAddOrReplace(three)); @@ -114,12 +114,12 @@ public class BaseHashTableTest { for (int i = 0; i < 4096; i++) { assertEquals(i, table.baseSize()); - assertNull(table.baseAddOrReplace(Integer.valueOf(i))); + assertNull(table.baseAddOrReplace(i)); } for (int i = 0; i < 4096; i++) { assertEquals(4096 - i, table.baseSize()); - assertEquals(Integer.valueOf(i), table.baseRemove(Integer.valueOf(i))); + assertEquals(Integer.valueOf(i), table.baseRemove(i)); } } diff --git a/server-common/src/test/java/org/apache/kafka/timeline/SnapshotRegistryTest.java b/server-common/src/test/java/org/apache/kafka/timeline/SnapshotRegistryTest.java index aa1cf243bd7..ef322d9ea8f 100644 --- a/server-common/src/test/java/org/apache/kafka/timeline/SnapshotRegistryTest.java +++ b/server-common/src/test/java/org/apache/kafka/timeline/SnapshotRegistryTest.java @@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.ArrayList; +import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -40,10 +41,7 @@ public class SnapshotRegistryTest { private static void assertIteratorContains(Iterator iter, Snapshot... snapshots) { - List expected = new ArrayList<>(); - for (Snapshot snapshot : snapshots) { - expected.add(snapshot); - } + List expected = Arrays.asList(snapshots); List actual = new ArrayList<>(); while (iter.hasNext()) { Snapshot snapshot = iter.next(); diff --git a/server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java b/server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java index 2d4a4cf9f04..c330d93d4e8 100644 --- a/server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java +++ b/server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java @@ -301,7 +301,6 @@ public class SnapshottableHashTableTest { remaining.put(object, true); } List extraObjects = new ArrayList<>(); - int i = 0; while (iter.hasNext()) { Object object = iter.next(); assertNotNull(object); @@ -310,10 +309,8 @@ public class SnapshottableHashTableTest { } } if (!extraObjects.isEmpty() || !remaining.isEmpty()) { - throw new RuntimeException("Found extra object(s): [" + String.join(", ", - extraObjects.stream().map(e -> e.toString()).collect(Collectors.toList())) + - "] and didn't find object(s): [" + String.join(", ", - remaining.keySet().stream().map(e -> e.toString()).collect(Collectors.toList())) + "]"); + throw new RuntimeException("Found extra object(s): [" + extraObjects.stream().map(Object::toString).collect(Collectors.joining(", ")) + + "] and didn't find object(s): [" + remaining.keySet().stream().map(Object::toString).collect(Collectors.joining(", ")) + "]"); } } } diff --git a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java index 832c0068053..bcdfd13cf72 100644 --- a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java +++ b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java @@ -164,7 +164,7 @@ public class AssignmentsManager { */ private class ShutdownEvent extends Event { @Override - public void run() throws Exception { + public void run() { channelManager.shutdown(); } } @@ -203,7 +203,7 @@ public class AssignmentsManager { } } @Override - public void run() throws Exception { + public void run() { log.trace("Received assignment {}", this); AssignmentEvent existing = pending.getOrDefault(partition, null); boolean existingIsInFlight = false; @@ -269,7 +269,7 @@ public class AssignmentsManager { private class DispatchEvent extends Event { static final String TAG = "dispatch"; @Override - public void run() throws Exception { + public void run() { if (inflight != null) { throw new IllegalStateException("Bug. Should not be dispatching while there are assignments in flight"); } @@ -310,7 +310,7 @@ public class AssignmentsManager { this.response = response; } @Override - public void run() throws Exception { + public void run() { if (inflight == null) { throw new IllegalStateException("Bug. Cannot not be handling a client response if there is are no assignments in flight"); } @@ -321,7 +321,7 @@ public class AssignmentsManager { AssignReplicasToDirsResponseData data = ((AssignReplicasToDirsResponse) response.responseBody()).data(); Set failed = filterFailures(data, inflight); - Set completed = Utils.diff(HashSet::new, inflight.values().stream().collect(Collectors.toSet()), failed); + Set completed = Utils.diff(HashSet::new, new HashSet<>(inflight.values()), failed); for (AssignmentEvent assignmentEvent : completed) { if (log.isDebugEnabled()) { log.debug("Successfully propagated assignment {}", assignmentEvent); diff --git a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsInstance.java b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsInstance.java index efd6e8e65cc..f9dca830057 100644 --- a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsInstance.java +++ b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsInstance.java @@ -116,8 +116,7 @@ public class ClientMetricsInstance { */ boolean canAccept = lastGetRequestTimestamp > lastPushRequestTimestamp; if (!canAccept) { - long lastRequestTimestamp = Math.max(lastGetRequestTimestamp, lastPushRequestTimestamp); - long timeElapsedSinceLastMsg = currentTime - lastRequestTimestamp; + long timeElapsedSinceLastMsg = currentTime - lastPushRequestTimestamp; canAccept = timeElapsedSinceLastMsg >= pushIntervalMs; } diff --git a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java index 1b312d80472..93f7a39cab4 100644 --- a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java +++ b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java @@ -321,10 +321,9 @@ public class AssignmentsManagerTest { } } AssignReplicasToDirsResponseData responseData = AssignmentsHelper.buildResponseData(Errors.NONE.code(), 0, errors); - ClientResponse response = new ClientResponse(null, null, null, + return new ClientResponse(null, null, null, 0L, 0L, false, false, null, null, new AssignReplicasToDirsResponse(responseData)); - return response; } @Test