KAFKA-13288; Include internal topics when searching hanging transactions (#11319)

This patch ensures that internal topics are included when searching for hanging transactions with the `--broker-id` argument in `kafka-transactions.sh`.

Reviewers: David Jacot <djacot@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Jason Gustafson 2021-09-10 14:33:37 -07:00 committed by GitHub
parent 1e19de3199
commit c1c639db77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 28 additions and 2 deletions

View File

@ -19,6 +19,8 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Objects;
/**
* Options for {@link Admin#listTopics()}.
*
@ -58,4 +60,24 @@ public class ListTopicsOptions extends AbstractOptions<ListTopicsOptions> {
public boolean shouldListInternal() {
return listInternal;
}
@Override
public String toString() {
return "ListTopicsOptions(" +
"listInternal=" + listInternal +
')';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ListTopicsOptions that = (ListTopicsOptions) o;
return listInternal == that.listInternal;
}
@Override
public int hashCode() {
return Objects.hash(listInternal);
}
}

View File

@ -29,6 +29,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeProducersOptions;
import org.apache.kafka.clients.admin.DescribeProducersResult;
import org.apache.kafka.clients.admin.DescribeTransactionsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTransactionsOptions;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.clients.admin.TopicDescription;
@ -719,7 +720,8 @@ public abstract class TransactionsCommand {
Admin admin
) throws Exception {
try {
return new ArrayList<>(admin.listTopics().names().get());
ListTopicsOptions listOptions = new ListTopicsOptions().listInternal(true);
return new ArrayList<>(admin.listTopics(listOptions).names().get());
} catch (ExecutionException e) {
printErrorAndExit("Failed to list topics", e.getCause());
return Collections.emptyList();

View File

@ -24,6 +24,7 @@ import org.apache.kafka.clients.admin.DescribeProducersResult;
import org.apache.kafka.clients.admin.DescribeProducersResult.PartitionProducerState;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.DescribeTransactionsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.ListTransactionsOptions;
import org.apache.kafka.clients.admin.ListTransactionsResult;
@ -533,7 +534,8 @@ public class TransactionsCommandTest {
) {
ListTopicsResult result = Mockito.mock(ListTopicsResult.class);
Mockito.when(result.names()).thenReturn(completedFuture(topics));
Mockito.when(admin.listTopics()).thenReturn(result);
ListTopicsOptions listOptions = new ListTopicsOptions().listInternal(true);
Mockito.when(admin.listTopics(listOptions)).thenReturn(result);
}
private void expectDescribeTopics(