diff --git a/LICENSE-binary b/LICENSE-binary
index 7a35e39889e..09e226835e6 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -212,6 +212,7 @@ License Version 2.0:
 - commons-lang3-3.12.0
 - commons-logging-1.3.2
 - commons-validator-1.9.0
+- hash4j-0.22.0
 - jackson-annotations-2.16.2
 - jackson-core-2.16.2
 - jackson-databind-2.16.2
diff --git a/build.gradle b/build.gradle
index d2ac9aa1dc4..36ced29d0bd 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1418,6 +1418,7 @@ project(':group-coordinator') {
     implementation libs.hdrHistogram
     implementation libs.re2j
     implementation libs.slf4jApi
+    implementation libs.hash4j

     testImplementation project(':clients').sourceSets.test.output
     testImplementation project(':server-common').sourceSets.test.output
diff --git a/checkstyle/import-control-group-coordinator.xml b/checkstyle/import-control-group-coordinator.xml
index 8b6a8d99f5e..1f0e91de144 100644
--- a/checkstyle/import-control-group-coordinator.xml
+++ b/checkstyle/import-control-group-coordinator.xml
@@ -51,6 +51,7 @@
+    <allow pkg="com.dynatrace.hash4j" />
@@ -76,6 +77,7 @@
+    <allow pkg="com.dynatrace.hash4j" />
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index fba1023fe48..df2e7221731 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -127,7 +127,8 @@ versions += [
   // Also make sure the compression levels in org.apache.kafka.common.record.CompressionType are still valid
   zstd: "1.5.6-10",
   junitPlatform: "1.10.2",
-  hdrHistogram: "2.2.2"
+  hdrHistogram: "2.2.2",
+  hash4j: "0.22.0"
 ]

 libs += [
@@ -225,5 +226,6 @@
   mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact",
   zstd: "com.github.luben:zstd-jni:$versions.zstd",
   httpclient: "org.apache.httpcomponents:httpclient:$versions.httpclient",
-  hdrHistogram: "org.hdrhistogram:HdrHistogram:$versions.hdrHistogram"
+  hdrHistogram: "org.hdrhistogram:HdrHistogram:$versions.hdrHistogram",
+  hash4j: "com.dynatrace.hash4j:hash4j:$versions.hash4j",
 ]
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
index 1736aab9d88..f7441fce0b1 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
@@ -25,10 +25,15 @@ import org.apache.kafka.common.message.ConsumerProtocolSubscription;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
 import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.image.ClusterImage;
+import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.TopicImage;
 import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.metadata.BrokerRegistration;
 import org.apache.kafka.server.common.ApiMessageAndVersion;

+import com.dynatrace.hash4j.hashing.HashStream64;
+import com.dynatrace.hash4j.hashing.Hashing;
 import com.google.re2j.Pattern;
 import com.google.re2j.PatternSyntaxException;
@@ -324,4 +329,99 @@ public class Utils {
                 regex, ex.getDescription()));
         }
     }
+
+    /**
+     * The magic byte used to identify the version of the topic hash function.
+     */
+    static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+
+    /**
+     * Computes the hash of the topics in a group.
+     * <p>
+     * The computed hash value is stored as the metadata hash in the *GroupMetadataValue.
+     * <p>
+     * If there is no topic, the hash value is set to 0.
+     * Otherwise, the hashing process involves the following steps:
+     * 1. Sort the topic hashes by topic name.
+     * 2. Write each topic hash in order.
+     *
+     * @param topicHashes The map of topic hashes. Key is the topic name and value is the topic hash.
+     * @return The hash of the group.
+     */
+    static long computeGroupHash(Map<String, Long> topicHashes) {
+        if (topicHashes.isEmpty()) {
+            return 0;
+        }
+
+        // Sort entries by topic name.
+        List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>(topicHashes.entrySet());
+        sortedEntries.sort(Map.Entry.comparingByKey());
+
+        HashStream64 hasher = Hashing.xxh3_64().hashStream();
+        for (Map.Entry<String, Long> entry : sortedEntries) {
+            hasher.putLong(entry.getValue());
+        }
+
+        return hasher.getAsLong();
+    }
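Because computeGroupHash sorts the per-topic hashes by topic name before streaming them, the resulting group hash does not depend on the iteration order of the input map. Below is a minimal standalone sketch of those same two steps using the hash4j stream API; the class and method names are illustrative only and are not part of this patch.

import com.dynatrace.hash4j.hashing.HashStream64;
import com.dynatrace.hash4j.hashing.Hashing;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupHashSketch {

    // Mirrors the steps of Utils.computeGroupHash: sort the per-topic hashes by
    // topic name, then stream them into a single XXH3-64 value.
    static long groupHash(Map<String, Long> topicHashes) {
        if (topicHashes.isEmpty()) {
            return 0;
        }
        List<Map.Entry<String, Long>> sorted = new ArrayList<>(topicHashes.entrySet());
        sorted.sort(Map.Entry.comparingByKey());

        HashStream64 hasher = Hashing.xxh3_64().hashStream();
        for (Map.Entry<String, Long> entry : sorted) {
            hasher.putLong(entry.getValue());
        }
        return hasher.getAsLong();
    }

    public static void main(String[] args) {
        // Same entries inserted in different orders: the sort makes the result identical.
        Map<String, Long> a = new LinkedHashMap<>();
        a.put("foo", 456L);
        a.put("bar", 123L);

        Map<String, Long> b = new LinkedHashMap<>();
        b.put("bar", 123L);
        b.put("foo", 456L);

        System.out.println(groupHash(a) == groupHash(b)); // true
    }
}

This order-independence is what testComputeGroupHashWithDifferentOrder below verifies against the real implementation.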
+
+    /**
+     * Computes the hash of the topic id, name, number of partitions, and partition racks by streaming XXH3.
+     * <p>
+     * The computed hash value for the topic is used in conjunction with the {@link #computeGroupHash(Map)}
+     * method and is stored as part of the metadata hash in the *GroupMetadataValue.
+     * Note that if the hash algorithm is ever changed, the magic byte must be updated to reflect the
+     * new hash version.
+     * <p>
+     * For non-existent topics, the hash value is set to 0.
+     * For existing topics, the hashing process involves the following steps:
+     * 1. Write a magic byte to denote the version of the hash function.
+     * 2. Write the topic ID as its most significant bits followed by its least significant bits.
+     * 3. Write the topic name.
+     * 4. Write the number of partitions associated with the topic.
+     * 5. For each partition, write the partition ID and a sorted list of rack identifiers.
+     *    Each rack identifier is written as its length followed by its value, to prevent issues with simple separators.
+     *
+     * @param topicName     The topic name.
+     * @param metadataImage The metadata image.
+     * @return The hash of the topic.
+     */
+    static long computeTopicHash(String topicName, MetadataImage metadataImage) {
+        TopicImage topicImage = metadataImage.topics().getTopic(topicName);
+        if (topicImage == null) {
+            return 0;
+        }
+
+        HashStream64 hasher = Hashing.xxh3_64().hashStream();
+        hasher = hasher
+            .putByte(TOPIC_HASH_MAGIC_BYTE)
+            .putLong(topicImage.id().getMostSignificantBits())
+            .putLong(topicImage.id().getLeastSignificantBits())
+            .putString(topicImage.name())
+            .putInt(topicImage.partitions().size());
+
+        ClusterImage clusterImage = metadataImage.cluster();
+        List<String> racks = new ArrayList<>();
+        for (int i = 0; i < topicImage.partitions().size(); i++) {
+            hasher = hasher.putInt(i);
+            racks.clear(); // Clear the list for reuse across partitions.
+            for (int replicaId : topicImage.partitions().get(i).replicas) {
+                BrokerRegistration broker = clusterImage.broker(replicaId);
+                if (broker != null) {
+                    broker.rack().ifPresent(racks::add);
+                }
+            }
+
+            Collections.sort(racks);
+            for (String rack : racks) {
+                // Format: "<length><value>".
+                // The racks cannot be joined with a simple separator such as ",", because rack identifiers
+                // may contain any character. With a plain separator, distinct rack lists can serialize to
+                // the same string (for example ["a", "b,c"] and ["a,b", "c"] both become "a,b,c").
+                // Writing the length before each rack string removes this ambiguity.
+                hasher = hasher.putInt(rack.length()).putString(rack);
+            }
+        }
+
+        return hasher.getAsLong();
+    }
 }
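The length prefix in step 5 is what keeps distinct rack lists from colliding. A small illustrative sketch of the failure mode it avoids, using the same hash4j stream API (the class and method names are illustrative, not part of this patch):

import com.dynatrace.hash4j.hashing.HashStream64;
import com.dynatrace.hash4j.hashing.Hashing;

import java.util.List;

public class RackEncodingSketch {

    // Joins the racks with a plain separator: distinct lists can collide.
    static long separatorHash(List<String> racks) {
        return Hashing.xxh3_64().hashStream()
            .putString(String.join(",", racks))
            .getAsLong();
    }

    // Length-prefixes each rack, as computeTopicHash does.
    static long lengthPrefixedHash(List<String> racks) {
        HashStream64 hasher = Hashing.xxh3_64().hashStream();
        for (String rack : racks) {
            hasher.putInt(rack.length()).putString(rack);
        }
        return hasher.getAsLong();
    }

    public static void main(String[] args) {
        List<String> first = List.of("a", "b,c");
        List<String> second = List.of("a,b", "c");

        // Both lists serialize to "a,b,c" with a "," separator, so their hashes are equal.
        System.out.println(separatorHash(first) == separatorHash(second)); // true
        // With the length prefix the two lists feed different byte streams to the hash,
        // so the values differ.
        System.out.println(lengthPrefixedHash(first) == lengthPrefixedHash(second)); // false
    }
}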
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java
new file mode 100644
index 00000000000..20766b623b5
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.image.MetadataImage;
+
+import com.dynatrace.hash4j.hashing.Hashing;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class UtilsTest {
+    private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
+    private static final String FOO_TOPIC_NAME = "foo";
+    private static final String BAR_TOPIC_NAME = "bar";
+    private static final int FOO_NUM_PARTITIONS = 2;
+    private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder()
+        .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+        .addRacks()
+        .build();
+
+    @Test
+    void testNonExistingTopicName() {
+        assertEquals(0, Utils.computeTopicHash("unknown", FOO_METADATA_IMAGE));
+    }
+
+    @Test
+    void testComputeTopicHash() {
+        long result = Utils.computeTopicHash(FOO_TOPIC_NAME, FOO_METADATA_IMAGE);
+
+        long expected = Hashing.xxh3_64().hashStream()
+            .putByte((byte) 0)
+            .putLong(FOO_TOPIC_ID.getMostSignificantBits())
+            .putLong(FOO_TOPIC_ID.getLeastSignificantBits())
+            .putString(FOO_TOPIC_NAME)
+            .putInt(FOO_NUM_PARTITIONS)
+            .putInt(0) // partition 0
+            .putInt(5) // length of rack0
+            .putString("rack0") // the first rack in partition 0
+            .putInt(5) // length of rack1
+            .putString("rack1") // the second rack in partition 0
+            .putInt(1) // partition 1
+            .putInt(5) // length of rack1
+            .putString("rack1") // the first rack in partition 1
+            .putInt(5) // length of rack2
+            .putString("rack2") // the second rack in partition 1
+            .getAsLong();
+        assertEquals(expected, result);
+    }
+
+    @Test
+    void testComputeTopicHashWithDifferentMagicByte() {
+        long result = Utils.computeTopicHash(FOO_TOPIC_NAME, FOO_METADATA_IMAGE);
+
+        long expected = Hashing.xxh3_64().hashStream()
+            .putByte((byte) 1) // different magic byte
+            .putLong(FOO_TOPIC_ID.getMostSignificantBits())
+            .putLong(FOO_TOPIC_ID.getLeastSignificantBits())
+            .putString(FOO_TOPIC_NAME)
+            .putInt(FOO_NUM_PARTITIONS)
+            .putInt(0) // partition 0
+            .putInt(5) // length of rack0
+            .putString("rack0") // the first rack in partition 0
+            .putInt(5) // length of rack1
+            .putString("rack1") // the second rack in partition 0
+            .putInt(1) // partition 1
+            .putInt(5) // length of rack1
+            .putString("rack1") // the first rack in partition 1
+            .putInt(5) // length of rack2
+            .putString("rack2") // the second rack in partition 1
+            .getAsLong();
+        assertNotEquals(expected, result);
+    }
+
+    @Test
+    void testComputeTopicHashWithLeastSignificantBitsFirst() {
+        long result = Utils.computeTopicHash(FOO_TOPIC_NAME, FOO_METADATA_IMAGE);
+
+        long expected = Hashing.xxh3_64().hashStream()
+            .putByte((byte) 0)
+            .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // least significant bits written first
+            .putLong(FOO_TOPIC_ID.getMostSignificantBits())
+            .putString(FOO_TOPIC_NAME)
+            .putInt(FOO_NUM_PARTITIONS)
+            .putInt(0) // partition 0
+            .putInt(5) // length of rack0
+            .putString("rack0") // the first rack in partition 0
+            .putInt(5) // length of rack1
+            .putString("rack1") // the second rack in partition 0
+            .putInt(1) // partition 1
+            .putInt(5) // length of rack1
+            .putString("rack1") // the first rack in partition 1
+            .putInt(5) // length of rack2
+            .putString("rack2") // the second rack in partition 1
+            .getAsLong();
+        assertNotEquals(expected, result);
+    }
+
+    @Test
+    void testComputeTopicHashWithDifferentPartitionOrder() {
+        long result = Utils.computeTopicHash(FOO_TOPIC_NAME, FOO_METADATA_IMAGE);
+
+        long expected = Hashing.xxh3_64().hashStream()
+            .putByte((byte) 0)
+            .putLong(FOO_TOPIC_ID.getMostSignificantBits())
+            .putLong(FOO_TOPIC_ID.getLeastSignificantBits())
+            .putString(FOO_TOPIC_NAME)
+            .putInt(FOO_NUM_PARTITIONS)
+            .putInt(1) // partition 1 written before partition 0
+            .putInt(5) // length of rack1
+            .putString("rack1") // the first rack in partition 1
+            .putInt(5) // length of rack2
+            .putString("rack2") // the second rack in partition 1
+            .putInt(0) // partition 0
+            .putInt(5) // length of rack0
+            .putString("rack0") // the first rack in partition 0
+            .putInt(5) // length of rack1
+            .putString("rack1") // the second rack in partition 0
+            .getAsLong();
+        assertNotEquals(expected, result);
+    }
+
+    @Test
+    void testComputeTopicHashWithDifferentRackOrder() {
+        long result = Utils.computeTopicHash(FOO_TOPIC_NAME, FOO_METADATA_IMAGE);
+
+        long expected = Hashing.xxh3_64().hashStream()
+            .putByte((byte) 0)
+            .putLong(FOO_TOPIC_ID.getMostSignificantBits())
+            .putLong(FOO_TOPIC_ID.getLeastSignificantBits())
+            .putString(FOO_TOPIC_NAME)
+            .putInt(FOO_NUM_PARTITIONS)
+            .putInt(0) // partition 0
+            .putInt(5) // length of rack1
+            .putString("rack1") // the second rack in partition 0, written first
+            .putInt(5) // length of rack0
+            .putString("rack0") // the first rack in partition 0, written second
+            .putInt(1) // partition 1
+            .putInt(5) // length of rack1
+            .putString("rack1") // the first rack in partition 1
+            .putInt(5) // length of rack2
+            .putString("rack2") // the second rack in partition 1
+            .getAsLong();
+        assertNotEquals(expected, result);
+    }
+
+    @ParameterizedTest
+    @MethodSource("differentFieldGenerator")
+    void testComputeTopicHashWithDifferentField(MetadataImage differentImage) {
+        long result = Utils.computeTopicHash(FOO_TOPIC_NAME, FOO_METADATA_IMAGE);
+
+        assertNotEquals(
+            Utils.computeTopicHash(FOO_TOPIC_NAME, differentImage),
+            result
+        );
+    }
+
+    private static Stream<Arguments> differentFieldGenerator() {
+        return Stream.of(
+            Arguments.of(new MetadataImageBuilder() // different topic id
+                .addTopic(Uuid.randomUuid(), FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+                .addRacks()
+                .build()
+            ),
+            Arguments.of(new MetadataImageBuilder() // different topic name
+                .addTopic(FOO_TOPIC_ID, "bar", FOO_NUM_PARTITIONS)
+                .addRacks()
+                .build()
+            ),
+            Arguments.of(new MetadataImageBuilder() // different number of partitions
+                .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, 1)
+                .addRacks()
+                .build()
+            ),
+            Arguments.of(new MetadataImageBuilder() // no rack information
+                .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+                .build()
+            )
+        );
+    }
+
+    @Test
+    void testComputeGroupHashWithEmptyMap() {
+        assertEquals(0, Utils.computeGroupHash(Map.of()));
+    }
+
+    @Test
+    void testComputeGroupHashWithDifferentOrder() {
+        Map<String, Long> ascendTopicHashes = new LinkedHashMap<>();
+        ascendTopicHashes.put(BAR_TOPIC_NAME, 123L);
+        ascendTopicHashes.put(FOO_TOPIC_NAME, 456L);
+
+        Map<String, Long> descendTopicHashes = new LinkedHashMap<>();
+        descendTopicHashes.put(FOO_TOPIC_NAME, 456L);
+        descendTopicHashes.put(BAR_TOPIC_NAME, 123L);
+
+        assertEquals(Utils.computeGroupHash(ascendTopicHashes), Utils.computeGroupHash(descendTopicHashes));
+    }
+
+    @Test
+    void testComputeGroupHashWithSameKeyButDifferentValue() {
+        Map<String, Long> map1 = Map.of(
+            BAR_TOPIC_NAME, 123L,
+            FOO_TOPIC_NAME, 456L
+        );
+
+        Map<String, Long> map2 = Map.of(
+            BAR_TOPIC_NAME, 456L,
+            FOO_TOPIC_NAME, 123L
+        );
+
+        assertNotEquals(Utils.computeGroupHash(map1), Utils.computeGroupHash(map2));
+    }
+}
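Taken together, the intended flow is that the coordinator hashes each subscribed topic with computeTopicHash and then folds the per-topic values into a single group-level value with computeGroupHash, which is what gets stored as the metadata hash in the *GroupMetadataValue. A rough sketch of that wiring, assuming a caller in the same package; the class and method names below are illustrative only, and how the coordinator actually invokes these helpers is outside this diff.

package org.apache.kafka.coordinator.group;

import org.apache.kafka.image.MetadataImage;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class MetadataHashSketch {

    // Hashes every subscribed topic individually, then folds the per-topic hashes
    // into a single group-level metadata hash. Topics missing from the image
    // contribute a hash of 0, per computeTopicHash.
    static long groupMetadataHash(Set<String> subscribedTopicNames, MetadataImage metadataImage) {
        Map<String, Long> topicHashes = new HashMap<>();
        for (String topicName : subscribedTopicNames) {
            topicHashes.put(topicName, Utils.computeTopicHash(topicName, metadataImage));
        }
        return Utils.computeGroupHash(topicHashes);
    }
}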