mirror of https://github.com/apache/kafka.git
MINOR: kafka-stream-groups.sh should fail quickly if the partition leader is unavailable (#20271)
This PR applies the same partition leader check for `StreamsGroupCommand` as `ShareGroupCommand` and `ConsumerGroupCommand` to avoid the command execution timeout. Reviewers: Lucas Brutschy <lucasbru@apache.org>
This commit is contained in:
parent
f621a635c1
commit
d2f162a071
|
|
@ -881,6 +881,7 @@ public class StreamsGroupCommand {
|
|||
List<String> topics = opts.options.valuesOf(opts.inputTopicOpt);
|
||||
|
||||
List<TopicPartition> partitions = offsetsUtils.parseTopicPartitionsToReset(topics);
|
||||
offsetsUtils.checkAllTopicPartitionsValid(partitions);
|
||||
// if the user specified topics that do not belong to this group, we filter them out
|
||||
partitions = filterExistingGroupTopics(groupId, partitions);
|
||||
return partitions;
|
||||
|
|
|
|||
|
|
@ -1373,7 +1373,7 @@ public class ShareGroupCommandTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testAlterShareGroupFailureFailureWithNonExistentTopic() {
|
||||
public void testAlterShareGroupFailureWithNonExistentTopic() {
|
||||
String group = "share-group";
|
||||
String topic = "none";
|
||||
String bootstrapServer = "localhost:9092";
|
||||
|
|
|
|||
|
|
@ -43,6 +43,7 @@ import org.apache.kafka.common.KafkaFuture;
|
|||
import org.apache.kafka.common.Node;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.TopicPartitionInfo;
|
||||
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
|
||||
import org.apache.kafka.common.internals.KafkaFutureImpl;
|
||||
import org.apache.kafka.test.TestUtils;
|
||||
|
||||
|
|
@ -65,6 +66,7 @@ import java.util.stream.IntStream;
|
|||
|
||||
import joptsimple.OptionException;
|
||||
|
||||
import static org.apache.kafka.common.KafkaFuture.completedFuture;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
|
||||
|
|
@ -72,6 +74,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
|
|||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyCollection;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
|
|
@ -293,21 +296,30 @@ public class StreamsGroupCommandTest {
|
|||
@Test
|
||||
public void testAdminRequestsForResetOffsets() {
|
||||
Admin adminClient = mock(KafkaAdminClient.class);
|
||||
String topic = "topic1";
|
||||
String groupId = "foo-group";
|
||||
List<String> args = List.of("--bootstrap-server", "localhost:9092", "--group", groupId, "--reset-offsets", "--input-topic", "topic1", "--to-latest");
|
||||
List<String> topics = List.of("topic1");
|
||||
List<String> topics = List.of(topic);
|
||||
|
||||
DescribeTopicsResult describeTopicsResult = mock(DescribeTopicsResult.class);
|
||||
when(adminClient.describeStreamsGroups(List.of(groupId)))
|
||||
.thenReturn(describeStreamsResult(groupId, GroupState.DEAD));
|
||||
Map<String, TopicDescription> descriptions = Map.of(
|
||||
topic, new TopicDescription(topic, false, List.of(
|
||||
new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of()))
|
||||
));
|
||||
when(adminClient.describeTopics(anyCollection()))
|
||||
.thenReturn(describeTopicsResult);
|
||||
when(adminClient.describeTopics(eq(topics), any(DescribeTopicsOptions.class)))
|
||||
.thenReturn(describeTopicsResult(topics, 1));
|
||||
.thenReturn(describeTopicsResult);
|
||||
when(describeTopicsResult.allTopicNames()).thenReturn(completedFuture(descriptions));
|
||||
when(adminClient.listOffsets(any(), any()))
|
||||
.thenReturn(listOffsetsResult());
|
||||
ListGroupsResult listGroupsResult = listGroupResult(groupId);
|
||||
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult);
|
||||
ListStreamsGroupOffsetsResult result = mock(ListStreamsGroupOffsetsResult.class);
|
||||
Map<TopicPartition, OffsetAndMetadata> committedOffsetsMap = new HashMap<>();
|
||||
committedOffsetsMap.put(new TopicPartition("topic1", 0), mock(OffsetAndMetadata.class));
|
||||
committedOffsetsMap.put(new TopicPartition(topic, 0), mock(OffsetAndMetadata.class));
|
||||
when(adminClient.listStreamsGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(result);
|
||||
when(result.partitionsToOffsetAndMetadata(ArgumentMatchers.anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap));
|
||||
|
||||
|
|
@ -427,6 +439,43 @@ public class StreamsGroupCommandTest {
|
|||
|
||||
service.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResetOffsetsWithPartitionNotExist() {
|
||||
Admin adminClient = mock(KafkaAdminClient.class);
|
||||
String groupId = "foo-group";
|
||||
String topic = "topic";
|
||||
List<String> args = new ArrayList<>(Arrays.asList("--bootstrap-server", "localhost:9092", "--group", groupId, "--reset-offsets", "--input-topic", "topic:3", "--to-latest"));
|
||||
|
||||
when(adminClient.describeStreamsGroups(List.of(groupId)))
|
||||
.thenReturn(describeStreamsResult(groupId, GroupState.DEAD));
|
||||
DescribeTopicsResult describeTopicsResult = mock(DescribeTopicsResult.class);
|
||||
|
||||
Map<String, TopicDescription> descriptions = Map.of(
|
||||
topic, new TopicDescription(topic, false, List.of(
|
||||
new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of()))
|
||||
));
|
||||
when(adminClient.describeTopics(anyCollection()))
|
||||
.thenReturn(describeTopicsResult);
|
||||
when(adminClient.describeTopics(eq(List.of(topic)), any(DescribeTopicsOptions.class)))
|
||||
.thenReturn(describeTopicsResult);
|
||||
when(describeTopicsResult.allTopicNames()).thenReturn(completedFuture(descriptions));
|
||||
when(adminClient.listOffsets(any(), any()))
|
||||
.thenReturn(listOffsetsResult());
|
||||
ListStreamsGroupOffsetsResult result = mock(ListStreamsGroupOffsetsResult.class);
|
||||
Map<TopicPartition, OffsetAndMetadata> committedOffsetsMap = Map.of(
|
||||
new TopicPartition(topic, 0),
|
||||
new OffsetAndMetadata(12, Optional.of(0), ""),
|
||||
new TopicPartition(topic, 1),
|
||||
new OffsetAndMetadata(12, Optional.of(0), "")
|
||||
);
|
||||
|
||||
when(adminClient.listStreamsGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(result);
|
||||
when(result.partitionsToOffsetAndMetadata(ArgumentMatchers.anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap));
|
||||
StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args.toArray(new String[0]), adminClient);
|
||||
assertThrows(UnknownTopicOrPartitionException.class, () -> service.resetOffsets());
|
||||
service.close();
|
||||
}
|
||||
|
||||
private ListGroupsResult listGroupResult(String groupId) {
|
||||
ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
|
||||
|
|
|
|||
Loading…
Reference in New Issue