KAFKA-9776: Downgrade TxnCommit API v3 when broker doesn't support (#8375)

Revert the decision for the sendOffsetsToTransaction(groupMetadata) API to fail with old version of brokers for the sake of making the application easier to adapt between versions. This PR silently downgrade the TxnOffsetCommit API when the build version is small than 3.

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Boyang Chen 2020-04-02 21:48:37 -07:00 committed by GitHub
parent 6ddbf4d800
commit 7f640f13b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 590 additions and 587 deletions

View File

@ -510,18 +510,25 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
TransactionManager transactionManager = null;
boolean userConfiguredIdempotence = config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
boolean userConfiguredTransactions = config.originals().containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
final boolean userConfiguredIdempotence = config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
final boolean userConfiguredTransactions = config.originals().containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
if (userConfiguredTransactions && !userConfiguredIdempotence)
log.info("Overriding the default {} to true since {} is specified.", ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
ProducerConfig.TRANSACTIONAL_ID_CONFIG);
if (config.idempotenceEnabled()) {
String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
transactionManager = new TransactionManager(logContext, transactionalId, transactionTimeoutMs,
retryBackoffMs, apiVersions);
final String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
final int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
final long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
final boolean autoDowngradeTxnCommit = config.getBoolean(ProducerConfig.AUTO_DOWNGRADE_TXN_COMMIT);
transactionManager = new TransactionManager(
logContext,
transactionalId,
transactionTimeoutMs,
retryBackoffMs,
apiVersions,
autoDowngradeTxnCommit);
if (transactionManager.isTransactional())
log.info("Instantiated a transactional producer.");
else
@ -642,7 +649,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* This method should be used when you need to batch consumed and produced messages
* together, typically in a consume-transform-produce pattern. Thus, the specified
* {@code groupMetadata} should be extracted from the used {@link KafkaConsumer consumer} via
* {@link KafkaConsumer#groupMetadata()} to leverage consumer group metadata for proper fencing.
* {@link KafkaConsumer#groupMetadata()} to leverage consumer group metadata for stronger fencing than
* {@link #sendOffsetsToTransaction(Map, String)} which only sends with consumer group id.
*
* <p>
* Note, that the consumer should have {@code enable.auto.commit=false} and should
* also not commit offsets manually (via {@link KafkaConsumer#commitSync(Map) sync} or
* {@link KafkaConsumer#commitAsync(Map, OffsetCommitCallback) async} commits).

View File

@ -258,6 +258,21 @@ public class ProducerConfig extends AbstractConfig {
public static final String SECURITY_PROVIDERS_CONFIG = SecurityConfig.SECURITY_PROVIDERS_CONFIG;
private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC;
/**
* <code>internal.auto.downgrade.txn.commit</code>
* Whether or not the producer should automatically downgrade the transactional commit request when the new group metadata
* feature is not supported by the broker.
* <p>
* The purpose of this flag is to make Kafka Streams being capable of working with old brokers when applying this new API.
* Non Kafka Streams users who are building their own EOS applications should be careful playing around
* with config as there is a risk of violating EOS semantics when turning on this flag.
*
* <p>
* Note: this is an internal configuration and could be changed in the future in a backward incompatible way
*
*/
static final String AUTO_DOWNGRADE_TXN_COMMIT = "internal.auto.downgrade.txn.commit";
private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
static {
@ -389,7 +404,11 @@ public class ProducerConfig extends AbstractConfig {
null,
new ConfigDef.NonEmptyString(),
Importance.LOW,
TRANSACTIONAL_ID_DOC);
TRANSACTIONAL_ID_DOC)
.defineInternal(AUTO_DOWNGRADE_TXN_COMMIT,
Type.BOOLEAN,
false,
Importance.LOW);
}
@Override

View File

@ -95,6 +95,7 @@ public class TransactionManager {
private final String transactionalId;
private final int transactionTimeoutMs;
private final ApiVersions apiVersions;
private final boolean autoDowngradeTxnCommit;
private static class TopicPartitionBookkeeper {
@ -283,11 +284,12 @@ public class TransactionManager {
}
}
public TransactionManager(LogContext logContext,
String transactionalId,
int transactionTimeoutMs,
long retryBackoffMs,
ApiVersions apiVersions) {
public TransactionManager(final LogContext logContext,
final String transactionalId,
final int transactionTimeoutMs,
final long retryBackoffMs,
final ApiVersions apiVersions,
final boolean autoDowngradeTxnCommit) {
this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
this.transactionalId = transactionalId;
this.log = logContext.logger(TransactionManager.class);
@ -304,6 +306,7 @@ public class TransactionManager {
this.retryBackoffMs = retryBackoffMs;
this.topicPartitionBookkeeper = new TopicPartitionBookkeeper();
this.apiVersions = apiVersions;
this.autoDowngradeTxnCommit = autoDowngradeTxnCommit;
}
public synchronized TransactionalRequestResult initializeTransactions() {
@ -1152,13 +1155,15 @@ public class TransactionManager {
final TxnOffsetCommitRequest.Builder builder =
new TxnOffsetCommitRequest.Builder(transactionalId,
groupMetadata.groupId(),
producerIdAndEpoch.producerId,
producerIdAndEpoch.epoch,
pendingTxnOffsetCommits,
groupMetadata.memberId(),
groupMetadata.generationId(),
groupMetadata.groupInstanceId());
groupMetadata.groupId(),
producerIdAndEpoch.producerId,
producerIdAndEpoch.epoch,
pendingTxnOffsetCommits,
groupMetadata.memberId(),
groupMetadata.generationId(),
groupMetadata.groupInstanceId(),
autoDowngradeTxnCommit
);
return new TxnOffsetCommitHandler(result, builder);
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic;
@ -27,6 +28,8 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.RecordBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -39,17 +42,22 @@ import java.util.stream.Collectors;
public class TxnOffsetCommitRequest extends AbstractRequest {
private static final Logger log = LoggerFactory.getLogger(TxnOffsetCommitRequest.class);
public final TxnOffsetCommitRequestData data;
public static class Builder extends AbstractRequest.Builder<TxnOffsetCommitRequest> {
public final TxnOffsetCommitRequestData data;
private final boolean autoDowngrade;
public Builder(final String transactionalId,
final String consumerGroupId,
final long producerId,
final short producerEpoch,
final Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits) {
final Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits,
final boolean autoDowngrade) {
this(transactionalId,
consumerGroupId,
producerId,
@ -57,7 +65,8 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
pendingTxnOffsetCommits,
JoinGroupRequest.UNKNOWN_MEMBER_ID,
JoinGroupRequest.UNKNOWN_GENERATION_ID,
Optional.empty());
Optional.empty(),
autoDowngrade);
}
public Builder(final String transactionalId,
@ -67,7 +76,8 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
final Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits,
final String memberId,
final int generationId,
final Optional<String> groupInstanceId) {
final Optional<String> groupInstanceId,
final boolean autoDowngrade) {
super(ApiKeys.TXN_OFFSET_COMMIT);
this.data = new TxnOffsetCommitRequestData()
.setTransactionalId(transactionalId)
@ -78,13 +88,36 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
.setMemberId(memberId)
.setGenerationId(generationId)
.setGroupInstanceId(groupInstanceId.orElse(null));
this.autoDowngrade = autoDowngrade;
}
@Override
public TxnOffsetCommitRequest build(short version) {
if (version < 3 && groupMetadataSet()) {
if (autoDowngrade) {
log.trace("Downgrade the request by resetting group metadata fields: " +
"[member.id:{}, generation.id:{}, group.instance.id:{}], because broker " +
"only supports TxnOffsetCommit version {}. Need " +
"v3 or newer to enable this feature",
data.memberId(), data.generationId(), data.groupInstanceId(), version);
data.setGenerationId(JoinGroupRequest.UNKNOWN_GENERATION_ID)
.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
.setGroupInstanceId(null);
} else {
throw new UnsupportedVersionException("Broker unexpectedly " +
"doesn't support group metadata commit API on version " + version);
}
}
return new TxnOffsetCommitRequest(data, version);
}
private boolean groupMetadataSet() {
return !data.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID) ||
data.generationId() != JoinGroupRequest.UNKNOWN_GENERATION_ID ||
data.groupInstanceId() != null;
}
@Override
public String toString() {
return data.toString();

View File

@ -19,6 +19,7 @@ package org.apache.kafka.clients.producer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
@ -39,6 +40,7 @@ import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
import org.apache.kafka.common.requests.EndTxnResponse;
@ -83,11 +85,13 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -138,8 +142,8 @@ public class KafkaProducerTest {
ProducerConfig config = new ProducerConfig(props);
assertTrue(config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
assertTrue(Arrays.asList("-1", "all").stream().anyMatch(each -> each.equalsIgnoreCase(config.getString(ProducerConfig.ACKS_CONFIG))));
assertTrue(config.getInt(ProducerConfig.RETRIES_CONFIG) == Integer.MAX_VALUE);
assertTrue(Stream.of("-1", "all").anyMatch(each -> each.equalsIgnoreCase(config.getString(ProducerConfig.ACKS_CONFIG))));
assertEquals((int) config.getInt(ProducerConfig.RETRIES_CONFIG), Integer.MAX_VALUE);
assertTrue(config.getString(ProducerConfig.CLIENT_ID_CONFIG).equalsIgnoreCase("producer-" +
config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG)));
}
@ -620,7 +624,7 @@ public class KafkaProducerTest {
});
t.start();
exchanger.exchange(null); // 1
assertTrue(producer.partitionsFor(topic) != null);
assertNotNull(producer.partitionsFor(topic));
exchanger.exchange(null); // 2
exchanger.exchange(null); // 3
assertThrows(TimeoutException.class, () -> producer.partitionsFor(topic));
@ -844,10 +848,20 @@ public class KafkaProducerTest {
@Test
public void testSendTxnOffsetsWithGroupMetadata() {
sendOffsetsWithGroupMetadata((short) 3);
}
@Test
public void testSendTxnOffsetsWithGroupMetadataDowngrade() {
sendOffsetsWithGroupMetadata((short) 2);
}
private void sendOffsetsWithGroupMetadata(final short maxVersion) {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
configs.put(ProducerConfig.AUTO_DOWNGRADE_TXN_COMMIT, true);
Time time = new MockTime(1);
MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
@ -855,6 +869,7 @@ public class KafkaProducerTest {
MockClient client = new MockClient(time, metadata);
client.updateMetadata(initialUpdateResponse);
client.setNodeApiVersions(NodeApiVersions.create(ApiKeys.TXN_OFFSET_COMMIT.id, (short) 0, maxVersion));
Node node = metadata.fetch().nodes().get(0);
client.throttle(node, 5000);
@ -869,13 +884,19 @@ public class KafkaProducerTest {
String groupInstanceId = "instance";
client.prepareResponse(request -> {
TxnOffsetCommitRequestData data = ((TxnOffsetCommitRequest) request).data;
return data.groupId().equals(groupId) &&
data.memberId().equals(memberId) &&
data.generationId() == generationId &&
data.groupInstanceId().equals(groupInstanceId);
},
txnOffsetsCommitResponse(Collections.singletonMap(
new TopicPartition("topic", 0), Errors.NONE)));
if (maxVersion < 3) {
return data.groupId().equals(groupId) &&
data.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID) &&
data.generationId() == JoinGroupRequest.UNKNOWN_GENERATION_ID &&
data.groupInstanceId() == null;
} else {
return data.groupId().equals(groupId) &&
data.memberId().equals(memberId) &&
data.generationId() == generationId &&
data.groupInstanceId().equals(groupInstanceId);
}
}, txnOffsetsCommitResponse(Collections.singletonMap(
new TopicPartition("topic", 0), Errors.NONE)));
client.prepareResponse(endTxnResponse(Errors.NONE));
try (Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
@ -884,6 +905,7 @@ public class KafkaProducerTest {
producer.beginTransaction();
ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata(groupId,
generationId, memberId, Optional.of(groupInstanceId));
producer.sendOffsetsToTransaction(Collections.emptyMap(), groupMetadata);
producer.commitTransaction();
}

View File

@ -705,7 +705,7 @@ public class RecordAccumulatorTest {
String metricGrpName = "producer-metrics";
apiVersions.update("foobar", NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 0, (short) 2));
TransactionManager transactionManager = new TransactionManager(new LogContext(), null, 0, 100L, new ApiVersions());
TransactionManager transactionManager = new TransactionManager(new LogContext(), null, 0, 100L, new ApiVersions(), false);
RecordAccumulator accum = new RecordAccumulator(logContext, batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD,
CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager,
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));

View File

@ -1131,7 +1131,7 @@ public class SenderTest {
public void testUnresolvedSequencesAreNotFatal() throws Exception {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions);
TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions, false);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@ -1419,7 +1419,7 @@ public class SenderTest {
@Test
public void testTransactionalUnknownProducerHandlingWhenRetentionLimitReached() throws Exception {
final long producerId = 343434L;
TransactionManager transactionManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions);
TransactionManager transactionManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions, false);
setupWithTransactionState(transactionManager);
doInitTransactions(transactionManager, new ProducerIdAndEpoch(producerId, (short) 0));
@ -1984,7 +1984,7 @@ public class SenderTest {
public void testTransactionalSplitBatchAndSend() throws Exception {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
TopicPartition tp = new TopicPartition("testSplitBatchAndSend", 1);
TransactionManager txnManager = new TransactionManager(logContext, "testSplitBatchAndSend", 60000, 100, apiVersions);
TransactionManager txnManager = new TransactionManager(logContext, "testSplitBatchAndSend", 60000, 100, apiVersions, false);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@ -2293,7 +2293,7 @@ public class SenderTest {
Metrics m = new Metrics();
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
try {
TransactionManager txnManager = new TransactionManager(logContext, "testTransactionalRequestsSentOnShutdown", 6000, 100, apiVersions);
TransactionManager txnManager = new TransactionManager(logContext, "testTransactionalRequestsSentOnShutdown", 6000, 100, apiVersions, false);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions);
@ -2328,7 +2328,7 @@ public class SenderTest {
Metrics m = new Metrics();
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
try {
TransactionManager txnManager = new TransactionManager(logContext, "testIncompleteTransactionAbortOnShutdown", 6000, 100, apiVersions);
TransactionManager txnManager = new TransactionManager(logContext, "testIncompleteTransactionAbortOnShutdown", 6000, 100, apiVersions, false);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions);
@ -2362,7 +2362,7 @@ public class SenderTest {
Metrics m = new Metrics();
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
try {
TransactionManager txnManager = new TransactionManager(logContext, "testForceShutdownWithIncompleteTransaction", 6000, 100, apiVersions);
TransactionManager txnManager = new TransactionManager(logContext, "testForceShutdownWithIncompleteTransaction", 6000, 100, apiVersions, false);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions);
@ -2394,7 +2394,7 @@ public class SenderTest {
public void testDoNotPollWhenNoRequestSent() {
client = spy(new MockClient(time, metadata));
TransactionManager txnManager = new TransactionManager(logContext, "testDoNotPollWhenNoRequestSent", 6000, 100, apiVersions);
TransactionManager txnManager = new TransactionManager(logContext, "testDoNotPollWhenNoRequestSent", 6000, 100, apiVersions, false);
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@ -2406,7 +2406,7 @@ public class SenderTest {
@Test
public void testTooLargeBatchesAreSafelyRemoved() throws InterruptedException {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
TransactionManager txnManager = new TransactionManager(logContext, "testSplitBatchAndSend", 60000, 100, apiVersions);
TransactionManager txnManager = new TransactionManager(logContext, "testSplitBatchAndSend", 60000, 100, apiVersions, false);
setupWithTransactionState(txnManager, false, null);
doInitTransactions(txnManager, producerIdAndEpoch);
@ -2557,7 +2557,7 @@ public class SenderTest {
}
private TransactionManager createTransactionManager() {
return new TransactionManager(new LogContext(), null, 0, 100L, new ApiVersions());
return new TransactionManager(new LogContext(), null, 0, 100L, new ApiVersions(), false);
}
private void setupWithTransactionState(TransactionManager transactionManager) {

View File

@ -399,8 +399,10 @@ public class RequestResponseTest {
checkResponse(createWriteTxnMarkersResponse(), 0, true);
checkRequest(createTxnOffsetCommitRequest(0), true);
checkRequest(createTxnOffsetCommitRequest(3), true);
checkRequest(createTxnOffsetCommitRequestWithAutoDowngrade(2), true);
checkErrorResponse(createTxnOffsetCommitRequest(0), new UnknownServerException(), true);
checkErrorResponse(createTxnOffsetCommitRequest(3), new UnknownServerException(), true);
checkErrorResponse(createTxnOffsetCommitRequestWithAutoDowngrade(2), new UnknownServerException(), true);
checkResponse(createTxnOffsetCommitResponse(), 0, true);
checkRequest(createDescribeAclsRequest(), true);
checkErrorResponse(createDescribeAclsRequest(), new SecurityDisabledException("Security is not enabled."), true);
@ -872,9 +874,7 @@ public class RequestResponseTest {
setTransactionalId("abracadabra").
setProducerId(123));
final UnsupportedVersionException exception = assertThrows(
UnsupportedVersionException.class, () -> {
bld.build((short) 2).toStruct();
});
UnsupportedVersionException.class, () -> bld.build((short) 2).toStruct());
assertTrue(exception.getMessage().contains("Attempted to write a non-default producerId at version 2"));
bld.build((short) 3);
}
@ -1682,7 +1682,8 @@ public class RequestResponseTest {
"groupId",
21L,
(short) 42,
offsets).build();
offsets,
false).build();
} else {
return new TxnOffsetCommitRequest.Builder("transactionalId",
"groupId",
@ -1691,10 +1692,29 @@ public class RequestResponseTest {
offsets,
"member",
2,
Optional.of("instance")).build();
Optional.of("instance"),
false).build();
}
}
private TxnOffsetCommitRequest createTxnOffsetCommitRequestWithAutoDowngrade(int version) {
final Map<TopicPartition, TxnOffsetCommitRequest.CommittedOffset> offsets = new HashMap<>();
offsets.put(new TopicPartition("topic", 73),
new TxnOffsetCommitRequest.CommittedOffset(100, null, Optional.empty()));
offsets.put(new TopicPartition("topic", 74),
new TxnOffsetCommitRequest.CommittedOffset(100, "blah", Optional.of(27)));
return new TxnOffsetCommitRequest.Builder("transactionalId",
"groupId",
21L,
(short) 42,
offsets,
"member",
2,
Optional.of("instance"),
true).build();
}
private TxnOffsetCommitResponse createTxnOffsetCommitResponse() {
final Map<TopicPartition, Errors> errorPerPartitions = new HashMap<>();
errorPerPartitions.put(new TopicPartition("topic", 73), Errors.NONE);

View File

@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic;
import org.apache.kafka.common.protocol.ApiKeys;
@ -33,6 +34,8 @@ import java.util.Map;
import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
public class TxnOffsetCommitRequestTest extends OffsetCommitRequestTest {
@ -40,36 +43,39 @@ public class TxnOffsetCommitRequestTest extends OffsetCommitRequestTest {
private static int producerId = 10;
private static short producerEpoch = 1;
private static int generationId = 5;
private static Map<TopicPartition, CommittedOffset> offsets = new HashMap<>();
private static TxnOffsetCommitRequest.Builder builder;
private static TxnOffsetCommitRequest.Builder builderWithGroupMetadata;
@Before
@Override
public void setUp() {
super.setUp();
}
@Test
@Override
public void testConstructor() {
Map<TopicPartition, CommittedOffset> offsets = new HashMap<>();
offsets.clear();
offsets.put(new TopicPartition(topicOne, partitionOne),
new CommittedOffset(
offset,
metadata,
Optional.of((int) leaderEpoch)));
new CommittedOffset(
offset,
metadata,
Optional.of((int) leaderEpoch)));
offsets.put(new TopicPartition(topicTwo, partitionTwo),
new CommittedOffset(
offset,
metadata,
Optional.of((int) leaderEpoch)));
new CommittedOffset(
offset,
metadata,
Optional.of((int) leaderEpoch)));
TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder(
builder = new TxnOffsetCommitRequest.Builder(
transactionalId,
groupId,
producerId,
producerEpoch,
offsets);
offsets,
false);
TxnOffsetCommitRequest.Builder builderWithGroupMetadata = new TxnOffsetCommitRequest.Builder(
initializeBuilderWithGroupMetadata(false);
}
private void initializeBuilderWithGroupMetadata(final boolean autoDowngrade) {
builderWithGroupMetadata = new TxnOffsetCommitRequest.Builder(
transactionalId,
groupId,
producerId,
@ -77,7 +83,13 @@ public class TxnOffsetCommitRequestTest extends OffsetCommitRequestTest {
offsets,
memberId,
generationId,
Optional.of(groupInstanceId));
Optional.of(groupInstanceId),
autoDowngrade);
}
@Test
@Override
public void testConstructor() {
Map<TopicPartition, Errors> errorsMap = new HashMap<>();
errorsMap.put(new TopicPartition(topicOne, partitionOne), Errors.NOT_COORDINATOR);
@ -122,4 +134,25 @@ public class TxnOffsetCommitRequestTest extends OffsetCommitRequestTest {
assertEquals(throttleTimeMs, response.throttleTimeMs());
}
}
@Test
public void testEnableGroupMetadataAutoDowngrade() {
for (short version = 0; version <= 2; version++) {
initializeBuilderWithGroupMetadata(true);
final TxnOffsetCommitRequest request = builderWithGroupMetadata.build(version);
assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, request.data.memberId());
assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, request.data.generationId());
assertNull(request.data.groupInstanceId());
}
}
@Test
public void testDisableGroupMetadataAutoDowngrade() {
for (short version = 0; version <= 2; version++) {
initializeBuilderWithGroupMetadata(false);
final short finalVersion = version;
assertThrows(UnsupportedVersionException.class, () -> builderWithGroupMetadata.build(finalVersion));
}
}
}

View File

@ -179,7 +179,8 @@ class KafkaApisTest {
"groupId",
15L,
0.toShort,
Map(invalidTopicPartition -> partitionOffsetCommitData).asJava
Map(invalidTopicPartition -> partitionOffsetCommitData).asJava,
false
).build()
val request = buildRequest(offsetCommitRequest)
@ -215,7 +216,8 @@ class KafkaApisTest {
groupId,
15L,
0.toShort,
Map(topicPartition -> partitionOffsetCommitData).asJava
Map(topicPartition -> partitionOffsetCommitData).asJava,
false
).build(1)
val request = buildRequest(offsetCommitRequest)

View File

@ -60,7 +60,7 @@ class RequestQuotaTest extends BaseRequestTest {
private val smallQuotaProducerClientId = "small-quota-producer-client"
private val smallQuotaConsumerClientId = "small-quota-consumer-client"
private val brokerId: Integer = 0
private var leaderNode: KafkaServer = null
private var leaderNode: KafkaServer = _
// Run tests concurrently since a throttle could be up to 1 second because quota percentage allocated is very low
case class Task(apiKey: ApiKeys, future: Future[_])
@ -82,7 +82,7 @@ class RequestQuotaTest extends BaseRequestTest {
RequestQuotaTest.principal = KafkaPrincipal.ANONYMOUS
super.setUp()
createTopic(topic, numPartitions, 1)
createTopic(topic, numPartitions)
leaderNode = servers.head
// Change default client-id request quota to a small value and a single unthrottledClient with a large quota
@ -213,7 +213,7 @@ class RequestQuotaTest extends BaseRequestTest {
collection.mutable.Map(tp -> MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))).asJava)
case ApiKeys.FETCH =>
val partitionMap = new LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
val partitionMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
partitionMap.put(tp, new FetchRequest.PartitionData(0, 0, 100, Optional.of(15)))
FetchRequest.Builder.forConsumer(0, 0, partitionMap)
@ -358,13 +358,12 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.API_VERSIONS =>
new ApiVersionsRequest.Builder()
case ApiKeys.CREATE_TOPICS => {
case ApiKeys.CREATE_TOPICS =>
new CreateTopicsRequest.Builder(
new CreateTopicsRequestData().setTopics(
new CreatableTopicCollection(Collections.singleton(
new CreatableTopic().setName("topic-2").setNumPartitions(1).
setReplicationFactor(1.toShort)).iterator())))
}
case ApiKeys.DELETE_TOPICS =>
new DeleteTopicsRequest.Builder(
@ -415,7 +414,8 @@ class RequestQuotaTest extends BaseRequestTest {
"test-txn-group",
2,
0,
Map.empty[TopicPartition, TxnOffsetCommitRequest.CommittedOffset].asJava)
Map.empty[TopicPartition, TxnOffsetCommitRequest.CommittedOffset].asJava,
false)
case ApiKeys.DESCRIBE_ACLS =>
new DescribeAclsRequest.Builder(AclBindingFilter.ANY)
@ -581,10 +581,9 @@ class RequestQuotaTest extends BaseRequestTest {
try {
task.future.get(15, TimeUnit.SECONDS)
} catch {
case e: Throwable => {
case e: Throwable =>
error(s"Test failed for api-key ${task.apiKey} with exception $e")
throw e
}
}
}
}

View File

@ -1267,6 +1267,11 @@ public class StreamsConfig extends AbstractConfig {
props.putAll(getClientCustomProps());
props.putAll(clientProvidedProps);
// When using EOS alpha, stream should auto-downgrade the transactional commit protocol to be compatible with older brokers.
if (StreamThread.eosAlphaEnabled(this)) {
props.put("internal.auto.downgrade.txn.commit", true);
}
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG));
// add client id with stream client id prefix
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);

View File

@ -502,6 +502,28 @@ public class StreamsConfigTest {
assertThat(consumerConfigs.get("internal.throw.on.fetch.stable.offset.unsupported"), is(true));
}
@Test
public void shouldNotSetInternalAutoDowngradeTxnCommitToTrueInProducerForEosDisabled() {
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs(clientId);
assertThat(producerConfigs.get("internal.auto.downgrade.txn.commit"), is(nullValue()));
}
@Test
public void shouldSetInternalAutoDowngradeTxnCommitToTrueInProducerForEosAlpha() {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs(clientId);
assertThat(producerConfigs.get("internal.auto.downgrade.txn.commit"), is(true));
}
@Test
public void shouldNotSetInternalAutoDowngradeTxnCommitToTrueInProducerForEosBeta() {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA);
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs(clientId);
assertThat(producerConfigs.get("internal.auto.downgrade.txn.commit"), is(nullValue()));
}
@Test
public void shouldAcceptAtLeastOnce() {
// don't use `StreamsConfig.AT_LEAST_ONCE` to actually do a useful test

View File

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.mark import parametrize, ignore
from ducktape.mark import parametrize
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from kafkatest.services.kafka import KafkaService
@ -29,7 +29,7 @@ class StreamsBrokerCompatibility(Test):
These tests validates that
- Streams works for older brokers 0.11 (or newer)
- Streams w/ EOS-alpha works for older brokers 0.11 (or newer)
- (TODO) Streams w/ EOS-beta works for older brokers 2.5 (or newer)
- Streams w/ EOS-beta works for older brokers 2.5 (or newer)
- Streams fails fast for older brokers 0.10.0, 0.10.2, and 0.10.1
- Streams w/ EOS-beta fails fast for older brokers 2.4 or older
"""
@ -85,8 +85,6 @@ class StreamsBrokerCompatibility(Test):
self.consumer.stop()
self.kafka.stop()
# Can be enabled after KAFKA-9776 is fixed
@ignore
@parametrize(broker_version=str(LATEST_2_4))
@parametrize(broker_version=str(LATEST_2_3))
@parametrize(broker_version=str(LATEST_2_2))