KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata (#13537)

This patch adds ClientAssignor, Assignment, TopicMetadata and VersionedMetadata classes.

Reviewers: Christo Lolov <lolovc@amazon.com>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
This commit is contained in:
David Jacot 2023-04-21 11:22:16 +02:00 committed by GitHub
parent 2d0b816150
commit c39bf714bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 985 additions and 0 deletions

View File

@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
* An immutable assignment for a member.
*/
public class Assignment {
public static final Assignment EMPTY = new Assignment(
(byte) 0,
Collections.emptyMap(),
VersionedMetadata.EMPTY
);
/**
* The error assigned to the member.
*/
private final byte error;
/**
* The partitions assigned to the member.
*/
private final Map<Uuid, Set<Integer>> partitions;
/**
* The metadata assigned to the member.
*/
private final VersionedMetadata metadata;
public Assignment(
Map<Uuid, Set<Integer>> partitions
) {
this(
(byte) 0,
partitions,
VersionedMetadata.EMPTY
);
}
public Assignment(
byte error,
Map<Uuid, Set<Integer>> partitions,
VersionedMetadata metadata
) {
this.error = error;
this.partitions = Collections.unmodifiableMap(Objects.requireNonNull(partitions));
this.metadata = Objects.requireNonNull(metadata);
}
/**
* @return The error.
*/
public byte error() {
return error;
}
/**
* @return The assigned partitions.
*/
public Map<Uuid, Set<Integer>> partitions() {
return partitions;
}
/**
* @return The metadata.
*/
public VersionedMetadata metadata() {
return metadata;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Assignment that = (Assignment) o;
if (error != that.error) return false;
if (!partitions.equals(that.partitions)) return false;
return metadata.equals(that.metadata);
}
@Override
public int hashCode() {
int result = error;
result = 31 * result + partitions.hashCode();
result = 31 * result + metadata.hashCode();
return result;
}
@Override
public String toString() {
return "Assignment(" +
"error=" + error +
", partitions=" + partitions +
", metadata=" + metadata +
')';
}
/**
* Creates a {{@link Assignment}} from a {{@link ConsumerGroupTargetAssignmentMemberValue}}.
*
* @param record The record.
* @return A {{@link Assignment}}.
*/
public static Assignment fromRecord(
ConsumerGroupTargetAssignmentMemberValue record
) {
return new Assignment(
record.error(),
record.topicPartitions().stream().collect(Collectors.toMap(
ConsumerGroupTargetAssignmentMemberValue.TopicPartition::topicId,
topicPartitions -> new HashSet<>(topicPartitions.partitions()))),
new VersionedMetadata(
record.metadataVersion(),
ByteBuffer.wrap(record.metadataBytes()))
);
}
}

View File

@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
import java.nio.ByteBuffer;
import java.util.Objects;
/**
* An immutable representation of a client side assignor within a consumer group member.
*/
public class ClientAssignor {
/**
* The name of the assignor.
*/
private final String name;
/**
* The reason reported by the assignor.
*/
private final byte reason;
/**
* The minimum metadata version supported by the assignor.
*/
private final short minimumVersion;
/**
* The maximum metadata version supported by the assignor.
*/
private final short maximumVersion;
/**
* The versioned metadata.
*/
private final VersionedMetadata metadata;
public ClientAssignor(
String name,
byte reason,
short minimumVersion,
short maximumVersion,
VersionedMetadata metadata
) {
this.name = Objects.requireNonNull(name);
if (name.isEmpty()) {
throw new IllegalArgumentException("Assignor name cannot be empty.");
}
this.reason = reason;
this.minimumVersion = minimumVersion;
if (minimumVersion < -1) {
// -1 is supported as part of the upgrade from the old protocol to the new protocol. It
// basically means that the assignor supports metadata from the old client assignor.
throw new IllegalArgumentException("Assignor minimum version must be greater than -1.");
}
this.maximumVersion = maximumVersion;
if (maximumVersion < 0) {
throw new IllegalArgumentException("Assignor maximum version must be greater than or equals to 0.");
} else if (maximumVersion < minimumVersion) {
throw new IllegalArgumentException("Assignor maximum version must be greater than or equals to "
+ "the minimum version.");
}
this.metadata = Objects.requireNonNull(metadata);
}
/**
* @return The client side assignor name.
*/
public String name() {
return this.name;
}
/**
* @return The current reason reported by the assignor.
*/
public byte reason() {
return this.reason;
}
/**
* @return The minimum version supported by the assignor.
*/
public short minimumVersion() {
return this.minimumVersion;
}
/**
* @return The maximum version supported by the assignor.
*/
public short maximumVersion() {
return this.maximumVersion;
}
/**
* @return The versioned metadata.
*/
public VersionedMetadata metadata() {
return metadata;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ClientAssignor that = (ClientAssignor) o;
if (reason != that.reason) return false;
if (minimumVersion != that.minimumVersion) return false;
if (maximumVersion != that.maximumVersion) return false;
if (!name.equals(that.name)) return false;
return metadata.equals(that.metadata);
}
@Override
public int hashCode() {
int result = name.hashCode();
result = 31 * result + (int) reason;
result = 31 * result + (int) minimumVersion;
result = 31 * result + (int) maximumVersion;
result = 31 * result + metadata.hashCode();
return result;
}
@Override
public String toString() {
return "ClientAssignor(name=" + name +
", reason=" + reason +
", minimumVersion=" + minimumVersion +
", maximumVersion=" + maximumVersion +
", metadata=" + metadata +
')';
}
public static ClientAssignor fromRecord(
ConsumerGroupMemberMetadataValue.Assignor record
) {
return new ClientAssignor(
record.name(),
record.reason(),
record.minimumVersion(),
record.maximumVersion(),
new VersionedMetadata(
record.version(),
ByteBuffer.wrap(record.metadata())
)
);
}
}

View File

@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
import java.util.Objects;
/**
* Immutable topic metadata.
*/
public class TopicMetadata {
/**
* The topic id.
*/
private final Uuid id;
/**
* The topic name.
*/
private final String name;
/**
* The number of partitions.
*/
private final int numPartitions;
public TopicMetadata(
Uuid id,
String name,
int numPartitions
) {
this.id = Objects.requireNonNull(id);
if (Uuid.ZERO_UUID.equals(id)) {
throw new IllegalArgumentException("Topic id cannot be ZERO_UUID.");
}
this.name = Objects.requireNonNull(name);
if (name.isEmpty()) {
throw new IllegalArgumentException("Topic name cannot be empty.");
}
this.numPartitions = numPartitions;
if (numPartitions < 0) {
throw new IllegalArgumentException("Number of partitions cannot be negative.");
}
}
/**
* @return The topic id.
*/
public Uuid id() {
return this.id;
}
/**
* @return The topic name.
*/
public String name() {
return this.name;
}
/**
* @return The number of partitions.
*/
public int numPartitions() {
return this.numPartitions;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TopicMetadata that = (TopicMetadata) o;
if (!id.equals(that.id)) return false;
if (!name.equals(that.name)) return false;
return numPartitions == that.numPartitions;
}
@Override
public int hashCode() {
int result = id.hashCode();
result = 31 * result + name.hashCode();
result = 31 * result + numPartitions;
return result;
}
@Override
public String toString() {
return "TopicMetadata(" +
"id=" + id +
", name=" + name +
", numPartitions=" + numPartitions +
')';
}
public static TopicMetadata fromRecord(
ConsumerGroupPartitionMetadataValue.TopicMetadata record
) {
return new TopicMetadata(
record.topicId(),
record.topicName(),
record.numPartitions()
);
}
}

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.consumer;
import java.nio.ByteBuffer;
import java.util.Objects;
/**
* Immutable versioned metadata. It contains a bunch of bytes tagged with a version. The
* format of the bytes is unspecified. This is mainly used by client side assignors to
* exchange arbitrary metadata between the members and the assignor and vice versa.
*/
public class VersionedMetadata {
public static final VersionedMetadata EMPTY = new VersionedMetadata((short) 0, ByteBuffer.allocate(0));
/**
* The version of the metadata encoded in {{@link VersionedMetadata#metadata}}.
*/
private final short version;
/**
* The metadata bytes.
*/
private final ByteBuffer metadata;
public VersionedMetadata(
short version,
ByteBuffer metadata
) {
this.version = version;
this.metadata = Objects.requireNonNull(metadata);
}
/**
* @return The version of the metadata.
*/
public short version() {
return this.version;
}
/**
* @return The ByteBuffer holding the metadata.
*/
public ByteBuffer metadata() {
return this.metadata;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
VersionedMetadata that = (VersionedMetadata) o;
if (version != that.version) return false;
return metadata.equals(that.metadata);
}
@Override
public int hashCode() {
int result = version;
result = 31 * result + metadata.hashCode();
return result;
}
@Override
public String toString() {
return "VersionedMetadata(" +
"version=" + version +
", metadata=" + metadata +
')';
}
}

View File

@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class AssignmentTest {
@Test
public void testPartitionsAndMetadataCannotBeNull() {
assertThrows(NullPointerException.class, () -> new Assignment(
(byte) 1,
null,
new VersionedMetadata(
(short) 1,
ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))
)
));
assertThrows(NullPointerException.class, () -> new Assignment(
(byte) 1,
mkAssignment(
mkTopicAssignment(Uuid.randomUuid(), 1, 2, 3)
),
null
));
}
@Test
public void testAttributes() {
Map<Uuid, Set<Integer>> partitions = mkAssignment(
mkTopicAssignment(Uuid.randomUuid(), 1, 2, 3)
);
VersionedMetadata metadata = new VersionedMetadata(
(short) 1,
ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))
);
Assignment assignment = new Assignment(
(byte) 1,
partitions,
metadata
);
assertEquals((byte) 1, assignment.error());
assertEquals(partitions, assignment.partitions());
assertEquals(metadata, assignment.metadata());
}
@Test
public void testFromTargetAssignmentRecord() {
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
List<ConsumerGroupTargetAssignmentMemberValue.TopicPartition> partitions = new ArrayList<>();
partitions.add(new ConsumerGroupTargetAssignmentMemberValue.TopicPartition()
.setTopicId(topicId1)
.setPartitions(Arrays.asList(1, 2, 3)));
partitions.add(new ConsumerGroupTargetAssignmentMemberValue.TopicPartition()
.setTopicId(topicId2)
.setPartitions(Arrays.asList(4, 5, 6)));
ConsumerGroupTargetAssignmentMemberValue record = new ConsumerGroupTargetAssignmentMemberValue()
.setError((byte) 1)
.setTopicPartitions(partitions)
.setMetadataVersion((short) 2)
.setMetadataBytes("foo".getBytes(StandardCharsets.UTF_8));
Assignment assignment = Assignment.fromRecord(record);
assertEquals((short) 1, assignment.error());
assertEquals(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)
), assignment.partitions());
assertEquals(new VersionedMetadata(
(short) 2,
ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8))
), assignment.metadata());
}
@Test
public void testEquals() {
Map<Uuid, Set<Integer>> partitions = mkAssignment(
mkTopicAssignment(Uuid.randomUuid(), 1, 2, 3)
);
VersionedMetadata metadata = new VersionedMetadata(
(short) 1,
ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))
);
Assignment assignment = new Assignment(
(byte) 1,
partitions,
metadata
);
assertEquals(new Assignment(
(byte) 1,
partitions,
metadata
), assignment);
assertNotEquals(new Assignment(
(byte) 1,
Collections.emptyMap(),
metadata
), assignment);
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
public class AssignmentTestUtil {
public static Map.Entry<Uuid, Set<Integer>> mkTopicAssignment(
Uuid topicId,
Integer... partitions
) {
return new AbstractMap.SimpleEntry<>(
topicId,
new HashSet<>(Arrays.asList(partitions))
);
}
public static Map.Entry<Uuid, Set<Integer>> mkSortedTopicAssignment(
Uuid topicId,
Integer... partitions
) {
return new AbstractMap.SimpleEntry<>(
topicId,
new TreeSet<>(Arrays.asList(partitions))
);
}
@SafeVarargs
public static Map<Uuid, Set<Integer>> mkAssignment(Map.Entry<Uuid, Set<Integer>>... entries) {
Map<Uuid, Set<Integer>> assignment = new HashMap<>();
for (Map.Entry<Uuid, Set<Integer>> entry : entries) {
assignment.put(entry.getKey(), entry.getValue());
}
return assignment;
}
@SafeVarargs
public static Map<Uuid, Set<Integer>> mkSortedAssignment(Map.Entry<Uuid, Set<Integer>>... entries) {
Map<Uuid, Set<Integer>> assignment = new LinkedHashMap<>();
for (Map.Entry<Uuid, Set<Integer>> entry : entries) {
assignment.put(entry.getKey(), entry.getValue());
}
return assignment;
}
}

