From 2fdb687029499d33f4a89be08285d565973b403b Mon Sep 17 00:00:00 2001 From: Ritika Reddy <98577846+rreddy-22@users.noreply.github.com> Date: Tue, 29 Apr 2025 11:52:02 -0700 Subject: [PATCH] KAFKA-19082: [2/4] Add preparedTxnState class to Kafka Producer (KIP-939) (#19470) This is part of the client side changes required to enable 2PC for KIP-939 New KafkaProducer.PreparedTxnState class is going to be defined as following: ``` static public class PreparedTxnState { public String toString(); public PreparedTxnState(String serializedState); public PreparedTxnState(); } ``` The objects of this class can serialize to / deserialize from a string value and can be written to / read from a database. The implementation is going to store producerId and epoch in the format **producerId:epoch** Reviewers: Artem Livshits , Justine Olshan --- .../clients/producer/PreparedTxnState.java | 128 ++++++++++++ .../producer/PreparedTxnStateTest.java | 196 ++++++++++++++++++ 2 files changed, 324 insertions(+) create mode 100644 clients/src/main/java/org/apache/kafka/clients/producer/PreparedTxnState.java create mode 100644 clients/src/test/java/org/apache/kafka/clients/producer/PreparedTxnStateTest.java diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/PreparedTxnState.java b/clients/src/main/java/org/apache/kafka/clients/producer/PreparedTxnState.java new file mode 100644 index 00000000000..b5ee45e807d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/producer/PreparedTxnState.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer; + +import org.apache.kafka.common.record.RecordBatch; + +/** + * Class containing the state of a transaction after it has been prepared for a two-phase commit. + * This state includes the producer ID and epoch, which are needed to commit or abort the transaction. + */ +public class PreparedTxnState { + private final long producerId; + private final short epoch; + + /** + * Creates a new empty PreparedTxnState + */ + public PreparedTxnState() { + this.producerId = RecordBatch.NO_PRODUCER_ID; + this.epoch = RecordBatch.NO_PRODUCER_EPOCH; + } + + /** + * Creates a new PreparedTxnState from a serialized string representation + * + * @param serializedState The serialized string to deserialize. + * @throws IllegalArgumentException if the serialized string is not in the expected format + */ + public PreparedTxnState(String serializedState) { + if (serializedState == null || serializedState.isEmpty()) { + this.producerId = RecordBatch.NO_PRODUCER_ID; + this.epoch = RecordBatch.NO_PRODUCER_EPOCH; + return; + } + + try { + String[] parts = serializedState.split(":"); + if (parts.length != 2) { + throw new IllegalArgumentException("Invalid serialized transaction state format: " + serializedState); + } + + this.producerId = Long.parseLong(parts[0]); + this.epoch = Short.parseShort(parts[1]); + + // Validate the producerId and epoch values. + if (!(this.producerId >= 0 && this.epoch >= 0)) { + throw new IllegalArgumentException("Invalid producer ID and epoch values: " + + producerId + ":" + epoch + ". Both must be >= 0"); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid serialized transaction state format: " + serializedState, e); + } + } + + /** + * Creates a new PreparedTxnState with the given producer ID and epoch + * + * @param producerId The producer ID + * @param epoch The producer epoch + */ + PreparedTxnState(long producerId, short epoch) { + this.producerId = producerId; + this.epoch = epoch; + } + + public long producerId() { + return producerId; + } + + public short epoch() { + return epoch; + } + + /** + * Checks if this preparedTxnState represents an initialized transaction with a valid producer ID + * that is not -1 (the uninitialized value). + * + * @return true if the state has an initialized transaction, false otherwise. + */ + public boolean hasTransaction() { + return producerId != RecordBatch.NO_PRODUCER_ID; + } + + /** + * Returns a serialized string representation of this transaction state. + * The format is "producerId:epoch" for an initialized state, or an empty string + * for an uninitialized state (where producerId and epoch are both -1). + * + * @return a serialized string representation + */ + @Override + public String toString() { + if (!hasTransaction()) { + return ""; + } + return producerId + ":" + epoch; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PreparedTxnState that = (PreparedTxnState) o; + return producerId == that.producerId && epoch == that.epoch; + } + + @Override + public int hashCode() { + int result = 31; + result = 31 * result + Long.hashCode(producerId); + result = 31 * result + (int) epoch; + return result; + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/PreparedTxnStateTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/PreparedTxnStateTest.java new file mode 100644 index 00000000000..f0d94434d47 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/producer/PreparedTxnStateTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests for the PreparedTxnState class part of the #KafkaProducer class. + */ +public class PreparedTxnStateTest { + + @Test + public void testDefaultConstructor() { + PreparedTxnState state = new PreparedTxnState(); + assertEquals("", state.toString(), "Empty state should serialize to an empty string"); + assertEquals(-1L, state.producerId(), "Default producerId should be -1"); + assertEquals((short) -1, state.epoch(), "Default epoch should be -1"); + assertFalse(state.hasTransaction(), "Default state should not have a transaction"); + } + + @Test + public void testParameterizedConstructor() { + long producerId = 123L; + short epoch = 45; + PreparedTxnState state = new PreparedTxnState(producerId, epoch); + assertEquals(producerId, state.producerId(), "ProducerId should match"); + assertEquals(epoch, state.epoch(), "Epoch should match"); + assertTrue(state.hasTransaction(), "State should have a transaction"); + assertEquals("123:45", state.toString(), "Serialized form should match expected format"); + } + + @Test + public void testDeserializationFromString() { + String serialized = "123:45"; + PreparedTxnState state = new PreparedTxnState(serialized); + assertEquals(serialized, state.toString(), "Deserialized state should match the original serialized string"); + assertEquals(123L, state.producerId(), "Deserialized producerId should match"); + assertEquals((short) 45, state.epoch(), "Deserialized epoch should match"); + assertTrue(state.hasTransaction(), "Deserialized state should have a transaction"); + } + + @Test + public void testRoundTripSerialization() { + // Create initialized state from string, then convert back to string + String original = "9876:54"; + PreparedTxnState state = new PreparedTxnState(original); + String serialized = state.toString(); + assertEquals(original, serialized, "Round-trip serialization should preserve values"); + + // Deserialize again to verify + PreparedTxnState stateAgain = new PreparedTxnState(serialized); + assertEquals(original, stateAgain.toString(), "Re-deserialized state should match original"); + assertEquals(state.producerId(), stateAgain.producerId(), "Producer IDs should match"); + assertEquals(state.epoch(), stateAgain.epoch(), "Epochs should match"); + + // Test round trip for uninitialized state (empty string) + String emptyString = ""; + PreparedTxnState emptyState = new PreparedTxnState(emptyString); + String emptyStateSerialized = emptyState.toString(); + assertEquals(emptyString, emptyStateSerialized, "Round-trip of empty string should remain empty"); + assertEquals(-1L, emptyState.producerId(), "Empty string should result in producerId -1"); + assertEquals((short) -1, emptyState.epoch(), "Empty string should result in epoch -1"); + + // Deserialize empty state again to verify + PreparedTxnState emptyStateAgain = new PreparedTxnState(emptyStateSerialized); + assertEquals(emptyString, emptyStateAgain.toString(), "Re-deserialized empty state should still be empty"); + assertEquals(-1L, emptyStateAgain.producerId(), "Empty string should result in producerId -1"); + assertEquals((short) -1, emptyStateAgain.epoch(), "Empty string should result in epoch -1"); + } + + @Test + public void testHandlingOfNullOrEmptyString() { + PreparedTxnState stateWithNull = new PreparedTxnState(null); + assertEquals("", stateWithNull.toString(), "Null string should result in empty state"); + assertFalse(stateWithNull.hasTransaction(), "State from null string should not have a transaction"); + + PreparedTxnState stateWithEmpty = new PreparedTxnState(""); + assertEquals("", stateWithEmpty.toString(), "Empty string should result in empty state"); + assertFalse(stateWithEmpty.hasTransaction(), "State from empty string should not have a transaction"); + } + + @Test + public void testMaxValues() { + // Test with maximum possible values for producer ID and epoch + String maxValues = Long.MAX_VALUE + ":" + Short.MAX_VALUE; + PreparedTxnState state = new PreparedTxnState(maxValues); + assertEquals(maxValues, state.toString(), "Max values should be handled correctly"); + assertEquals(Long.MAX_VALUE, state.producerId(), "Max producer ID should be handled correctly"); + assertEquals(Short.MAX_VALUE, state.epoch(), "Max epoch should be handled correctly"); + assertTrue(state.hasTransaction(), "State with max values should have a transaction"); + } + + @Test + public void testEqualsAndHashCode() { + PreparedTxnState state1 = new PreparedTxnState(123L, (short) 45); + PreparedTxnState state2 = new PreparedTxnState(123L, (short) 45); + PreparedTxnState state3 = new PreparedTxnState(456L, (short) 78); + PreparedTxnState state4 = new PreparedTxnState(123L, (short) 46); + + // Test equals + assertEquals(state1, state2, "Equal states should be equal"); + assertNotEquals(state1, state3, "States with different producer IDs should not be equal"); + assertNotEquals(state1, state4, "States with different epochs should not be equal"); + assertNotEquals(null, state1, "State should not equal null"); + + // Test hashCode + assertEquals(state1.hashCode(), state2.hashCode(), "Equal states should have same hash code"); + assertNotEquals(state1.hashCode(), state3.hashCode(), "Different states should have different hash codes"); + } + + @Test + public void testHasTransaction() { + // State with transaction (producer ID >= 0) + PreparedTxnState stateWithTransaction = new PreparedTxnState(0L, (short) 0); + assertTrue(stateWithTransaction.hasTransaction(), "State with producerId 0 should have a transaction"); + + // State without transaction (producer ID = -1) + PreparedTxnState stateWithoutTransaction = new PreparedTxnState(-1L, (short) -1); + assertFalse(stateWithoutTransaction.hasTransaction(), "State with producerId -1 should not have a transaction"); + } + + @Test + public void testInvalidFormatThrowsException() { + // Test with invalid format - missing epoch + assertThrows(IllegalArgumentException.class, + () -> new PreparedTxnState("123"), + "String with missing epoch should throw IllegalArgumentException"); + + // Test with invalid format - too many parts + assertThrows(IllegalArgumentException.class, + () -> new PreparedTxnState("123:45:67"), + "String with extra parts should throw IllegalArgumentException"); + + // Test with non-numeric producer ID + assertThrows(IllegalArgumentException.class, + () -> new PreparedTxnState("abc:45"), + "Non-numeric producer ID should throw IllegalArgumentException"); + + // Test with non-numeric epoch + assertThrows(IllegalArgumentException.class, + () -> new PreparedTxnState("123:xyz"), + "Non-numeric epoch should throw IllegalArgumentException"); + } + + @Test + public void testInvalidProducerIdEpochCombinations() { + // Valid combinations: both >= 0 + new PreparedTxnState("0:0"); + new PreparedTxnState("123:45"); + + // Invalid: producerId >= 0, epoch < 0 + assertThrows(IllegalArgumentException.class, + () -> new PreparedTxnState("123:-2"), + "Positive producerId with negative epoch (not -1) should throw IllegalArgumentException"); + + // Invalid: producerId < 0 (not -1), epoch >= 0 + assertThrows(IllegalArgumentException.class, + () -> new PreparedTxnState("-2:45"), + "Negative producerId (not -1) with positive epoch should throw IllegalArgumentException"); + + // Invalid: producerId < 0 (not -1), epoch < 0 (not -1) + assertThrows(IllegalArgumentException.class, + () -> new PreparedTxnState("-2:-2"), + "Negative producerId and epoch (not -1) should throw IllegalArgumentException"); + + // Invalid: producerId = -1, epoch >= 0 + assertThrows(IllegalArgumentException.class, + () -> new PreparedTxnState("-1:45"), + "ProducerId -1 with positive epoch should throw IllegalArgumentException"); + + // Invalid: producerId >= 0, epoch = -1 + assertThrows(IllegalArgumentException.class, + () -> new PreparedTxnState("123:-1"), + "Positive producerId with epoch -1 should throw IllegalArgumentException"); + } +}