KAFKA-17747: [2/N] Add compute topic and group hash (#19523)

* Add `com.dynatrace.hash4j:hash4j:0.22.0` to dependencies.
* Add `computeTopicHash` to `org.apache.kafka.coordinator.group.Utils`.
  * If topic name is non-existent, return 0.
  * If topic name is existent, use streaming XXH3 to compute topic hash
with magic byte, topic id, topic name, number of partitions, partition
id and sorted racks.
* Add `computeGroupHash` to `org.apache.kafka.coordinator.group.Utils`.
  * If topic map is empty, return 0.
  * If topic map is not empty, use streaming XXH3 to compute group
metadata hash with sorted topic hashes by topic names.
* Add related unit test.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>, Sean Quah <squah@confluent.io>, David Jacot <djacot@confluent.io>

---------

Signed-off-by: PoAn Yang <payang@apache.org>
This commit is contained in:
PoAn Yang 2025-05-15 03:48:45 -05:00 committed by GitHub
parent ecb5b6bd7e
commit a1008dc85d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 345 additions and 2 deletions

View File

@ -212,6 +212,7 @@ License Version 2.0:
- commons-lang3-3.12.0 - commons-lang3-3.12.0
- commons-logging-1.3.2 - commons-logging-1.3.2
- commons-validator-1.9.0 - commons-validator-1.9.0
- hash4j-0.22.0
- jackson-annotations-2.16.2 - jackson-annotations-2.16.2
- jackson-core-2.16.2 - jackson-core-2.16.2
- jackson-databind-2.16.2 - jackson-databind-2.16.2

View File

@ -1418,6 +1418,7 @@ project(':group-coordinator') {
implementation libs.hdrHistogram implementation libs.hdrHistogram
implementation libs.re2j implementation libs.re2j
implementation libs.slf4jApi implementation libs.slf4jApi
implementation libs.hash4j
testImplementation project(':clients').sourceSets.test.output testImplementation project(':clients').sourceSets.test.output
testImplementation project(':server-common').sourceSets.test.output testImplementation project(':server-common').sourceSets.test.output

View File

@ -51,6 +51,7 @@
<subpackage name="coordinator"> <subpackage name="coordinator">
<subpackage name="group"> <subpackage name="group">
<allow pkg="net.jpountz.xxhash" />
<allow pkg="org.apache.kafka.clients.consumer" /> <allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="org.apache.kafka.common.annotation" /> <allow pkg="org.apache.kafka.common.annotation" />
<allow pkg="org.apache.kafka.common.config" /> <allow pkg="org.apache.kafka.common.config" />
@ -76,6 +77,7 @@
<allow pkg="org.apache.kafka.coordinator.common" /> <allow pkg="org.apache.kafka.coordinator.common" />
<allow pkg="org.apache.kafka.coordinator.common.runtime" /> <allow pkg="org.apache.kafka.coordinator.common.runtime" />
<allow pkg="com.google.re2j" /> <allow pkg="com.google.re2j" />
<allow pkg="com.dynatrace.hash4j.hashing" />
<allow pkg="org.apache.kafka.metadata" /> <allow pkg="org.apache.kafka.metadata" />
<subpackage name="metrics"> <subpackage name="metrics">
<allow pkg="com.yammer.metrics"/> <allow pkg="com.yammer.metrics"/>

View File

@ -127,7 +127,8 @@ versions += [
// Also make sure the compression levels in org.apache.kafka.common.record.CompressionType are still valid // Also make sure the compression levels in org.apache.kafka.common.record.CompressionType are still valid
zstd: "1.5.6-10", zstd: "1.5.6-10",
junitPlatform: "1.10.2", junitPlatform: "1.10.2",
hdrHistogram: "2.2.2" hdrHistogram: "2.2.2",
hash4j: "0.22.0"
] ]
libs += [ libs += [
@ -225,5 +226,6 @@ libs += [
mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact", mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact",
zstd: "com.github.luben:zstd-jni:$versions.zstd", zstd: "com.github.luben:zstd-jni:$versions.zstd",
httpclient: "org.apache.httpcomponents:httpclient:$versions.httpclient", httpclient: "org.apache.httpcomponents:httpclient:$versions.httpclient",
hdrHistogram: "org.hdrhistogram:HdrHistogram:$versions.hdrHistogram" hdrHistogram: "org.hdrhistogram:HdrHistogram:$versions.hdrHistogram",
hash4j: "com.dynatrace.hash4j:hash4j:$versions.hash4j",
] ]

View File

@ -25,10 +25,15 @@ import org.apache.kafka.common.message.ConsumerProtocolSubscription;
import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue; import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage; import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage; import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.ApiMessageAndVersion;
import com.dynatrace.hash4j.hashing.HashStream64;
import com.dynatrace.hash4j.hashing.Hashing;
import com.google.re2j.Pattern; import com.google.re2j.Pattern;
import com.google.re2j.PatternSyntaxException; import com.google.re2j.PatternSyntaxException;
@ -324,4 +329,99 @@ public class Utils {
regex, ex.getDescription())); regex, ex.getDescription()));
} }
} }
/**
* The magic byte used to identify the version of topic hash function.
*/
static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
/**
* Computes the hash of the topics in a group.
* <p>
* The computed hash value is stored as the metadata hash in the *GroupMetadataValue.
* <p>
* If there is no topic, the hash value is set to 0.
* The hashing process involves the following steps:
* 1. Sort the topic hashes by topic name.
* 2. Write each topic hash in order.
*
* @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
* @return The hash of the group.
*/
static long computeGroupHash(Map<String, Long> topicHashes) {
if (topicHashes.isEmpty()) {
return 0;
}
// Sort entries by topic name
List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>(topicHashes.entrySet());
sortedEntries.sort(Map.Entry.comparingByKey());
HashStream64 hasher = Hashing.xxh3_64().hashStream();
for (Map.Entry<String, Long> entry : sortedEntries) {
hasher.putLong(entry.getValue());
}
return hasher.getAsLong();
}
/**
* Computes the hash of the topic id, name, number of partitions, and partition racks by streaming XXH3.
* <p>
* The computed hash value for the topic is utilized in conjunction with the {@link #computeGroupHash(Map)}
* method and is stored as part of the metadata hash in the *GroupMetadataValue.
* It is important to note that if the hash algorithm is changed, the magic byte must be updated to reflect the
* new hash version.
* <p>
* For non-existent topics, the hash value is set to 0.
* For existent topics, the hashing process involves the following steps:
* 1. Write a magic byte to denote the version of the hash function.
* 2. Write the hash code of the topic ID with mostSignificantBits and leastSignificantBits.
* 3. Write the topic name.
* 4. Write the number of partitions associated with the topic.
* 5. For each partition, write the partition ID and a sorted list of rack identifiers.
* - Rack identifiers are formatted as "<length1><value1><length2><value2>" to prevent issues with simple separators.
*
* @param topicName The topic image.
* @param metadataImage The cluster image.
* @return The hash of the topic.
*/
static long computeTopicHash(String topicName, MetadataImage metadataImage) {
TopicImage topicImage = metadataImage.topics().getTopic(topicName);
if (topicImage == null) {
return 0;
}
HashStream64 hasher = Hashing.xxh3_64().hashStream();
hasher = hasher
.putByte(TOPIC_HASH_MAGIC_BYTE)
.putLong(topicImage.id().getMostSignificantBits())
.putLong(topicImage.id().getLeastSignificantBits())
.putString(topicImage.name())
.putInt(topicImage.partitions().size());
ClusterImage clusterImage = metadataImage.cluster();
List<String> racks = new ArrayList<>();
for (int i = 0; i < topicImage.partitions().size(); i++) {
hasher = hasher.putInt(i);
racks.clear(); // Clear the list for reuse
for (int replicaId : topicImage.partitions().get(i).replicas) {
BrokerRegistration broker = clusterImage.broker(replicaId);
if (broker != null) {
broker.rack().ifPresent(racks::add);
}
}
Collections.sort(racks);
for (String rack : racks) {
// Format: "<length><value>"
// The rack string combination cannot use simple separator like ",", because there is no limitation for rack character.
// If using simple separator like "," it may hit edge case like ",," and ",,," / ",,," and ",,".
// Add length before the rack string to avoid the edge case.
hasher = hasher.putInt(rack.length()).putString(rack);
}
}
return hasher.getAsLong();
}
} }