View File

@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class ClientAssignorTest {
@Test
public void testNameAndMetadataCannotBeNull() {
assertThrows(NullPointerException.class, () -> new ClientAssignor(
"range",
(byte) 2,
(short) 5,
(short) 10,
null
));
assertThrows(NullPointerException.class, () -> new ClientAssignor(
null,
(byte) 2,
(short) 5,
(short) 10,
new VersionedMetadata(
(short) 8,
ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))
)
));
}
@Test
public void testAttributes() {
ClientAssignor clientAssignor = new ClientAssignor(
"range",
(byte) 2,
(short) 5,
(short) 10,
new VersionedMetadata(
(short) 8,
ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))
)
);
assertEquals("range", clientAssignor.name());
assertEquals((byte) 2, clientAssignor.reason());
assertEquals((short) 5, clientAssignor.minimumVersion());
assertEquals((short) 10, clientAssignor.maximumVersion());
assertEquals(new VersionedMetadata(
(short) 8,
ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))
), clientAssignor.metadata());
}
@Test
public void testFromRecord() {
ConsumerGroupMemberMetadataValue.Assignor record = new ConsumerGroupMemberMetadataValue.Assignor()
.setName("range")
.setReason((byte) 2)
.setMinimumVersion((byte) 5)
.setMaximumVersion((byte) 10)
.setVersion((byte) 8)
.setMetadata("hello".getBytes(StandardCharsets.UTF_8));
ClientAssignor clientAssignor = ClientAssignor.fromRecord(record);
assertEquals("range", clientAssignor.name());
assertEquals((byte) 2, clientAssignor.reason());
assertEquals((short) 5, clientAssignor.minimumVersion());
assertEquals((short) 10, clientAssignor.maximumVersion());
assertEquals(new VersionedMetadata(
(short) 8,
ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))
), clientAssignor.metadata());
}
@Test
public void testEquals() {
ClientAssignor clientAssignor = new ClientAssignor(
"range",
(byte) 2,
(short) 5,
(short) 10,
new VersionedMetadata(
(short) 8,
ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))
)
);
assertEquals(new ClientAssignor(
"range",
(byte) 2,
(short) 5,
(short) 10,
new VersionedMetadata(
(short) 8,
ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))
)
), clientAssignor);
assertNotEquals(new ClientAssignor(
"uniform",
(byte) 2,
(short) 5,
(short) 10,
new VersionedMetadata(
(short) 8,
ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))
)
), clientAssignor);
}
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class TopicMetadataTest {
@Test
public void testAttributes() {
Uuid topicId = Uuid.randomUuid();
TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15);
assertEquals(topicId, topicMetadata.id());
assertEquals("foo", topicMetadata.name());
assertEquals(15, topicMetadata.numPartitions());
}
@Test
public void testTopicIdAndNameCannotBeNull() {
assertThrows(NullPointerException.class, () -> new TopicMetadata(Uuid.randomUuid(), null, 15));
assertThrows(NullPointerException.class, () -> new TopicMetadata(null, "foo", 15));
}
@Test
public void testEquals() {
Uuid topicId = Uuid.randomUuid();
TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15);
assertEquals(new TopicMetadata(topicId, "foo", 15), topicMetadata);
assertNotEquals(new TopicMetadata(topicId, "foo", 5), topicMetadata);
}
@Test
public void testFromRecord() {
Uuid topicId = Uuid.randomUuid();
String topicName = "foo";
ConsumerGroupPartitionMetadataValue.TopicMetadata record = new ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(topicId)
.setTopicName(topicName)
.setNumPartitions(15);
assertEquals(
new TopicMetadata(topicId, topicName, 15),
TopicMetadata.fromRecord(record)
);
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.consumer;
import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class VersionedMetadataTest {
@Test
public void testAttributes() {
VersionedMetadata metadata = new VersionedMetadata(
(short) 1,
ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))
);
assertEquals((short) 1, metadata.version());
assertEquals(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)), metadata.metadata());
}
@Test
public void testMetadataCannotBeNull() {
assertThrows(NullPointerException.class, () -> new VersionedMetadata((short) 1, null));
}
@Test
public void testEquals() {
VersionedMetadata metadata = new VersionedMetadata(
(short) 1,
ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))
);
assertEquals(new VersionedMetadata(
(short) 1,
ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))
), metadata);
assertNotEquals(VersionedMetadata.EMPTY, metadata);
}
}