KAFKA-12979; Implement command to find hanging transactions (#10974)

This patch implements the `find-hanging` command described in KIP-664: https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions#KIP664:Providetoolingtodetectandaborthangingtransactions-FindingHangingTransactions.
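For illustration, a hypothetical invocation of the new subcommand might look like the following (the bin/kafka-transactions.sh wrapper script name and the bootstrap address are assumptions for this sketch; the find-hanging flags themselves are the ones added in this patch):

    bin/kafka-transactions.sh --bootstrap-server localhost:9092 find-hanging --broker-id 0 --max-transaction-timeout 15

The command scans the selected partitions with DescribeProducers, keeps any open transaction older than the given timeout (in minutes, 15 by default), and reports it as hanging when ListTransactions/DescribeTransactions show that the coordinator no longer associates that partition with an ongoing transaction for the producer.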

Reviewers: Luke Chen <showuon@gmail.com>, David Jacot <djacot@confluent.io>
Jason Gustafson 2021-07-06 10:39:59 -07:00 committed by GitHub
parent fb6425188c
commit f29c43bdbb
3 changed files with 1050 additions and 14 deletions

ListTransactionsOptions.java

@@ -22,6 +22,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
/**
@@ -88,4 +89,18 @@ public class ListTransactionsOptions extends AbstractOptions<ListTransactionsOpt
", timeoutMs=" + timeoutMs +
')';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ListTransactionsOptions that = (ListTransactionsOptions) o;
return Objects.equals(filteredStates, that.filteredStates) &&
Objects.equals(filteredProducerIds, that.filteredProducerIds);
}
@Override
public int hashCode() {
return Objects.hash(filteredStates, filteredProducerIds);
}
}

TransactionsCommand.java

@@ -28,10 +28,15 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeProducersOptions;
import org.apache.kafka.clients.admin.DescribeProducersResult;
import org.apache.kafka.clients.admin.DescribeTransactionsResult;
import org.apache.kafka.clients.admin.ListTransactionsOptions;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TransactionDescription;
import org.apache.kafka.clients.admin.TransactionListing;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -43,12 +48,17 @@ import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -434,7 +444,7 @@ public abstract class TransactionsCommand {
final Map<Integer, Collection<TransactionListing>> result;
try {
- result = admin.listTransactions()
+ result = admin.listTransactions(new ListTransactionsOptions())
.allByBrokerId()
.get();
} catch (ExecutionException e) {
@@ -461,6 +471,436 @@
}
}
static class FindHangingTransactionsCommand extends TransactionsCommand {
private static final int MAX_BATCH_SIZE = 500;
static final String[] HEADERS = new String[] {
"Topic",
"Partition",
"ProducerId",
"ProducerEpoch",
"CoordinatorEpoch",
"StartOffset",
"LastTimestamp",
"Duration(min)"
};
FindHangingTransactionsCommand(Time time) {
super(time);
}
@Override
String name() {
return "find-hanging";
}
@Override
void addSubparser(Subparsers subparsers) {
Subparser subparser = subparsers.addParser(name())
.help("find hanging transactions");
subparser.addArgument("--broker-id")
.help("broker id to search for hanging transactions")
.action(store())
.type(Integer.class)
.required(false);
subparser.addArgument("--max-transaction-timeout")
.help("maximum transaction timeout in minutes to limit the scope of the search (15 minutes by default)")
.action(store())
.type(Integer.class)
.setDefault(15)
.required(false);
subparser.addArgument("--topic")
.help("topic name to limit search to (required if --partition is specified)")
.action(store())
.type(String.class)
.required(false);
subparser.addArgument("--partition")
.help("partition number")
.action(store())
.type(Integer.class)
.required(false);
}
@Override
void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
Optional<Integer> brokerId = Optional.ofNullable(ns.getInt("broker_id"));
Optional<String> topic = Optional.ofNullable(ns.getString("topic"));
if (!topic.isPresent() && !brokerId.isPresent()) {
printErrorAndExit("The `find-hanging` command requires either --topic " +
"or --broker-id to limit the scope of the search");
return;
}
Optional<Integer> partition = Optional.ofNullable(ns.getInt("partition"));
if (partition.isPresent() && !topic.isPresent()) {
printErrorAndExit("The --partition argument requires --topic to be provided");
return;
}
long maxTransactionTimeoutMs = TimeUnit.MINUTES.toMillis(
ns.getInt("max_transaction_timeout"));
List<TopicPartition> topicPartitions = collectTopicPartitionsToSearch(
admin,
topic,
partition,
brokerId
);
List<OpenTransaction> candidates = collectCandidateOpenTransactions(
admin,
brokerId,
maxTransactionTimeoutMs,
topicPartitions
);
if (candidates.isEmpty()) {
printHangingTransactions(Collections.emptyList(), out);
} else {
Map<Long, List<OpenTransaction>> openTransactionsByProducerId = groupByProducerId(candidates);
Map<Long, String> transactionalIds = lookupTransactionalIds(
admin,
openTransactionsByProducerId.keySet()
);
Map<String, TransactionDescription> descriptions = describeTransactions(
admin,
transactionalIds.values()
);
List<OpenTransaction> hangingTransactions = filterHangingTransactions(
openTransactionsByProducerId,
transactionalIds,
descriptions
);
printHangingTransactions(hangingTransactions, out);
}
}
private List<TopicPartition> collectTopicPartitionsToSearch(
Admin admin,
Optional<String> topic,
Optional<Integer> partition,
Optional<Integer> brokerId
) throws Exception {
final List<String> topics;
if (topic.isPresent()) {
if (partition.isPresent()) {
return Collections.singletonList(new TopicPartition(topic.get(), partition.get()));
} else {
topics = Collections.singletonList(topic.get());
}
} else {
topics = listTopics(admin);
}
return findTopicPartitions(
admin,
brokerId,
topics
);
}
private List<OpenTransaction> filterHangingTransactions(
Map<Long, List<OpenTransaction>> openTransactionsByProducerId,
Map<Long, String> transactionalIds,
Map<String, TransactionDescription> descriptions
) {
List<OpenTransaction> hangingTransactions = new ArrayList<>();
openTransactionsByProducerId.forEach((producerId, openTransactions) -> {
String transactionalId = transactionalIds.get(producerId);
if (transactionalId == null) {
// If we could not find the transactionalId corresponding to the
// producerId of an open transaction, then the transaction is hanging.
hangingTransactions.addAll(openTransactions);
} else {
// Otherwise, we need to check the current transaction state.
TransactionDescription description = descriptions.get(transactionalId);
if (description == null) {
hangingTransactions.addAll(openTransactions);
} else {
for (OpenTransaction openTransaction : openTransactions) {
// The `DescribeTransactions` API returns all partitions being
// written to in an ongoing transaction and any partition which
// does not yet have markers written when in the `PendingAbort` or
// `PendingCommit` states. If the topic partition that we found is
// among these, then we can still expect the coordinator to write
// the marker. Otherwise, it is a hanging transaction.
if (!description.topicPartitions().contains(openTransaction.topicPartition)) {
hangingTransactions.add(openTransaction);
}
}
}
}
});
return hangingTransactions;
}
private void printHangingTransactions(
List<OpenTransaction> hangingTransactions,
PrintStream out
) {
long currentTimeMs = time.milliseconds();
List<String[]> rows = new ArrayList<>(hangingTransactions.size());
for (OpenTransaction transaction : hangingTransactions) {
long transactionDurationMinutes = TimeUnit.MILLISECONDS.toMinutes(
currentTimeMs - transaction.producerState.lastTimestamp());
rows.add(new String[] {
transaction.topicPartition.topic(),
String.valueOf(transaction.topicPartition.partition()),
String.valueOf(transaction.producerState.producerId()),
String.valueOf(transaction.producerState.producerEpoch()),
String.valueOf(transaction.producerState.coordinatorEpoch().orElse(-1)),
String.valueOf(transaction.producerState.currentTransactionStartOffset().orElse(-1)),
String.valueOf(transaction.producerState.lastTimestamp()),
String.valueOf(transactionDurationMinutes)
});
}
prettyPrintTable(HEADERS, rows, out);
}
private Map<String, TransactionDescription> describeTransactions(
Admin admin,
Collection<String> transactionalIds
) throws Exception {
try {
DescribeTransactionsResult result = admin.describeTransactions(new HashSet<>(transactionalIds));
Map<String, TransactionDescription> descriptions = new HashMap<>();
for (String transactionalId : transactionalIds) {
try {
TransactionDescription description = result.description(transactionalId).get();
descriptions.put(transactionalId, description);
} catch (ExecutionException e) {
if (e.getCause() instanceof TransactionalIdNotFoundException) {
descriptions.put(transactionalId, null);
} else {
throw e;
}
}
}
return descriptions;
} catch (ExecutionException e) {
printErrorAndExit("Failed to describe " + transactionalIds.size()
+ " transactions", e.getCause());
return Collections.emptyMap();
}
}
private Map<Long, List<OpenTransaction>> groupByProducerId(
List<OpenTransaction> openTransactions
) {
Map<Long, List<OpenTransaction>> res = new HashMap<>();
for (OpenTransaction transaction : openTransactions) {
List<OpenTransaction> states = res.computeIfAbsent(
transaction.producerState.producerId(),
__ -> new ArrayList<>()
);
states.add(transaction);
}
return res;
}
private List<String> listTopics(
Admin admin
) throws Exception {
try {
return new ArrayList<>(admin.listTopics().names().get());
} catch (ExecutionException e) {
printErrorAndExit("Failed to list topics", e.getCause());
return Collections.emptyList();
}
}
private List<TopicPartition> findTopicPartitions(
Admin admin,
Optional<Integer> brokerId,
List<String> topics
) throws Exception {
List<TopicPartition> topicPartitions = new ArrayList<>();
consumeInBatches(topics, MAX_BATCH_SIZE, batch -> {
findTopicPartitions(
admin,
brokerId,
batch,
topicPartitions
);
});
return topicPartitions;
}
private void findTopicPartitions(
Admin admin,
Optional<Integer> brokerId,
List<String> topics,
List<TopicPartition> topicPartitions
) throws Exception {
try {
Map<String, TopicDescription> topicDescriptions = admin.describeTopics(topics).all().get();
topicDescriptions.forEach((topic, description) -> {
description.partitions().forEach(partitionInfo -> {
if (!brokerId.isPresent() || hasReplica(brokerId.get(), partitionInfo)) {
topicPartitions.add(new TopicPartition(topic, partitionInfo.partition()));
}
});
});
} catch (ExecutionException e) {
printErrorAndExit("Failed to describe " + topics.size() + " topics", e.getCause());
}
}
private boolean hasReplica(
int brokerId,
TopicPartitionInfo partitionInfo
) {
return partitionInfo.replicas().stream().anyMatch(node -> node.id() == brokerId);
}
private List<OpenTransaction> collectCandidateOpenTransactions(
Admin admin,
Optional<Integer> brokerId,
long maxTransactionTimeoutMs,
List<TopicPartition> topicPartitions
) throws Exception {
// We have to check all partitions on the broker. In order to avoid
// overwhelming it with a giant request, we break the requests into
// smaller batches.
List<OpenTransaction> candidateTransactions = new ArrayList<>();
consumeInBatches(topicPartitions, MAX_BATCH_SIZE, batch -> {
collectCandidateOpenTransactions(
admin,
brokerId,
maxTransactionTimeoutMs,
batch,
candidateTransactions
);
});
return candidateTransactions;
}
private static class OpenTransaction {
private final TopicPartition topicPartition;
private final ProducerState producerState;
private OpenTransaction(
TopicPartition topicPartition,
ProducerState producerState
) {
this.topicPartition = topicPartition;
this.producerState = producerState;
}
}
private void collectCandidateOpenTransactions(
Admin admin,
Optional<Integer> brokerId,
long maxTransactionTimeoutMs,
List<TopicPartition> topicPartitions,
List<OpenTransaction> candidateTransactions
) throws Exception {
try {
DescribeProducersOptions describeOptions = new DescribeProducersOptions();
brokerId.ifPresent(describeOptions::brokerId);
Map<TopicPartition, DescribeProducersResult.PartitionProducerState> producersByPartition =
admin.describeProducers(topicPartitions, describeOptions).all().get();
long currentTimeMs = time.milliseconds();
producersByPartition.forEach((topicPartition, producersStates) -> {
producersStates.activeProducers().forEach(activeProducer -> {
if (activeProducer.currentTransactionStartOffset().isPresent()) {
long transactionDurationMs = currentTimeMs - activeProducer.lastTimestamp();
if (transactionDurationMs > maxTransactionTimeoutMs) {
candidateTransactions.add(new OpenTransaction(
topicPartition,
activeProducer
));
}
}
});
});
} catch (ExecutionException e) {
printErrorAndExit("Failed to describe producers for " + topicPartitions.size() +
" partitions on broker " + brokerId, e.getCause());
}
}
private Map<Long, String> lookupTransactionalIds(
Admin admin,
Set<Long> producerIds
) throws Exception {
try {
ListTransactionsOptions listTransactionsOptions = new ListTransactionsOptions()
.filterProducerIds(producerIds);
Collection<TransactionListing> transactionListings =
admin.listTransactions(listTransactionsOptions).all().get();
Map<Long, String> transactionalIdMap = new HashMap<>();
transactionListings.forEach(listing -> {
if (!producerIds.contains(listing.producerId())) {
log.debug("Received transaction listing {} which has a producerId " +
"which was not requested", listing);
} else {
transactionalIdMap.put(
listing.producerId(),
listing.transactionalId()
);
}
});
return transactionalIdMap;
} catch (ExecutionException e) {
printErrorAndExit("Failed to list transactions for " + producerIds.size() +
" producers", e.getCause());
return Collections.emptyMap();
}
}
@FunctionalInterface
private interface ThrowableConsumer<T> {
void accept(T t) throws Exception;
}
private <T> void consumeInBatches(
List<T> list,
int batchSize,
ThrowableConsumer<List<T>> consumer
) throws Exception {
int batchStartIndex = 0;
int limitIndex = list.size();
while (batchStartIndex < limitIndex) {
int batchEndIndex = Math.min(
limitIndex,
batchStartIndex + batchSize
);
consumer.accept(list.subList(batchStartIndex, batchEndIndex));
batchStartIndex = batchEndIndex;
}
}
}
private static void appendColumnValue(
StringBuilder rowBuilder,
String value,
@@ -580,7 +1020,8 @@
new ListTransactionsCommand(time),
new DescribeTransactionsCommand(time),
new DescribeProducersCommand(time),
- new AbortTransactionCommand(time)
+ new AbortTransactionCommand(time),
new FindHangingTransactionsCommand(time)
);
ArgumentParser parser = buildBaseParser();

TransactionsCommandTest.java

@@ -22,14 +22,21 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeProducersOptions;
import org.apache.kafka.clients.admin.DescribeProducersResult;
import org.apache.kafka.clients.admin.DescribeProducersResult.PartitionProducerState;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.DescribeTransactionsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.ListTransactionsOptions;
import org.apache.kafka.clients.admin.ListTransactionsResult;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TransactionDescription;
import org.apache.kafka.clients.admin.TransactionListing;
import org.apache.kafka.clients.admin.TransactionState;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.MockTime;
@@ -48,7 +55,9 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -56,10 +65,14 @@ import java.util.Map;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.common.KafkaFuture.completedFuture;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -145,7 +158,7 @@ public class TransactionsCommandTest {
DescribeProducersOptions expectedOptions
) throws Exception {
DescribeProducersResult describeResult = Mockito.mock(DescribeProducersResult.class);
- KafkaFuture<PartitionProducerState> describeFuture = KafkaFutureImpl.completedFuture(
+ KafkaFuture<PartitionProducerState> describeFuture = completedFuture(
new PartitionProducerState(asList(
new ProducerState(12345L, 15, 1300, 1599509565L,
OptionalInt.of(20), OptionalLong.of(990)),
@@ -181,8 +194,6 @@
"list"
};
- ListTransactionsResult listResult = Mockito.mock(ListTransactionsResult.class);
Map<Integer, Collection<TransactionListing>> transactions = new HashMap<>();
transactions.put(0, asList(
new TransactionListing("foo", 12345L, TransactionState.ONGOING),
@@ -192,11 +203,7 @@
new TransactionListing("baz", 13579L, TransactionState.COMPLETE_COMMIT)
));
- KafkaFuture<Map<Integer, Collection<TransactionListing>>> listTransactionsFuture =
- KafkaFutureImpl.completedFuture(transactions);
- Mockito.when(admin.listTransactions()).thenReturn(listResult);
- Mockito.when(listResult.allByBrokerId()).thenReturn(listTransactionsFuture);
+ expectListTransactions(transactions);
execute(args);
assertNormalExit();
@@ -241,7 +248,7 @@
int coordinatorId = 5;
long transactionStartTime = time.milliseconds();
- KafkaFuture<TransactionDescription> describeFuture = KafkaFutureImpl.completedFuture(
+ KafkaFuture<TransactionDescription> describeFuture = completedFuture(
new TransactionDescription(
coordinatorId,
TransactionState.ONGOING,
@@ -373,14 +380,14 @@
};
DescribeProducersResult describeResult = Mockito.mock(DescribeProducersResult.class);
- KafkaFuture<PartitionProducerState> describeFuture = KafkaFutureImpl.completedFuture(
+ KafkaFuture<PartitionProducerState> describeFuture = completedFuture(
new PartitionProducerState(singletonList(
new ProducerState(producerId, producerEpoch, 1300, 1599509565L,
OptionalInt.of(coordinatorEpoch), OptionalLong.of(startOffset))
)));
AbortTransactionResult abortTransactionResult = Mockito.mock(AbortTransactionResult.class);
- KafkaFuture<Void> abortFuture = KafkaFutureImpl.completedFuture(null);
+ KafkaFuture<Void> abortFuture = completedFuture(null);
AbortTransactionSpec expectedAbortSpec = new AbortTransactionSpec(
topicPartition, producerId, producerEpoch, coordinatorEpoch);
@@ -418,7 +425,7 @@
};
AbortTransactionResult abortTransactionResult = Mockito.mock(AbortTransactionResult.class);
- KafkaFuture<Void> abortFuture = KafkaFutureImpl.completedFuture(null);
+ KafkaFuture<Void> abortFuture = completedFuture(null);
final int expectedCoordinatorEpoch;
if (coordinatorEpoch < 0) {
@@ -437,6 +444,579 @@
assertNormalExit();
}
@Test
public void testFindHangingRequiresEitherBrokerIdOrTopic() throws Exception {
assertCommandFailure(new String[]{
"--bootstrap-server",
"localhost:9092",
"find-hanging"
});
}
@Test
public void testFindHangingRequiresTopicIfPartitionIsSpecified() throws Exception {
assertCommandFailure(new String[]{
"--bootstrap-server",
"localhost:9092",
"find-hanging",
"--broker-id",
"0",
"--partition",
"5"
});
}
private void expectListTransactions(
Map<Integer, Collection<TransactionListing>> listingsByBroker
) {
expectListTransactions(new ListTransactionsOptions(), listingsByBroker);
}
private void expectListTransactions(
ListTransactionsOptions options,
Map<Integer, Collection<TransactionListing>> listingsByBroker
) {
ListTransactionsResult listResult = Mockito.mock(ListTransactionsResult.class);
Mockito.when(admin.listTransactions(options)).thenReturn(listResult);
List<TransactionListing> allListings = new ArrayList<>();
listingsByBroker.values().forEach(allListings::addAll);
Mockito.when(listResult.all()).thenReturn(completedFuture(allListings));
Mockito.when(listResult.allByBrokerId()).thenReturn(completedFuture(listingsByBroker));
}
private void expectDescribeProducers(
TopicPartition topicPartition,
long producerId,
short producerEpoch,
long lastTimestamp,
OptionalInt coordinatorEpoch,
OptionalLong txnStartOffset
) {
PartitionProducerState partitionProducerState = new PartitionProducerState(singletonList(
new ProducerState(
producerId,
producerEpoch,
500,
lastTimestamp,
coordinatorEpoch,
txnStartOffset
)
));
DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
Mockito.when(result.all()).thenReturn(
completedFuture(singletonMap(topicPartition, partitionProducerState))
);
Mockito.when(admin.describeProducers(
Collections.singletonList(topicPartition),
new DescribeProducersOptions()
)).thenReturn(result);
}
private void expectDescribeTransactions(
Map<String, TransactionDescription> descriptions
) {
DescribeTransactionsResult result = Mockito.mock(DescribeTransactionsResult.class);
descriptions.forEach((transactionalId, description) -> {
Mockito.when(result.description(transactionalId))
.thenReturn(completedFuture(description));
});
Mockito.when(result.all()).thenReturn(completedFuture(descriptions));
Mockito.when(admin.describeTransactions(descriptions.keySet())).thenReturn(result);
}
private void expectListTopics(
Set<String> topics
) {
ListTopicsResult result = Mockito.mock(ListTopicsResult.class);
Mockito.when(result.names()).thenReturn(completedFuture(topics));
Mockito.when(admin.listTopics()).thenReturn(result);
}
private void expectDescribeTopics(
Map<String, TopicDescription> descriptions
) {
DescribeTopicsResult result = Mockito.mock(DescribeTopicsResult.class);
Mockito.when(result.all()).thenReturn(completedFuture(descriptions));
Mockito.when(admin.describeTopics(new ArrayList<>(descriptions.keySet()))).thenReturn(result);
}
@Test
public void testFindHangingLookupTopicPartitionsForBroker() throws Exception {
int brokerId = 5;
String[] args = new String[]{
"--bootstrap-server",
"localhost:9092",
"find-hanging",
"--broker-id",
String.valueOf(brokerId)
};
String topic = "foo";
expectListTopics(singleton(topic));
Node node0 = new Node(0, "localhost", 9092);
Node node1 = new Node(1, "localhost", 9093);
Node node5 = new Node(5, "localhost", 9097);
TopicPartitionInfo partition0 = new TopicPartitionInfo(
0,
node0,
Arrays.asList(node0, node1),
Arrays.asList(node0, node1)
);
TopicPartitionInfo partition1 = new TopicPartitionInfo(
1,
node1,
Arrays.asList(node1, node5),
Arrays.asList(node1, node5)
);
TopicDescription description = new TopicDescription(
topic,
false,
Arrays.asList(partition0, partition1)
);
expectDescribeTopics(singletonMap(topic, description));
DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
Mockito.when(result.all()).thenReturn(completedFuture(emptyMap()));
Mockito.when(admin.describeProducers(
Collections.singletonList(new TopicPartition(topic, 1)),
new DescribeProducersOptions().brokerId(brokerId)
)).thenReturn(result);
execute(args);
assertNormalExit();
assertNoHangingTransactions();
}
@Test
public void testFindHangingLookupTopicAndBrokerId() throws Exception {
int brokerId = 5;
String topic = "foo";
String[] args = new String[]{
"--bootstrap-server",
"localhost:9092",
"find-hanging",
"--broker-id",
String.valueOf(brokerId),
"--topic",
topic
};
Node node0 = new Node(0, "localhost", 9092);
Node node1 = new Node(1, "localhost", 9093);
Node node5 = new Node(5, "localhost", 9097);
TopicPartitionInfo partition0 = new TopicPartitionInfo(
0,
node0,
Arrays.asList(node0, node1),
Arrays.asList(node0, node1)
);
TopicPartitionInfo partition1 = new TopicPartitionInfo(
1,
node1,
Arrays.asList(node1, node5),
Arrays.asList(node1, node5)
);
TopicDescription description = new TopicDescription(
topic,
false,
Arrays.asList(partition0, partition1)
);
expectDescribeTopics(singletonMap(topic, description));
DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
Mockito.when(result.all()).thenReturn(completedFuture(emptyMap()));
Mockito.when(admin.describeProducers(
Collections.singletonList(new TopicPartition(topic, 1)),
new DescribeProducersOptions().brokerId(brokerId)
)).thenReturn(result);
execute(args);
assertNormalExit();
assertNoHangingTransactions();
}
@Test
public void testFindHangingLookupTopicPartitionsForTopic() throws Exception {
String topic = "foo";
String[] args = new String[]{
"--bootstrap-server",
"localhost:9092",
"find-hanging",
"--topic",
topic
};
Node node0 = new Node(0, "localhost", 9092);
Node node1 = new Node(1, "localhost", 9093);
Node node5 = new Node(5, "localhost", 9097);
TopicPartitionInfo partition0 = new TopicPartitionInfo(
0,
node0,
Arrays.asList(node0, node1),
Arrays.asList(node0, node1)
);
TopicPartitionInfo partition1 = new TopicPartitionInfo(
1,
node1,
Arrays.asList(node1, node5),
Arrays.asList(node1, node5)
);
TopicDescription description = new TopicDescription(
topic,
false,
Arrays.asList(partition0, partition1)
);
expectDescribeTopics(singletonMap(topic, description));
DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
Mockito.when(result.all()).thenReturn(completedFuture(emptyMap()));
Mockito.when(admin.describeProducers(
Arrays.asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1)),
new DescribeProducersOptions()
)).thenReturn(result);
execute(args);
assertNormalExit();
assertNoHangingTransactions();
}
private void assertNoHangingTransactions() throws Exception {
List<List<String>> table = readOutputAsTable();
assertEquals(1, table.size());
List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
assertEquals(expectedHeaders, table.get(0));
}
@Test
public void testFindHangingSpecifiedTopicPartition() throws Exception {
TopicPartition topicPartition = new TopicPartition("foo", 5);
String[] args = new String[]{
"--bootstrap-server",
"localhost:9092",
"find-hanging",
"--topic",
topicPartition.topic(),
"--partition",
String.valueOf(topicPartition.partition())
};
long producerId = 132L;
short producerEpoch = 5;
long lastTimestamp = time.milliseconds();
OptionalInt coordinatorEpoch = OptionalInt.of(19);
OptionalLong txnStartOffset = OptionalLong.of(29384L);
expectDescribeProducers(
topicPartition,
producerId,
producerEpoch,
lastTimestamp,
coordinatorEpoch,
txnStartOffset
);
execute(args);
assertNormalExit();
List<List<String>> table = readOutputAsTable();
assertEquals(1, table.size());
List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
assertEquals(expectedHeaders, table.get(0));
}
@Test
public void testFindHangingNoMappedTransactionalId() throws Exception {
TopicPartition topicPartition = new TopicPartition("foo", 5);
String[] args = new String[]{
"--bootstrap-server",
"localhost:9092",
"find-hanging",
"--topic",
topicPartition.topic(),
"--partition",
String.valueOf(topicPartition.partition())
};
long producerId = 132L;
short producerEpoch = 5;
long lastTimestamp = time.milliseconds() - TimeUnit.MINUTES.toMillis(60);
int coordinatorEpoch = 19;
long txnStartOffset = 29384L;
expectDescribeProducers(
topicPartition,
producerId,
producerEpoch,
lastTimestamp,
OptionalInt.of(coordinatorEpoch),
OptionalLong.of(txnStartOffset)
);
expectListTransactions(
new ListTransactionsOptions().filterProducerIds(singleton(producerId)),
singletonMap(1, Collections.emptyList())
);
expectDescribeTransactions(Collections.emptyMap());
execute(args);
assertNormalExit();
assertHangingTransaction(
topicPartition,
producerId,
producerEpoch,
coordinatorEpoch,
txnStartOffset,
lastTimestamp
);
}
@Test
public void testFindHangingWithNoTransactionDescription() throws Exception {
TopicPartition topicPartition = new TopicPartition("foo", 5);
String[] args = new String[]{
"--bootstrap-server",
"localhost:9092",
"find-hanging",
"--topic",
topicPartition.topic(),
"--partition",
String.valueOf(topicPartition.partition())
};
long producerId = 132L;
short producerEpoch = 5;
long lastTimestamp = time.milliseconds() - TimeUnit.MINUTES.toMillis(60);
int coordinatorEpoch = 19;
long txnStartOffset = 29384L;
expectDescribeProducers(
topicPartition,
producerId,
producerEpoch,
lastTimestamp,
OptionalInt.of(coordinatorEpoch),
OptionalLong.of(txnStartOffset)
);
String transactionalId = "bar";
TransactionListing listing = new TransactionListing(
transactionalId,
producerId,
TransactionState.ONGOING
);
expectListTransactions(
new ListTransactionsOptions().filterProducerIds(singleton(producerId)),
singletonMap(1, Collections.singletonList(listing))
);
DescribeTransactionsResult result = Mockito.mock(DescribeTransactionsResult.class);
Mockito.when(result.description(transactionalId))
.thenReturn(failedFuture(new TransactionalIdNotFoundException(transactionalId + " not found")));
Mockito.when(admin.describeTransactions(singleton(transactionalId))).thenReturn(result);
execute(args);
assertNormalExit();
assertHangingTransaction(
topicPartition,
producerId,
producerEpoch,
coordinatorEpoch,
txnStartOffset,
lastTimestamp
);
}
private <T> KafkaFuture<T> failedFuture(Exception e) {
KafkaFutureImpl<T> future = new KafkaFutureImpl<>();
future.completeExceptionally(e);
return future;
}
@Test
public void testFindHangingDoesNotFilterByTransactionInProgressWithDifferentPartitions() throws Exception {
TopicPartition topicPartition = new TopicPartition("foo", 5);
String[] args = new String[]{
"--bootstrap-server",
"localhost:9092",
"find-hanging",
"--topic",
topicPartition.topic(),
"--partition",
String.valueOf(topicPartition.partition())
};
long producerId = 132L;
short producerEpoch = 5;
long lastTimestamp = time.milliseconds() - TimeUnit.MINUTES.toMillis(60);
int coordinatorEpoch = 19;
long txnStartOffset = 29384L;
expectDescribeProducers(
topicPartition,
producerId,
producerEpoch,
lastTimestamp,
OptionalInt.of(coordinatorEpoch),
OptionalLong.of(txnStartOffset)
);
String transactionalId = "bar";
TransactionListing listing = new TransactionListing(
transactionalId,
producerId,
TransactionState.ONGOING
);
expectListTransactions(
new ListTransactionsOptions().filterProducerIds(singleton(producerId)),
singletonMap(1, Collections.singletonList(listing))
);
// Although there is a transaction in progress from the same
// producer epoch, it does not include the topic partition we
// found when describing producers.
TransactionDescription description = new TransactionDescription(
1,
TransactionState.ONGOING,
producerId,
producerEpoch,
60000,
OptionalLong.of(time.milliseconds()),
singleton(new TopicPartition("foo", 10))
);
expectDescribeTransactions(singletonMap(transactionalId, description));
execute(args);
assertNormalExit();
assertHangingTransaction(
topicPartition,
producerId,
producerEpoch,
coordinatorEpoch,
txnStartOffset,
lastTimestamp
);
}
private void assertHangingTransaction(
TopicPartition topicPartition,
long producerId,
short producerEpoch,
int coordinatorEpoch,
long txnStartOffset,
long lastTimestamp
) throws Exception {
List<List<String>> table = readOutputAsTable();
assertEquals(2, table.size());
List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
assertEquals(expectedHeaders, table.get(0));
long durationMinutes = TimeUnit.MILLISECONDS.toMinutes(time.milliseconds() - lastTimestamp);
List<String> expectedRow = asList(
topicPartition.topic(),
String.valueOf(topicPartition.partition()),
String.valueOf(producerId),
String.valueOf(producerEpoch),
String.valueOf(coordinatorEpoch),
String.valueOf(txnStartOffset),
String.valueOf(lastTimestamp),
String.valueOf(durationMinutes)
);
assertEquals(expectedRow, table.get(1));
}
@Test
public void testFindHangingFilterByTransactionInProgressWithSamePartition() throws Exception {
TopicPartition topicPartition = new TopicPartition("foo", 5);
String[] args = new String[]{
"--bootstrap-server",
"localhost:9092",
"find-hanging",
"--topic",
topicPartition.topic(),
"--partition",
String.valueOf(topicPartition.partition())
};
long producerId = 132L;
short producerEpoch = 5;
long lastTimestamp = time.milliseconds() - TimeUnit.MINUTES.toMillis(60);
int coordinatorEpoch = 19;
long txnStartOffset = 29384L;
expectDescribeProducers(
topicPartition,
producerId,
producerEpoch,
lastTimestamp,
OptionalInt.of(coordinatorEpoch),
OptionalLong.of(txnStartOffset)
);
String transactionalId = "bar";
TransactionListing listing = new TransactionListing(
transactionalId,
producerId,
TransactionState.ONGOING
);
expectListTransactions(
new ListTransactionsOptions().filterProducerIds(singleton(producerId)),
singletonMap(1, Collections.singletonList(listing))
);
// The coordinator shows an active transaction with the same epoch
// which includes the partition, so no hanging transaction should
// be detected.
TransactionDescription description = new TransactionDescription(
1,
TransactionState.ONGOING,
producerId,
producerEpoch,
60000,
OptionalLong.of(lastTimestamp),
singleton(topicPartition)
);
expectDescribeTransactions(singletonMap(transactionalId, description));
execute(args);
assertNormalExit();
assertNoHangingTransactions();
}
private void execute(String[] args) throws Exception {
TransactionsCommand.execute(args, ns -> admin, out, time);
}