View File

@ -0,0 +1,237 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.image.MetadataImage;
import com.dynatrace.hash4j.hashing.Hashing;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
public class UtilsTest {
private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
private static final String FOO_TOPIC_NAME = "foo";
private static final String BAR_TOPIC_NAME = "bar";
private static final int FOO_NUM_PARTITIONS = 2;
private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder()
.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
.addRacks()
.build();
@Test
void testNonExistingTopicName() {
assertEquals(0, Utils.computeTopicHash("unknown", FOO_METADATA_IMAGE));
}
@Test
void testComputeTopicHash() {
long result = Utils.computeTopicHash(FOO_TOPIC_NAME, FOO_METADATA_IMAGE);
long expected = Hashing.xxh3_64().hashStream()
.putByte((byte) 0)
.putLong(FOO_TOPIC_ID.getMostSignificantBits())
.putLong(FOO_TOPIC_ID.getLeastSignificantBits())
.putString(FOO_TOPIC_NAME)
.putInt(FOO_NUM_PARTITIONS)
.putInt(0) // partition 0
.putInt(5) // length of rack0
.putString("rack0") // The first rack in partition 0
.putInt(5) // length of rack1
.putString("rack1") // The second rack in partition 0
.putInt(1) // partition 1
.putInt(5) // length of rack0
.putString("rack1") // The first rack in partition 1
.putInt(5) // length of rack1
.putString("rack2") // The second rack in partition 1
.getAsLong();
assertEquals(expected, result);
}
@Test
void testComputeTopicHashWithDifferentMagicByte() {
long result = Utils.computeTopicHash(FOO_TOPIC_NAME, FOO_METADATA_IMAGE);
long expected = Hashing.xxh3_64().hashStream()
.putByte((byte) 1) // different magic byte
.putLong(FOO_TOPIC_ID.getMostSignificantBits())
.putLong(FOO_TOPIC_ID.getLeastSignificantBits())
.putString(FOO_TOPIC_NAME)
.putInt(FOO_NUM_PARTITIONS)
.putInt(0) // partition 0
.putInt(5) // length of rack0
.putString("rack0") // The first rack in partition 0
.putInt(5) // length of rack1
.putString("rack1") // The second rack in partition 0
.putInt(1) // partition 1
.putInt(5) // length of rack0
.putString("rack1") // The first rack in partition 1
.putInt(5) // length of rack1
.putString("rack2") // The second rack in partition 1
.getAsLong();
assertNotEquals(expected, result);
}
@Test
void testComputeTopicHashWithLeastSignificantBitsFirst() {
long result = Utils.computeTopicHash(FOO_TOPIC_NAME, FOO_METADATA_IMAGE);
long expected = Hashing.xxh3_64().hashStream()
.putByte((byte) 0)
.putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // different order
.putLong(FOO_TOPIC_ID.getMostSignificantBits())
.putString(FOO_TOPIC_NAME)
.putInt(FOO_NUM_PARTITIONS)
.putInt(0) // partition 0
.putInt(5) // length of rack0
.putString("rack0") // The first rack in partition 0
.putInt(5) // length of rack1
.putString("rack1") // The second rack in partition 0
.putInt(1) // partition 1
.putInt(5) // length of rack0
.putString("rack1") // The first rack in partition 1
.putInt(5) // length of rack1
.putString("rack2") // The second rack in partition 1
.getAsLong();
assertNotEquals(expected, result);
}
@Test
void testComputeTopicHashWithDifferentPartitionOrder() {
long result = Utils.computeTopicHash(FOO_TOPIC_NAME, FOO_METADATA_IMAGE);
long expected = Hashing.xxh3_64().hashStream()
.putByte((byte) 1)
.putLong(FOO_TOPIC_ID.getMostSignificantBits())
.putLong(FOO_TOPIC_ID.getLeastSignificantBits())
.putString(FOO_TOPIC_NAME)
.putInt(FOO_NUM_PARTITIONS)
.putInt(1) // partition 1
.putInt(5) // length of rack0
.putString("rack1") // The first rack in partition 1
.putInt(5) // length of rack1
.putString("rack2") // The second rack in partition 1
.putInt(0) // partition 0
.putInt(5) // length of rack0
.putString("rack0") // The first rack in partition 0
.putInt(5) // length of rack1
.putString("rack1") // The second rack in partition 0
.getAsLong();
assertNotEquals(expected, result);
}
@Test
void testComputeTopicHashWithDifferentRackOrder() {
long result = Utils.computeTopicHash(FOO_TOPIC_NAME, FOO_METADATA_IMAGE);
long expected = Hashing.xxh3_64().hashStream()
.putByte((byte) 0)
.putLong(FOO_TOPIC_ID.getMostSignificantBits())
.putLong(FOO_TOPIC_ID.getLeastSignificantBits())
.putString(FOO_TOPIC_NAME)
.putInt(FOO_NUM_PARTITIONS)
.putInt(0) // partition 0
.putInt(5) // length of rack0
.putString("rack1") // The second rack in partition 0
.putInt(5) // length of rack1
.putString("rack0") // The first rack in partition 0
.putInt(1) // partition 1
.putInt(5) // length of rack0
.putString("rack1") // The first rack in partition 1
.putInt(5) // length of rack1
.putString("rack2") // The second rack in partition 1
.getAsLong();
assertNotEquals(expected, result);
}
@ParameterizedTest
@MethodSource("differentFieldGenerator")
void testComputeTopicHashWithDifferentField(MetadataImage differentImage) {
long result = Utils.computeTopicHash(FOO_TOPIC_NAME, FOO_METADATA_IMAGE);
assertNotEquals(
Utils.computeTopicHash(FOO_TOPIC_NAME, differentImage),
result
);
}
private static Stream<Arguments> differentFieldGenerator() {
return Stream.of(
Arguments.of(
new MetadataImageBuilder() // different topic id
.addTopic(Uuid.randomUuid(), FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
.addRacks()
.build()
),
Arguments.of(new MetadataImageBuilder() // different topic name
.addTopic(FOO_TOPIC_ID, "bar", FOO_NUM_PARTITIONS)
.addRacks()
.build()
),
Arguments.of(new MetadataImageBuilder() // different partitions
.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, 1)
.addRacks()
.build()
),
Arguments.of(new MetadataImageBuilder() // different racks
.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
.build()
)
);
}
@Test
void testComputeGroupHashWithEmptyMap() {
assertEquals(0, Utils.computeGroupHash(Map.of()));
}
@Test
void testComputeGroupHashWithDifferentOrder() {
Map<String, Long> ascendTopicHashes = new LinkedHashMap<>();
ascendTopicHashes.put(BAR_TOPIC_NAME, 123L);
ascendTopicHashes.put(FOO_TOPIC_NAME, 456L);
Map<String, Long> descendTopicHashes = new LinkedHashMap<>();
descendTopicHashes.put(FOO_TOPIC_NAME, 456L);
descendTopicHashes.put(BAR_TOPIC_NAME, 123L);
assertEquals(Utils.computeGroupHash(ascendTopicHashes), Utils.computeGroupHash(descendTopicHashes));
}
@Test
void testComputeGroupHashWithSameKeyButDifferentValue() {
Map<String, Long> map1 = Map.of(
BAR_TOPIC_NAME, 123L,
FOO_TOPIC_NAME, 456L
);
Map<String, Long> map2 = Map.of(
BAR_TOPIC_NAME, 456L,
FOO_TOPIC_NAME, 123L
);
assertNotEquals(Utils.computeGroupHash(map1), Utils.computeGroupHash(map2));
}
}