mirror of https://github.com/apache/kafka.git
MINOR: Various cleanups in server and server-common (#15710)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
395fdae5f2
commit
2b9729ba77
|
@ -102,11 +102,7 @@ public class DeferredEventQueue {
|
|||
offset + " which is lower than that.");
|
||||
}
|
||||
}
|
||||
List<DeferredEvent> events = pending.get(offset);
|
||||
if (events == null) {
|
||||
events = new ArrayList<>();
|
||||
pending.put(offset, events);
|
||||
}
|
||||
List<DeferredEvent> events = pending.computeIfAbsent(offset, k -> new ArrayList<>());
|
||||
events.add(event);
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Adding deferred event {} at offset {}", event, offset);
|
||||
|
|
|
@ -117,7 +117,7 @@ public class KafkaMetricsGroup {
|
|||
|
||||
private static Optional<String> toMBeanName(Map<String, String> tags) {
|
||||
List<Map.Entry<String, String>> filteredTags = tags.entrySet().stream()
|
||||
.filter(entry -> !entry.getValue().equals(""))
|
||||
.filter(entry -> !entry.getValue().isEmpty())
|
||||
.collect(Collectors.toList());
|
||||
if (!filteredTags.isEmpty()) {
|
||||
String tagsString = filteredTags.stream()
|
||||
|
@ -131,7 +131,7 @@ public class KafkaMetricsGroup {
|
|||
|
||||
private static Optional<String> toScope(Map<String, String> tags) {
|
||||
List<Map.Entry<String, String>> filteredTags = tags.entrySet().stream()
|
||||
.filter(entry -> !entry.getValue().equals(""))
|
||||
.filter(entry -> !entry.getValue().isEmpty())
|
||||
.collect(Collectors.toList());
|
||||
if (!filteredTags.isEmpty()) {
|
||||
// convert dot to _ since reporters like Graphite typically use dot to represent hierarchy
|
||||
|
|
|
@ -106,7 +106,7 @@ public class KafkaYammerMetrics implements Reconfigurable {
|
|||
nameBuilder.append(":type=");
|
||||
nameBuilder.append(typeName);
|
||||
|
||||
if (name.length() > 0) {
|
||||
if (!name.isEmpty()) {
|
||||
nameBuilder.append(",name=");
|
||||
nameBuilder.append(name);
|
||||
}
|
||||
|
|
|
@ -29,7 +29,7 @@ public class Csv {
|
|||
*/
|
||||
public static Map<String, String> parseCsvMap(String str) {
|
||||
Map<String, String> map = new HashMap<>();
|
||||
if (str == null || "".equals(str))
|
||||
if (str == null || str.isEmpty())
|
||||
return map;
|
||||
String[] keyVals = str.split("\\s*,\\s*");
|
||||
for (String s : keyVals) {
|
||||
|
|
|
@ -37,7 +37,7 @@ public final class Json {
|
|||
*/
|
||||
public static Optional<JsonValue> parseFull(String input) {
|
||||
try {
|
||||
return Optional.ofNullable(tryParseFull(input));
|
||||
return Optional.of(tryParseFull(input));
|
||||
} catch (JsonProcessingException e) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
|
|
@ -96,7 +96,6 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
*/
|
||||
public class TimingWheel {
|
||||
private final long tickMs;
|
||||
private final long startMs;
|
||||
private final int wheelSize;
|
||||
private final AtomicInteger taskCounter;
|
||||
private final DelayQueue<TimerTaskList> queue;
|
||||
|
@ -116,7 +115,6 @@ public class TimingWheel {
|
|||
DelayQueue<TimerTaskList> queue
|
||||
) {
|
||||
this.tickMs = tickMs;
|
||||
this.startMs = startMs;
|
||||
this.wheelSize = wheelSize;
|
||||
this.taskCounter = taskCounter;
|
||||
this.queue = queue;
|
||||
|
|
|
@ -28,7 +28,7 @@ import java.util.Set;
|
|||
/**
|
||||
* This is a hash map which can be snapshotted.
|
||||
* <br>
|
||||
* See {@SnapshottableHashTable} for more details about the implementation.
|
||||
* See {@link SnapshottableHashTable} for more details about the implementation.
|
||||
* <br>
|
||||
* This class requires external synchronization. Null keys and values are not supported.
|
||||
*
|
||||
|
@ -127,9 +127,7 @@ public class TimelineHashMap<K, V>
|
|||
|
||||
@Override
|
||||
public boolean containsValue(Object value) {
|
||||
Iterator<Entry<K, V>> iter = entrySet().iterator();
|
||||
while (iter.hasNext()) {
|
||||
Entry<K, V> e = iter.next();
|
||||
for (Entry<K, V> e : entrySet()) {
|
||||
if (value.equals(e.getValue())) {
|
||||
return true;
|
||||
}
|
||||
|
@ -378,9 +376,8 @@ public class TimelineHashMap<K, V>
|
|||
@Override
|
||||
public int hashCode() {
|
||||
int hash = 0;
|
||||
Iterator<Entry<K, V>> iter = entrySet().iterator();
|
||||
while (iter.hasNext()) {
|
||||
hash += iter.next().hashCode();
|
||||
for (Entry<K, V> kvEntry : entrySet()) {
|
||||
hash += kvEntry.hashCode();
|
||||
}
|
||||
return hash;
|
||||
}
|
||||
|
@ -395,9 +392,7 @@ public class TimelineHashMap<K, V>
|
|||
if (m.size() != size())
|
||||
return false;
|
||||
try {
|
||||
Iterator<Entry<K, V>> iter = entrySet().iterator();
|
||||
while (iter.hasNext()) {
|
||||
Entry<K, V> entry = iter.next();
|
||||
for (Entry<K, V> entry : entrySet()) {
|
||||
if (!m.get(entry.getKey()).equals(entry.getValue())) {
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -25,7 +25,7 @@ import java.util.Set;
|
|||
/**
|
||||
* This is a hash set which can be snapshotted.
|
||||
* <br>
|
||||
* See {@SnapshottableHashTable} for more details about the implementation.
|
||||
* See {@link SnapshottableHashTable} for more details about the implementation.
|
||||
* <br>
|
||||
* This class requires external synchronization. Null values are not supported.
|
||||
*
|
||||
|
@ -231,9 +231,8 @@ public class TimelineHashSet<T>
|
|||
@Override
|
||||
public int hashCode() {
|
||||
int hash = 0;
|
||||
Iterator<T> iter = iterator();
|
||||
while (iter.hasNext()) {
|
||||
hash += iter.next().hashCode();
|
||||
for (T t : this) {
|
||||
hash += t.hashCode();
|
||||
}
|
||||
return hash;
|
||||
}
|
||||
|
|
|
@ -44,10 +44,6 @@ public class DeferredEventQueueTest {
|
|||
future.complete(null);
|
||||
}
|
||||
}
|
||||
|
||||
CompletableFuture<Void> future() {
|
||||
return future;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -54,7 +54,7 @@ public class KafkaEventQueueTest {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void run() throws Exception {
|
||||
public void run() {
|
||||
T value = supplier.get();
|
||||
future.complete(value);
|
||||
}
|
||||
|
@ -194,9 +194,9 @@ public class KafkaEventQueueTest {
|
|||
"testDeferredIsQueuedAfterTriggering");
|
||||
AtomicInteger count = new AtomicInteger(0);
|
||||
List<CompletableFuture<Integer>> futures = Arrays.asList(
|
||||
new CompletableFuture<Integer>(),
|
||||
new CompletableFuture<Integer>(),
|
||||
new CompletableFuture<Integer>());
|
||||
new CompletableFuture<>(),
|
||||
new CompletableFuture<>(),
|
||||
new CompletableFuture<>());
|
||||
queue.scheduleDeferred("foo", __ -> OptionalLong.of(2L),
|
||||
new FutureEvent<>(futures.get(0), () -> count.getAndIncrement()));
|
||||
queue.append(new FutureEvent<>(futures.get(1), () -> count.getAndAdd(1)));
|
||||
|
@ -231,7 +231,7 @@ public class KafkaEventQueueTest {
|
|||
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
queue.append(new EventQueue.Event() {
|
||||
@Override
|
||||
public void run() throws Exception {
|
||||
public void run() {
|
||||
future.complete(null);
|
||||
}
|
||||
|
||||
|
@ -284,7 +284,7 @@ public class KafkaEventQueueTest {
|
|||
AtomicInteger counter = new AtomicInteger(0);
|
||||
queue.append(new EventQueue.Event() {
|
||||
@Override
|
||||
public void run() throws Exception {
|
||||
public void run() {
|
||||
counter.incrementAndGet();
|
||||
throw new IllegalStateException("First exception");
|
||||
}
|
||||
|
|
|
@ -21,8 +21,6 @@ import org.apache.kafka.common.utils.MockTime;
|
|||
import org.apache.kafka.common.utils.Time;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.Timeout;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.HOURS;
|
||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
|
@ -33,7 +31,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
|
|||
|
||||
@Timeout(value = 120)
|
||||
public class DeadlineTest {
|
||||
private static final Logger log = LoggerFactory.getLogger(FutureUtilsTest.class);
|
||||
|
||||
private static Time monoTime(long monotonicTime) {
|
||||
return new MockTime(0, 0, monotonicTime);
|
||||
|
|
|
@ -108,11 +108,11 @@ public class FutureUtilsTest {
|
|||
assertFalse(sourceFuture.isCompletedExceptionally());
|
||||
assertFalse(destinationFuture.isCompletedExceptionally());
|
||||
sourceFuture.complete(123);
|
||||
assertEquals(Integer.valueOf(123), destinationFuture.get());
|
||||
assertEquals(123, destinationFuture.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChainFutureExceptionally() throws Throwable {
|
||||
public void testChainFutureExceptionally() {
|
||||
CompletableFuture<Integer> sourceFuture = new CompletableFuture<>();
|
||||
CompletableFuture<Number> destinationFuture = new CompletableFuture<>();
|
||||
FutureUtils.chainFuture(sourceFuture, destinationFuture);
|
||||
|
|
|
@ -34,6 +34,7 @@ import java.util.Map;
|
|||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||
|
@ -121,8 +122,7 @@ public class JsonTest {
|
|||
List<JsonValue> results = new ArrayList<>();
|
||||
parse(JSON).asJsonObject().apply("array").asJsonArray().iterator().forEachRemaining(results::add);
|
||||
|
||||
List<JsonValue> expected = Arrays.asList("4.0", "11.1", "44.5")
|
||||
.stream()
|
||||
List<JsonValue> expected = Stream.of("4.0", "11.1", "44.5")
|
||||
.map(this::parse)
|
||||
.collect(Collectors.toList());
|
||||
assertEquals(expected, results);
|
||||
|
|
|
@ -22,10 +22,8 @@ import java.util.Comparator;
|
|||
import java.util.Optional;
|
||||
import java.util.PriorityQueue;
|
||||
import java.util.concurrent.Delayed;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
|
@ -105,12 +103,12 @@ public class MockScheduler implements Scheduler {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Void get() throws InterruptedException, ExecutionException {
|
||||
public Void get() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
|
||||
public Void get(long timeout, TimeUnit unit) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
|
@ -48,16 +48,13 @@ public class SystemTimerReaperTest {
|
|||
|
||||
@Test
|
||||
public void testReaper() throws Exception {
|
||||
Timer timer = new SystemTimerReaper("reaper", new SystemTimer("timer"));
|
||||
try {
|
||||
try (Timer timer = new SystemTimerReaper("reaper", new SystemTimer("timer"))) {
|
||||
CompletableFuture<Void> t1 = add(timer, 100L);
|
||||
CompletableFuture<Void> t2 = add(timer, 200L);
|
||||
CompletableFuture<Void> t3 = add(timer, 300L);
|
||||
TestUtils.assertFutureThrows(t1, TimeoutException.class);
|
||||
TestUtils.assertFutureThrows(t2, TimeoutException.class);
|
||||
TestUtils.assertFutureThrows(t3, TimeoutException.class);
|
||||
} finally {
|
||||
timer.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
|
|||
|
||||
public class TimerTaskListTest {
|
||||
|
||||
private class TestTask extends TimerTask {
|
||||
private static class TestTask extends TimerTask {
|
||||
TestTask(long delayMs) {
|
||||
super(delayMs);
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@ public class BaseHashTableTest {
|
|||
public void testEmptyTable() {
|
||||
BaseHashTable<Integer> table = new BaseHashTable<>(0);
|
||||
assertEquals(0, table.baseSize());
|
||||
assertNull(table.baseGet(Integer.valueOf(1)));
|
||||
assertNull(table.baseGet(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -54,10 +54,10 @@ public class BaseHashTableTest {
|
|||
@Test
|
||||
public void testInsertAndRemove() {
|
||||
BaseHashTable<Integer> table = new BaseHashTable<>(20);
|
||||
Integer one = Integer.valueOf(1);
|
||||
Integer two = Integer.valueOf(2);
|
||||
Integer three = Integer.valueOf(3);
|
||||
Integer four = Integer.valueOf(4);
|
||||
Integer one = 1;
|
||||
Integer two = 2;
|
||||
Integer three = 3;
|
||||
Integer four = 4;
|
||||
assertNull(table.baseAddOrReplace(one));
|
||||
assertNull(table.baseAddOrReplace(two));
|
||||
assertNull(table.baseAddOrReplace(three));
|
||||
|
@ -114,12 +114,12 @@ public class BaseHashTableTest {
|
|||
|
||||
for (int i = 0; i < 4096; i++) {
|
||||
assertEquals(i, table.baseSize());
|
||||
assertNull(table.baseAddOrReplace(Integer.valueOf(i)));
|
||||
assertNull(table.baseAddOrReplace(i));
|
||||
}
|
||||
|
||||
for (int i = 0; i < 4096; i++) {
|
||||
assertEquals(4096 - i, table.baseSize());
|
||||
assertEquals(Integer.valueOf(i), table.baseRemove(Integer.valueOf(i)));
|
||||
assertEquals(Integer.valueOf(i), table.baseRemove(i));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
|
|||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -40,10 +41,7 @@ public class SnapshotRegistryTest {
|
|||
|
||||
private static void assertIteratorContains(Iterator<Snapshot> iter,
|
||||
Snapshot... snapshots) {
|
||||
List<Snapshot> expected = new ArrayList<>();
|
||||
for (Snapshot snapshot : snapshots) {
|
||||
expected.add(snapshot);
|
||||
}
|
||||
List<Snapshot> expected = Arrays.asList(snapshots);
|
||||
List<Snapshot> actual = new ArrayList<>();
|
||||
while (iter.hasNext()) {
|
||||
Snapshot snapshot = iter.next();
|
||||
|
|
|
@ -301,7 +301,6 @@ public class SnapshottableHashTableTest {
|
|||
remaining.put(object, true);
|
||||
}
|
||||
List<Object> extraObjects = new ArrayList<>();
|
||||
int i = 0;
|
||||
while (iter.hasNext()) {
|
||||
Object object = iter.next();
|
||||
assertNotNull(object);
|
||||
|
@ -310,10 +309,8 @@ public class SnapshottableHashTableTest {
|
|||
}
|
||||
}
|
||||
if (!extraObjects.isEmpty() || !remaining.isEmpty()) {
|
||||
throw new RuntimeException("Found extra object(s): [" + String.join(", ",
|
||||
extraObjects.stream().map(e -> e.toString()).collect(Collectors.toList())) +
|
||||
"] and didn't find object(s): [" + String.join(", ",
|
||||
remaining.keySet().stream().map(e -> e.toString()).collect(Collectors.toList())) + "]");
|
||||
throw new RuntimeException("Found extra object(s): [" + extraObjects.stream().map(Object::toString).collect(Collectors.joining(", ")) +
|
||||
"] and didn't find object(s): [" + remaining.keySet().stream().map(Object::toString).collect(Collectors.joining(", ")) + "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -164,7 +164,7 @@ public class AssignmentsManager {
|
|||
*/
|
||||
private class ShutdownEvent extends Event {
|
||||
@Override
|
||||
public void run() throws Exception {
|
||||
public void run() {
|
||||
channelManager.shutdown();
|
||||
}
|
||||
}
|
||||
|
@ -203,7 +203,7 @@ public class AssignmentsManager {
|
|||
}
|
||||
}
|
||||
@Override
|
||||
public void run() throws Exception {
|
||||
public void run() {
|
||||
log.trace("Received assignment {}", this);
|
||||
AssignmentEvent existing = pending.getOrDefault(partition, null);
|
||||
boolean existingIsInFlight = false;
|
||||
|
@ -269,7 +269,7 @@ public class AssignmentsManager {
|
|||
private class DispatchEvent extends Event {
|
||||
static final String TAG = "dispatch";
|
||||
@Override
|
||||
public void run() throws Exception {
|
||||
public void run() {
|
||||
if (inflight != null) {
|
||||
throw new IllegalStateException("Bug. Should not be dispatching while there are assignments in flight");
|
||||
}
|
||||
|
@ -310,7 +310,7 @@ public class AssignmentsManager {
|
|||
this.response = response;
|
||||
}
|
||||
@Override
|
||||
public void run() throws Exception {
|
||||
public void run() {
|
||||
if (inflight == null) {
|
||||
throw new IllegalStateException("Bug. Cannot not be handling a client response if there is are no assignments in flight");
|
||||
}
|
||||
|
@ -321,7 +321,7 @@ public class AssignmentsManager {
|
|||
AssignReplicasToDirsResponseData data = ((AssignReplicasToDirsResponse) response.responseBody()).data();
|
||||
|
||||
Set<AssignmentEvent> failed = filterFailures(data, inflight);
|
||||
Set<AssignmentEvent> completed = Utils.diff(HashSet::new, inflight.values().stream().collect(Collectors.toSet()), failed);
|
||||
Set<AssignmentEvent> completed = Utils.diff(HashSet::new, new HashSet<>(inflight.values()), failed);
|
||||
for (AssignmentEvent assignmentEvent : completed) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Successfully propagated assignment {}", assignmentEvent);
|
||||
|
|
|
@ -116,8 +116,7 @@ public class ClientMetricsInstance {
|
|||
*/
|
||||
boolean canAccept = lastGetRequestTimestamp > lastPushRequestTimestamp;
|
||||
if (!canAccept) {
|
||||
long lastRequestTimestamp = Math.max(lastGetRequestTimestamp, lastPushRequestTimestamp);
|
||||
long timeElapsedSinceLastMsg = currentTime - lastRequestTimestamp;
|
||||
long timeElapsedSinceLastMsg = currentTime - lastPushRequestTimestamp;
|
||||
canAccept = timeElapsedSinceLastMsg >= pushIntervalMs;
|
||||
}
|
||||
|
||||
|
|
|
@ -321,10 +321,9 @@ public class AssignmentsManagerTest {
|
|||
}
|
||||
}
|
||||
AssignReplicasToDirsResponseData responseData = AssignmentsHelper.buildResponseData(Errors.NONE.code(), 0, errors);
|
||||
ClientResponse response = new ClientResponse(null, null, null,
|
||||
return new ClientResponse(null, null, null,
|
||||
0L, 0L, false, false, null, null,
|
||||
new AssignReplicasToDirsResponse(responseData));
|
||||
return response;
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue