MINOR: update TransactionLog#readTxnRecordValue to initialize TransactionMetadata with non-empty topic partitions (#20370)

This is followup PR for https://github.com/apache/kafka/pull/19699.

* Update TransactionLog#readTxnRecordValue to initialize
TransactionMetadata with non-empty topic partitions
* Update `TxnTransitMetadata` comment, because it's not immutable.

Reviewers: TengYao Chi <kitingiao@gmail.com>, Justine Olshan
 <jolshan@confluent.io>, Kuan-Po Tseng <brandboat@gmail.com>, Chia-Ping
 Tsai <chia7712@gmail.com>
This commit is contained in:
PoAn Yang 2025-08-26 10:36:45 +08:00 committed by GitHub
parent b2c1a0fb9f
commit 5bbc421a13
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 16 additions and 14 deletions

View File

@ -115,7 +115,13 @@ object TransactionLog {
val version = buffer.getShort
if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION && version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
val value = new TransactionLogValue(new ByteBufferAccessor(buffer), version)
val transactionMetadata = new TransactionMetadata(
val state = TransactionState.fromId(value.transactionStatus)
val tps: util.Set[TopicPartition] = new util.HashSet[TopicPartition]()
if (!state.equals(TransactionState.EMPTY))
value.transactionPartitions.forEach(partitionsSchema => {
partitionsSchema.partitionIds.forEach(partitionId => tps.add(new TopicPartition(partitionsSchema.topic, partitionId.intValue())))
})
Some(new TransactionMetadata(
transactionalId,
value.producerId,
value.previousProducerId,
@ -123,20 +129,11 @@ object TransactionLog {
value.producerEpoch,
RecordBatch.NO_PRODUCER_EPOCH,
value.transactionTimeoutMs,
TransactionState.fromId(value.transactionStatus),
util.Set.of(),
state,
tps,
value.transactionStartTimestampMs,
value.transactionLastUpdateTimestampMs,
TransactionVersion.fromFeatureLevel(value.clientTransactionVersion))
if (!transactionMetadata.state.equals(TransactionState.EMPTY))
value.transactionPartitions.forEach(partitionsSchema => {
transactionMetadata.addPartitions(partitionsSchema.partitionIds
.stream
.map(partitionId => new TopicPartition(partitionsSchema.topic, partitionId.intValue()))
.toList)
})
Some(transactionMetadata)
TransactionVersion.fromFeatureLevel(value.clientTransactionVersion)))
} else throw new IllegalStateException(s"Unknown version $version from the transaction log message value")
}
}

View File

@ -117,6 +117,7 @@ public class TransactionMetadata {
}
}
// VisibleForTesting
public void addPartitions(Collection<TopicPartition> partitions) {
topicPartitions.addAll(partitions);
}
@ -500,6 +501,7 @@ public class TransactionMetadata {
return transactionalId;
}
// VisibleForTesting
public void setProducerId(long producerId) {
this.producerId = producerId;
}
@ -507,6 +509,7 @@ public class TransactionMetadata {
return producerId;
}
// VisibleForTesting
public void setPrevProducerId(long prevProducerId) {
this.prevProducerId = prevProducerId;
}
@ -534,6 +537,7 @@ public class TransactionMetadata {
return txnTimeoutMs;
}
// VisibleForTesting
public void state(TransactionState state) {
this.state = state;
}
@ -550,6 +554,7 @@ public class TransactionMetadata {
return txnStartTimestamp;
}
// VisibleForTesting
public void txnLastUpdateTimestamp(long txnLastUpdateTimestamp) {
this.txnLastUpdateTimestamp = txnLastUpdateTimestamp;
}

View File

@ -22,7 +22,7 @@ import org.apache.kafka.server.common.TransactionVersion;
import java.util.HashSet;
/**
* Immutable object representing the target transition of the transaction metadata
* Represent the target transition of the transaction metadata. The topicPartitions field is mutable.
*/
public record TxnTransitMetadata(
long producerId,