mirror of https://github.com/apache/kafka.git
KAFKA-19662: Allow resetting offset for unsubscribed topic in kafka-share-groups.sh (#20453)
The `kafka-share-groups.sh` tool checks whether a topic already has a start-offset in the share group when resetting offsets. This is not necessary. By removing the check, it is possible to set a start offset for a topic which has not yet but will be subscribed in the future, thus initialising the consumption point. There is still a small piece of outstanding work to do with resetting the offset for a non-existent group which should also create the group. A subsequent PR will be used to address that. Reviewers: Jimmy Wang <48462172+JimmyWang6@users.noreply.github.com>, Lan Ding <isDing_L@163.com>, Apoorv Mittal <apoorvmittal10@gmail.com>
This commit is contained in:
parent
1d0c5f2820
commit
37e04eca81
|
@ -418,18 +418,6 @@ public class ShareGroupCommand {
|
|||
|
||||
if (opts.options.has(opts.topicOpt)) {
|
||||
partitionsToReset = offsetsUtils.parseTopicPartitionsToReset(opts.options.valuesOf(opts.topicOpt));
|
||||
Set<String> subscribedTopics = offsetsByTopicPartitions.keySet().stream()
|
||||
.map(TopicPartition::topic)
|
||||
.collect(Collectors.toSet());
|
||||
Set<String> resetTopics = partitionsToReset.stream()
|
||||
.map(TopicPartition::topic)
|
||||
.collect(Collectors.toSet());
|
||||
if (!subscribedTopics.containsAll(resetTopics)) {
|
||||
CommandLineUtils
|
||||
.printErrorAndExit(String.format("Share group '%s' is not subscribed to topic '%s'.",
|
||||
groupId, resetTopics.stream().filter(topic -> !subscribedTopics.contains(topic)).collect(Collectors.joining(", "))));
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
partitionsToReset = offsetsByTopicPartitions.keySet();
|
||||
}
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
*/
|
||||
package org.apache.kafka.tools.consumer.group;
|
||||
|
||||
|
||||
import org.apache.kafka.clients.admin.Admin;
|
||||
import org.apache.kafka.clients.admin.AdminClientTestUtils;
|
||||
import org.apache.kafka.clients.admin.AlterShareGroupOffsetsResult;
|
||||
|
@ -1373,7 +1372,7 @@ public class ShareGroupCommandTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testAlterShareGroupFailureWithNonExistentTopic() {
|
||||
public void testAlterShareGroupUnsubscribedTopicSuccess() {
|
||||
String group = "share-group";
|
||||
String topic = "none";
|
||||
String bootstrapServer = "localhost:9092";
|
||||
|
@ -1386,18 +1385,22 @@ public class ShareGroupCommandTest {
|
|||
KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10L)))
|
||||
)
|
||||
);
|
||||
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
|
||||
|
||||
AlterShareGroupOffsetsResult alterShareGroupOffsetsResult = mockAlterShareGroupOffsets(adminClient, group);
|
||||
TopicPartition tp0 = new TopicPartition(topic, 0);
|
||||
Map<TopicPartition, OffsetAndMetadata> partitionOffsets = Map.of(tp0, new OffsetAndMetadata(0L));
|
||||
ListOffsetsResult listOffsetsResult = AdminClientTestUtils.createListOffsetsResult(partitionOffsets);
|
||||
when(adminClient.listOffsets(any(), any(ListOffsetsOptions.class))).thenReturn(listOffsetsResult);
|
||||
|
||||
ShareGroupDescription exp = new ShareGroupDescription(
|
||||
group,
|
||||
List.of(new ShareMemberDescription("memid1", "clId1", "host1", new ShareMemberAssignment(
|
||||
Set.of(new TopicPartition(topic, 0))
|
||||
), 0)),
|
||||
List.of(),
|
||||
GroupState.EMPTY,
|
||||
new Node(0, "host1", 9090), 0, 0);
|
||||
DescribeShareGroupsResult describeShareGroupsResult = mock(DescribeShareGroupsResult.class);
|
||||
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(group, KafkaFuture.completedFuture(exp)));
|
||||
when(adminClient.describeShareGroups(any(), any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
|
||||
AtomicBoolean exited = new AtomicBoolean(false);
|
||||
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
|
||||
Map<String, TopicDescription> descriptions = Map.of(
|
||||
topic, new TopicDescription(topic, false, List.of(
|
||||
new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of())
|
||||
|
@ -1406,15 +1409,12 @@ public class ShareGroupCommandTest {
|
|||
when(describeTopicResult.allTopicNames()).thenReturn(completedFuture(descriptions));
|
||||
when(adminClient.describeTopics(anyCollection())).thenReturn(describeTopicResult);
|
||||
when(adminClient.describeTopics(anyCollection(), any(DescribeTopicsOptions.class))).thenReturn(describeTopicResult);
|
||||
Exit.setExitProcedure(((statusCode, message) -> {
|
||||
assertNotEquals(0, statusCode);
|
||||
assertTrue(message.contains("Share group 'share-group' is not subscribed to topic 'none'"));
|
||||
exited.set(true);
|
||||
}));
|
||||
try {
|
||||
getShareGroupService(cgcArgs, adminClient).resetOffsets();
|
||||
} finally {
|
||||
assertTrue(exited.get());
|
||||
try (ShareGroupService service = getShareGroupService(cgcArgs, adminClient)) {
|
||||
service.resetOffsets();
|
||||
verify(adminClient).alterShareGroupOffsets(eq(group), anyMap());
|
||||
verify(adminClient).describeTopics(anyCollection(), any(DescribeTopicsOptions.class));
|
||||
verify(alterShareGroupOffsetsResult, times(1)).all();
|
||||
verify(adminClient).describeShareGroups(ArgumentMatchers.anyCollection(), any(DescribeShareGroupsOptions.class));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue