diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java b/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java index 3e1bccc476e..ef33f5fee7d 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java @@ -20,6 +20,8 @@ package org.apache.kafka.common.utils; import java.util.AbstractCollection; import java.util.AbstractSequentialList; import java.util.AbstractSet; +import java.util.ArrayList; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.ListIterator; @@ -676,4 +678,18 @@ public class ImplicitLinkedHashCollection valuesSet() { return new ImplicitLinkedHashCollectionSetView(); } + + public void sort(Comparator comparator) { + ArrayList array = new ArrayList<>(size); + Iterator iterator = iterator(); + while (iterator.hasNext()) { + E e = iterator.next(); + iterator.remove(); + array.add(e); + } + array.sort(comparator); + for (E e : array) { + add(e); + } + } } diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java index c043e0845a2..3c12c989f4d 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java @@ -21,6 +21,7 @@ import org.junit.jupiter.api.Timeout; import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; @@ -613,4 +614,57 @@ public class ImplicitLinkedHashCollectionTest { assertEquals(new TestElement(i, i), coll.find(elements.get(i))); } } + + static class TestElementComparator implements Comparator { + static final TestElementComparator INSTANCE = new TestElementComparator(); + + @Override + public int compare(TestElement a, TestElement b) { + if (a.key < b.key) { + return -1; + } else if (a.key > b.key) { + return 1; + } else if (a.val < b.val) { + return -1; + } else if (a.val > b.val) { + return 1; + } else { + return 0; + } + } + } + + static class ReverseTestElementComparator implements Comparator { + static final ReverseTestElementComparator INSTANCE = new ReverseTestElementComparator(); + + @Override + public int compare(TestElement a, TestElement b) { + return TestElementComparator.INSTANCE.compare(b, a); + } + } + + @Test + public void testSort() { + ImplicitLinkedHashCollection coll = new ImplicitLinkedHashCollection<>(); + coll.add(new TestElement(3, 3)); + coll.add(new TestElement(1, 1)); + coll.add(new TestElement(10, 10)); + coll.add(new TestElement(9, 9)); + coll.add(new TestElement(2, 2)); + coll.add(new TestElement(4, 4)); + coll.add(new TestElement(0, 0)); + coll.add(new TestElement(30, 30)); + coll.add(new TestElement(20, 20)); + coll.add(new TestElement(11, 11)); + coll.add(new TestElement(15, 15)); + coll.add(new TestElement(5, 5)); + + expectTraversal(coll.iterator(), 3, 1, 10, 9, 2, 4, 0, 30, 20, 11, 15, 5); + coll.sort(TestElementComparator.INSTANCE); + expectTraversal(coll.iterator(), 0, 1, 2, 3, 4, 5, 9, 10, 11, 15, 20, 30); + coll.sort(TestElementComparator.INSTANCE); + expectTraversal(coll.iterator(), 0, 1, 2, 3, 4, 5, 9, 10, 11, 15, 20, 30); + coll.sort(ReverseTestElementComparator.INSTANCE); + expectTraversal(coll.iterator(), 30, 20, 15, 11, 10, 9, 5, 4, 3, 2, 1, 0); + } } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ImplicitLinkedHashCollectionBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ImplicitLinkedHashCollectionBenchmark.java new file mode 100644 index 00000000000..c79c7f8dc26 --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ImplicitLinkedHashCollectionBenchmark.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.jmh.common; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.Comparator; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 3) +@Measurement(iterations = 6) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +public class ImplicitLinkedHashCollectionBenchmark { + public static class TestElement implements ImplicitLinkedHashCollection.Element { + private final String value; + private int next = ImplicitLinkedHashCollection.INVALID_INDEX; + private int prev = ImplicitLinkedHashCollection.INVALID_INDEX; + + public TestElement(String value) { + this.value = value; + } + + public String value() { + return value; + } + + @Override + public int prev() { + return this.prev; + } + + @Override + public void setPrev(int prev) { + this.prev = prev; + } + + @Override + public int next() { + return this.next; + } + + @Override + public void setNext(int next) { + this.next = next; + } + + @Override + public int hashCode() { + return value.hashCode(); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof TestElement)) return false; + TestElement other = (TestElement) o; + return value.equals(other.value); + } + } + + public static class TestElementComparator implements Comparator { + public static final TestElementComparator INSTANCE = new TestElementComparator(); + + @Override + public int compare(TestElement a, TestElement b) { + return a.value().compareTo(b.value()); + } + } + + @Param({"10000", "100000"}) + private int size; + + private ImplicitLinkedHashCollection coll; + + @Setup(Level.Trial) + public void setup() { + coll = new ImplicitLinkedHashCollection<>(); + for (int i = 0; i < size; i++) { + coll.add(new TestElement(Uuid.randomUuid().toString())); + } + } + + /** + * Test sorting the collection entries. + */ + @Benchmark + public ImplicitLinkedHashCollection testCollectionSort() { + coll.sort(TestElementComparator.INSTANCE); + return coll; + } +}