KAFKA-9628; Replace Produce request/response with automated protocol (#9401)

This patch rewrites `ProduceRequest` and `ProduceResponse` using the generated protocols. We have also added several new benchmarks to verify no regression in performance. A summary of results is included below:
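To illustrate the API change, here is a minimal sketch (placeholder topic, payload, and settings) of how a produce request is now assembled from the generated `ProduceRequestData` instead of a `Map<TopicPartition, MemoryRecords>`, mirroring the `Sender` changes in the diff below:

```java
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.requests.ProduceRequest;

public class GeneratedProduceRequestExample {
    public static void main(String[] args) {
        MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE,
            new SimpleRecord("value".getBytes()));

        // Per-topic/per-partition records are now expressed through the
        // generated collections rather than a map keyed by TopicPartition.
        ProduceRequestData.TopicProduceDataCollection topicData =
            new ProduceRequestData.TopicProduceDataCollection();
        ProduceRequestData.TopicProduceData topic =
            new ProduceRequestData.TopicProduceData().setName("test-topic");
        topic.partitionData().add(new ProduceRequestData.PartitionProduceData()
            .setIndex(0)
            .setRecords(records));
        topicData.add(topic);

        // acks, timeout, and transactionalId also live on the data object now.
        ProduceRequest request = ProduceRequest.forCurrentMagic(new ProduceRequestData()
                .setAcks((short) -1)
                .setTimeoutMs(30000)
                .setTopicData(topicData))
            .build();
        System.out.println("acks=" + request.acks());
    }
}
```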

### Benchmark

1. Loop **30** times
1. Calculate the average
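
Each percentage below is the relative change between these averages, e.g. for the second case: `(163.7776667 - 157.145) / 157.145 * 100 ≈ +4.2207 %`.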

#### kafkatest.benchmarks.core.benchmark_test.Benchmark.test_producer_throughput

> @cluster(num_nodes=5)
> @parametrize(acks=-1, topic=TOPIC_REP_THREE)

- +0.3144915325%
- 28.08766667 -> 28.1715625 (mb_per_sec)

> @cluster(num_nodes=5)
> @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[100000], compression_type=["none"], security_protocol=['PLAINTEXT'])

- +4.220730323%
- 157.145 -> 163.7776667 (mb_per_sec)

> @cluster(num_nodes=7)
> @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)

- +5.996241145%
- 57.64166667 -> 61.098 (mb_per_sec)

> @cluster(num_nodes=5)
> @parametrize(acks=1, topic=TOPIC_REP_THREE)

- +0.3979572536%
- 44.05833333 -> 44.23366667 (mb_per_sec)

> @cluster(num_nodes=5)
> @parametrize(acks=1, topic=TOPIC_REP_ONE)

- +2.228235226%
- 69.23266667 -> 70.77533333 (mb_per_sec)

### JMH results

In short, most operations show a performance regression, since we now have to convert the data into the generated protocol representation. That cost is unavoidable (as with the other requests/responses already migrated) until we use the protocol data directly.
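
As one example of that conversion cost, here is a sketch distilled from the `responses()` accessor in the diff below: rebuilding the legacy map-based view from the generated data allocates a fresh `TopicPartition`, `PartitionResponse`, and record-error list for every partition on each call.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ProduceResponse;

public class LegacyViewSketch {
    // Rebuilds the old Map-based view from the generated data; every call
    // allocates new wrapper objects, which is the conversion cost noted above.
    static Map<TopicPartition, ProduceResponse.PartitionResponse> legacyView(ProduceResponseData data) {
        Map<TopicPartition, ProduceResponse.PartitionResponse> view = new HashMap<>();
        data.responses().forEach(topic ->
            topic.partitionResponses().forEach(p ->
                view.put(new TopicPartition(topic.name(), p.index()),
                    new ProduceResponse.PartitionResponse(
                        Errors.forCode(p.errorCode()),
                        p.baseOffset(),
                        p.logAppendTimeMs(),
                        p.logStartOffset(),
                        p.recordErrors().stream()
                            .map(e -> new ProduceResponse.RecordError(
                                e.batchIndex(), e.batchIndexErrorMessage()))
                            .collect(Collectors.toList()),
                        p.errorMessage()))));
        return view;
    }
}
```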

### JMH for ProduceRequest

1. construction regression:
    - 281.474 -> 454.935 ns/op
    - 1216.000 -> 1888.000 B/op
1. toErrorResponse regression:
    - 41.942 -> 107.528 ns/op
    - 296.000 -> 576.000 B/op
1. toStruct improvement:
    - 255.185 -> 90.728 ns/op
    - 864.000 -> 304.000 B/op
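
For reference, a minimal JMH sketch of this kind of benchmark (assumed setup: placeholder topic, payload, and class name; the actual `ProducerRequestBenchmark` added by this patch may differ):

```java
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class ProducerRequestBenchmarkSketch {
    private final UnknownServerException exception = new UnknownServerException("error");
    private ProduceRequestData data;
    private ProduceRequest request;

    @Setup
    public void setup() {
        // Build one topic with one partition of placeholder records.
        ProduceRequestData.TopicProduceDataCollection topicData =
            new ProduceRequestData.TopicProduceDataCollection();
        ProduceRequestData.TopicProduceData topic =
            new ProduceRequestData.TopicProduceData().setName("topic");
        topic.partitionData().add(new ProduceRequestData.PartitionProduceData()
            .setIndex(0)
            .setRecords(MemoryRecords.withRecords(CompressionType.NONE,
                new SimpleRecord("value".getBytes()))));
        topicData.add(topic);
        data = new ProduceRequestData().setAcks((short) 1).setTimeoutMs(30000).setTopicData(topicData);
        request = ProduceRequest.forCurrentMagic(data).build();
    }

    @Benchmark
    public ProduceRequest constructorProduceRequest() {
        return ProduceRequest.forCurrentMagic(data).build();
    }

    @Benchmark
    public ProduceResponse constructorErrorResponse() {
        return request.getErrorResponse(0, exception);
    }
}
```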

**BEFORE**
```
Benchmark                                                                        Mode  Cnt     Score    Error   Units
ProducerRequestBenchmark.constructorErrorResponse                                avgt   15    41.942 ±  0.036   ns/op
ProducerRequestBenchmark.constructorErrorResponse:·gc.alloc.rate                 avgt   15  6409.263 ±  5.478  MB/sec
ProducerRequestBenchmark.constructorErrorResponse:·gc.alloc.rate.norm            avgt   15   296.000 ±  0.001    B/op
ProducerRequestBenchmark.constructorErrorResponse:·gc.churn.G1_Eden_Space        avgt   15  6416.420 ± 76.071  MB/sec
ProducerRequestBenchmark.constructorErrorResponse:·gc.churn.G1_Eden_Space.norm   avgt   15   296.331 ±  3.539    B/op
ProducerRequestBenchmark.constructorErrorResponse:·gc.churn.G1_Old_Gen           avgt   15     0.002 ±  0.002  MB/sec
ProducerRequestBenchmark.constructorErrorResponse:·gc.churn.G1_Old_Gen.norm      avgt   15    ≈ 10⁻⁴             B/op
ProducerRequestBenchmark.constructorErrorResponse:·gc.count                      avgt   15   698.000           counts
ProducerRequestBenchmark.constructorErrorResponse:·gc.time                       avgt   15   378.000               ms
ProducerRequestBenchmark.constructorProduceRequest                               avgt   15   281.474 ±  3.286   ns/op
ProducerRequestBenchmark.constructorProduceRequest:·gc.alloc.rate                avgt   15  3923.868 ± 46.303  MB/sec
ProducerRequestBenchmark.constructorProduceRequest:·gc.alloc.rate.norm           avgt   15  1216.000 ±  0.001    B/op
ProducerRequestBenchmark.constructorProduceRequest:·gc.churn.G1_Eden_Space       avgt   15  3923.375 ± 59.568  MB/sec
ProducerRequestBenchmark.constructorProduceRequest:·gc.churn.G1_Eden_Space.norm  avgt   15  1215.844 ± 11.184    B/op
ProducerRequestBenchmark.constructorProduceRequest:·gc.churn.G1_Old_Gen          avgt   15     0.004 ±  0.001  MB/sec
ProducerRequestBenchmark.constructorProduceRequest:·gc.churn.G1_Old_Gen.norm     avgt   15     0.001 ±  0.001    B/op
ProducerRequestBenchmark.constructorProduceRequest:·gc.count                     avgt   15   515.000           counts
ProducerRequestBenchmark.constructorProduceRequest:·gc.time                      avgt   15   279.000               ms
ProducerRequestBenchmark.constructorStruct                                       avgt   15   255.185 ±  0.069   ns/op
ProducerRequestBenchmark.constructorStruct:·gc.alloc.rate                        avgt   15  3074.889 ±  0.823  MB/sec
ProducerRequestBenchmark.constructorStruct:·gc.alloc.rate.norm                   avgt   15   864.000 ±  0.001    B/op
ProducerRequestBenchmark.constructorStruct:·gc.churn.G1_Eden_Space               avgt   15  3077.737 ± 31.537  MB/sec
ProducerRequestBenchmark.constructorStruct:·gc.churn.G1_Eden_Space.norm          avgt   15   864.800 ±  8.823    B/op
ProducerRequestBenchmark.constructorStruct:·gc.churn.G1_Old_Gen                  avgt   15     0.003 ±  0.001  MB/sec
ProducerRequestBenchmark.constructorStruct:·gc.churn.G1_Old_Gen.norm             avgt   15     0.001 ±  0.001    B/op
ProducerRequestBenchmark.constructorStruct:·gc.count                             avgt   15   404.000           counts
ProducerRequestBenchmark.constructorStruct:·gc.time                              avgt   15   214.000               ms
```

**AFTER**
```
Benchmark                                                                        Mode  Cnt     Score    Error   Units
ProducerRequestBenchmark.constructorErrorResponse                                avgt   15   107.528 ±  0.270   ns/op
ProducerRequestBenchmark.constructorErrorResponse:·gc.alloc.rate                 avgt   15  4864.899 ± 12.132  MB/sec
ProducerRequestBenchmark.constructorErrorResponse:·gc.alloc.rate.norm            avgt   15   576.000 ±  0.001    B/op
ProducerRequestBenchmark.constructorErrorResponse:·gc.churn.G1_Eden_Space        avgt   15  4868.023 ± 61.943  MB/sec
ProducerRequestBenchmark.constructorErrorResponse:·gc.churn.G1_Eden_Space.norm   avgt   15   576.371 ±  7.331    B/op
ProducerRequestBenchmark.constructorErrorResponse:·gc.churn.G1_Old_Gen           avgt   15     0.005 ±  0.001  MB/sec
ProducerRequestBenchmark.constructorErrorResponse:·gc.churn.G1_Old_Gen.norm      avgt   15     0.001 ±  0.001    B/op
ProducerRequestBenchmark.constructorErrorResponse:·gc.count                      avgt   15   639.000           counts
ProducerRequestBenchmark.constructorErrorResponse:·gc.time                       avgt   15   339.000               ms
ProducerRequestBenchmark.constructorProduceRequest                               avgt   15   454.935 ±  0.332   ns/op
ProducerRequestBenchmark.constructorProduceRequest:·gc.alloc.rate                avgt   15  3769.014 ±  2.767  MB/sec
ProducerRequestBenchmark.constructorProduceRequest:·gc.alloc.rate.norm           avgt   15  1888.000 ±  0.001    B/op
ProducerRequestBenchmark.constructorProduceRequest:·gc.churn.G1_Eden_Space       avgt   15  3763.407 ± 31.530  MB/sec
ProducerRequestBenchmark.constructorProduceRequest:·gc.churn.G1_Eden_Space.norm  avgt   15  1885.190 ± 15.594    B/op
ProducerRequestBenchmark.constructorProduceRequest:·gc.churn.G1_Old_Gen          avgt   15     0.004 ±  0.001  MB/sec
ProducerRequestBenchmark.constructorProduceRequest:·gc.churn.G1_Old_Gen.norm     avgt   15     0.002 ±  0.001    B/op
ProducerRequestBenchmark.constructorProduceRequest:·gc.count                     avgt   15   494.000           counts
ProducerRequestBenchmark.constructorProduceRequest:·gc.time                      avgt   15   264.000               ms
ProducerRequestBenchmark.constructorStruct                                       avgt   15    90.728 ±  0.695   ns/op
ProducerRequestBenchmark.constructorStruct:·gc.alloc.rate                        avgt   15  3043.140 ± 23.246  MB/sec
ProducerRequestBenchmark.constructorStruct:·gc.alloc.rate.norm                   avgt   15   304.000 ±  0.001    B/op
ProducerRequestBenchmark.constructorStruct:·gc.churn.G1_Eden_Space               avgt   15  3047.251 ± 59.638  MB/sec
ProducerRequestBenchmark.constructorStruct:·gc.churn.G1_Eden_Space.norm          avgt   15   304.404 ±  5.034    B/op
ProducerRequestBenchmark.constructorStruct:·gc.churn.G1_Old_Gen                  avgt   15     0.003 ±  0.001  MB/sec
ProducerRequestBenchmark.constructorStruct:·gc.churn.G1_Old_Gen.norm             avgt   15    ≈ 10⁻⁴             B/op
ProducerRequestBenchmark.constructorStruct:·gc.count                             avgt   15   400.000           counts
ProducerRequestBenchmark.constructorStruct:·gc.time                              avgt   15   205.000               ms
```
### JMH for ProduceResponse

1. construction regression:
    - 3.293 -> 303.226 ns/op
    - 24.000 -> 1848.000 B/op
1. toStruct improvement:
    - 825.889 -> 311.725 ns/op
    - 2208.000 -> 896.000 B/op
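
The construction regression has the same root cause; a hedged sketch: the deprecated map-based constructor now eagerly converts the whole map into generated `ProduceResponseData` (via `toData` in the diff below), where it previously just stored the map reference.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ProduceResponse;

public class ResponseConstructionSketch {
    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        Map<TopicPartition, ProduceResponse.PartitionResponse> responses = new HashMap<>();
        responses.put(new TopicPartition("test-topic", 0),
            new ProduceResponse.PartitionResponse(Errors.NONE));
        // The old constructor stored these references as-is; it now rebuilds
        // everything as ProduceResponseData, hence the 24 -> 1848 B/op above.
        ProduceResponse response = new ProduceResponse(responses, 10);
        System.out.println("throttleTimeMs=" + response.throttleTimeMs());
    }
}
```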

**BEFORE**

```
Benchmark                                                                          Mode  Cnt     Score    Error   Units
ProducerResponseBenchmark.constructorProduceResponse                               avgt   15     3.293 ±  0.004   ns/op
ProducerResponseBenchmark.constructorProduceResponse:·gc.alloc.rate                avgt   15  6619.731 ±  9.075  MB/sec
ProducerResponseBenchmark.constructorProduceResponse:·gc.alloc.rate.norm           avgt   15    24.000 ±  0.001    B/op
ProducerResponseBenchmark.constructorProduceResponse:·gc.churn.G1_Eden_Space       avgt   15  6618.648 ±  0.153  MB/sec
ProducerResponseBenchmark.constructorProduceResponse:·gc.churn.G1_Eden_Space.norm  avgt   15    23.996 ±  0.033    B/op
ProducerResponseBenchmark.constructorProduceResponse:·gc.churn.G1_Old_Gen          avgt   15     0.003 ±  0.002  MB/sec
ProducerResponseBenchmark.constructorProduceResponse:·gc.churn.G1_Old_Gen.norm     avgt   15    ≈ 10⁻⁵             B/op
ProducerResponseBenchmark.constructorProduceResponse:·gc.count                     avgt   15   720.000           counts
ProducerResponseBenchmark.constructorProduceResponse:·gc.time                      avgt   15   383.000               ms
ProducerResponseBenchmark.constructorStruct                                        avgt   15   825.889 ±  0.638   ns/op
ProducerResponseBenchmark.constructorStruct:·gc.alloc.rate                         avgt   15  2428.000 ±  1.899  MB/sec
ProducerResponseBenchmark.constructorStruct:·gc.alloc.rate.norm                    avgt   15  2208.000 ±  0.001    B/op
ProducerResponseBenchmark.constructorStruct:·gc.churn.G1_Eden_Space                avgt   15  2430.196 ± 55.894  MB/sec
ProducerResponseBenchmark.constructorStruct:·gc.churn.G1_Eden_Space.norm           avgt   15  2210.001 ± 51.009    B/op
ProducerResponseBenchmark.constructorStruct:·gc.churn.G1_Old_Gen                   avgt   15     0.003 ±  0.001  MB/sec
ProducerResponseBenchmark.constructorStruct:·gc.churn.G1_Old_Gen.norm              avgt   15     0.002 ±  0.001    B/op
ProducerResponseBenchmark.constructorStruct:·gc.count                              avgt   15   319.000           counts
ProducerResponseBenchmark.constructorStruct:·gc.time                               avgt   15   166.000               ms
```

**AFTER**

```
Benchmark                                                                          Mode  Cnt     Score    Error   Units
ProducerResponseBenchmark.constructorProduceResponse                               avgt   15   303.226 ±  0.517   ns/op
ProducerResponseBenchmark.constructorProduceResponse:·gc.alloc.rate                avgt   15  5534.940 ±  9.439  MB/sec
ProducerResponseBenchmark.constructorProduceResponse:·gc.alloc.rate.norm           avgt   15  1848.000 ±  0.001    B/op
ProducerResponseBenchmark.constructorProduceResponse:·gc.churn.G1_Eden_Space       avgt   15  5534.046 ± 51.849  MB/sec
ProducerResponseBenchmark.constructorProduceResponse:·gc.churn.G1_Eden_Space.norm  avgt   15  1847.710 ± 18.105    B/op
ProducerResponseBenchmark.constructorProduceResponse:·gc.churn.G1_Old_Gen          avgt   15     0.007 ±  0.001  MB/sec
ProducerResponseBenchmark.constructorProduceResponse:·gc.churn.G1_Old_Gen.norm     avgt   15     0.002 ±  0.001    B/op
ProducerResponseBenchmark.constructorProduceResponse:·gc.count                     avgt   15   602.000           counts
ProducerResponseBenchmark.constructorProduceResponse:·gc.time                      avgt   15   318.000               ms
ProducerResponseBenchmark.constructorStruct                                        avgt   15   311.725 ±  3.132   ns/op
ProducerResponseBenchmark.constructorStruct:·gc.alloc.rate                         avgt   15  2610.602 ± 25.964  MB/sec
ProducerResponseBenchmark.constructorStruct:·gc.alloc.rate.norm                    avgt   15   896.000 ±  0.001    B/op
ProducerResponseBenchmark.constructorStruct:·gc.churn.G1_Eden_Space                avgt   15  2613.021 ± 42.965  MB/sec
ProducerResponseBenchmark.constructorStruct:·gc.churn.G1_Eden_Space.norm           avgt   15   896.824 ± 11.331    B/op
ProducerResponseBenchmark.constructorStruct:·gc.churn.G1_Old_Gen                   avgt   15     0.003 ±  0.001  MB/sec
ProducerResponseBenchmark.constructorStruct:·gc.churn.G1_Old_Gen.norm              avgt   15     0.001 ±  0.001    B/op
ProducerResponseBenchmark.constructorStruct:·gc.count                              avgt   15   343.000           counts
ProducerResponseBenchmark.constructorStruct:·gc.time                               avgt   15   194.000               ms
```

Reviewers: David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>
Chia-Ping Tsai, 2020-11-19 05:44:21 +08:00 (committed by GitHub)
commit 30bc21ca35, parent b3264b7996
27 changed files with 998 additions and 760 deletions


@@ -35,6 +35,7 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TransactionAbortedException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
@@ -59,6 +60,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP;
@@ -560,13 +562,24 @@ public class Sender implements Runnable {
log.trace("Received produce response from node {} with correlation id {}", response.destination(), correlationId);
// if we have a response, parse it
if (response.hasResponse()) {
// Sender should exercise PartitionProduceResponse rather than ProduceResponse.PartitionResponse
// https://issues.apache.org/jira/browse/KAFKA-10696
ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
TopicPartition tp = entry.getKey();
ProduceResponse.PartitionResponse partResp = entry.getValue();
produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> {
TopicPartition tp = new TopicPartition(r.name(), p.index());
ProduceResponse.PartitionResponse partResp = new ProduceResponse.PartitionResponse(
Errors.forCode(p.errorCode()),
p.baseOffset(),
p.logAppendTimeMs(),
p.logStartOffset(),
p.recordErrors()
.stream()
.map(e -> new ProduceResponse.RecordError(e.batchIndex(), e.batchIndexErrorMessage()))
.collect(Collectors.toList()),
p.errorMessage());
ProducerBatch batch = batches.get(tp);
completeBatch(batch, partResp, correlationId, now);
}
}));
this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
} else {
// this is the acks = 0 case, just complete all requests
@@ -721,7 +734,6 @@ public class Sender implements Runnable {
if (batches.isEmpty())
return;
Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
// find the minimum magic version used when creating the record sets
@@ -730,7 +742,7 @@ public class Sender implements Runnable {
if (batch.magic() < minUsedMagic)
minUsedMagic = batch.magic();
}
ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
for (ProducerBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
MemoryRecords records = batch.records();
@@ -744,7 +756,14 @@ public class Sender implements Runnable {
// which is supporting the new magic version to one which doesn't, then we will need to convert.
if (!records.hasMatchingMagic(minUsedMagic))
records = batch.records().downConvert(minUsedMagic, 0, time).records();
produceRecordsByPartition.put(tp, records);
ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
if (tpData == null) {
tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic());
tpd.add(tpData);
}
tpData.partitionData().add(new ProduceRequestData.PartitionProduceData()
.setIndex(tp.partition())
.setRecords(records));
recordsByPartition.put(tp, batch);
}
@@ -752,8 +771,13 @@ public class Sender implements Runnable {
if (transactionManager != null && transactionManager.isTransactional()) {
transactionalId = transactionManager.transactionalId();
}
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
produceRecordsByPartition, transactionalId);
ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic,
new ProduceRequestData()
.setAcks(acks)
.setTimeoutMs(timeout)
.setTransactionalId(transactionalId)
.setTopicData(tpd));
RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());
String nodeId = Integer.toString(destination);


@@ -111,6 +111,8 @@ import org.apache.kafka.common.message.OffsetDeleteRequestData;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
@@ -138,8 +140,6 @@ import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -157,7 +157,7 @@ import static org.apache.kafka.common.protocol.types.Type.RECORDS;
* Identifiers for all the Kafka APIs
*/
public enum ApiKeys {
PRODUCE(0, "Produce", ProduceRequest.schemaVersions(), ProduceResponse.schemaVersions()),
PRODUCE(0, "Produce", ProduceRequestData.SCHEMAS, ProduceResponseData.SCHEMAS),
FETCH(1, "Fetch", FetchRequestData.SCHEMAS, FetchResponseData.SCHEMAS),
LIST_OFFSETS(2, "ListOffsets", ListOffsetRequestData.SCHEMAS, ListOffsetResponseData.SCHEMAS),
METADATA(3, "Metadata", MetadataRequestData.SCHEMAS, MetadataResponseData.SCHEMAS),


@@ -32,14 +32,4 @@ public class CommonFields {
"If the epoch provided by the client is larger than the current epoch known to the broker, then " +
"the UNKNOWN_LEADER_EPOCH error code will be returned. If the provided epoch is smaller, then " +
"the FENCED_LEADER_EPOCH error code will be returned.");
// Group APIs
public static final Field.Str GROUP_ID = new Field.Str("group_id", "The unique group identifier");
// Transactional APIs
public static final Field.Str TRANSACTIONAL_ID = new Field.Str("transactional_id", "The transactional id corresponding to the transaction.");
public static final Field.NullableStr NULLABLE_TRANSACTIONAL_ID = new Field.NullableStr("transactional_id",
"The transactional id or null if the producer is not transactional");
public static final Field.Int64 PRODUCER_ID = new Field.Int64("producer_id", "Current producer id in use by the transactional id.");
public static final Field.Int16 PRODUCER_EPOCH = new Field.Int16("producer_epoch", "Current epoch associated with the producer id.");
}


@@ -19,6 +19,7 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.AlterIsrRequestData;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -146,7 +147,7 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, Struct struct) {
switch (apiKey) {
case PRODUCE:
return new ProduceRequest(struct, apiVersion);
return new ProduceRequest(new ProduceRequestData(struct, apiVersion), apiVersion);
case FETCH:
return new FetchRequest(new FetchRequestData(struct, apiVersion), apiVersion);
case LIST_OFFSETS:


@@ -19,6 +19,7 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.EnvelopeResponseData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.AlterIsrResponseData;
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -96,7 +97,7 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
public static AbstractResponse parseResponse(ApiKeys apiKey, Struct struct, short version) {
switch (apiKey) {
case PRODUCE:
return new ProduceResponse(struct);
return new ProduceResponse(new ProduceResponseData(struct, version));
case FETCH:
return new FetchResponse<>(new FetchResponseData(struct, version));
case LIST_OFFSETS:


@@ -19,162 +19,60 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.CommonFields;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.SendBuilder;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.CollectionUtils;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.kafka.common.protocol.CommonFields.NULLABLE_TRANSACTIONAL_ID;
import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
import static org.apache.kafka.common.protocol.types.Type.INT16;
import static org.apache.kafka.common.protocol.types.Type.INT32;
import static org.apache.kafka.common.protocol.types.Type.RECORDS;
import static org.apache.kafka.common.requests.ProduceResponse.INVALID_OFFSET;
public class ProduceRequest extends AbstractRequest {
private static final String ACKS_KEY_NAME = "acks";
private static final String TIMEOUT_KEY_NAME = "timeout";
private static final String TOPIC_DATA_KEY_NAME = "topic_data";
// topic level field names
private static final String PARTITION_DATA_KEY_NAME = "data";
public static Builder forMagic(byte magic, ProduceRequestData data) {
// Message format upgrades correspond with a bump in the produce request version. Older
// message format versions are generally not supported by the produce request versions
// following the bump.
// partition level field names
private static final String RECORD_SET_KEY_NAME = "record_set";
final short minVersion;
final short maxVersion;
if (magic < RecordBatch.MAGIC_VALUE_V2) {
minVersion = 2;
maxVersion = 2;
} else {
minVersion = 3;
maxVersion = ApiKeys.PRODUCE.latestVersion();
}
return new Builder(minVersion, maxVersion, data);
}
private static final Schema TOPIC_PRODUCE_DATA_V0 = new Schema(
TOPIC_NAME,
new Field(PARTITION_DATA_KEY_NAME, new ArrayOf(new Schema(
PARTITION_ID,
new Field(RECORD_SET_KEY_NAME, RECORDS)))));
private static final Schema PRODUCE_REQUEST_V0 = new Schema(
new Field(ACKS_KEY_NAME, INT16, "The number of acknowledgments the producer requires the leader to have " +
"received before considering a request complete. Allowed values: 0 for no acknowledgments, 1 for " +
"only the leader and -1 for the full ISR."),
new Field(TIMEOUT_KEY_NAME, INT32, "The time to await a response in ms."),
new Field(TOPIC_DATA_KEY_NAME, new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
/**
* The body of PRODUCE_REQUEST_V1 is the same as PRODUCE_REQUEST_V0.
* The version number is bumped up to indicate that the client supports quota throttle time field in the response.
*/
private static final Schema PRODUCE_REQUEST_V1 = PRODUCE_REQUEST_V0;
/**
* The body of PRODUCE_REQUEST_V2 is the same as PRODUCE_REQUEST_V1.
* The version number is bumped up to indicate that message format V1 is used which has relative offset and
* timestamp.
*/
private static final Schema PRODUCE_REQUEST_V2 = PRODUCE_REQUEST_V1;
// Produce request V3 adds the transactional id which is used for authorization when attempting to write
// transactional data. This version also adds support for message format V2.
private static final Schema PRODUCE_REQUEST_V3 = new Schema(
CommonFields.NULLABLE_TRANSACTIONAL_ID,
new Field(ACKS_KEY_NAME, INT16, "The number of acknowledgments the producer requires the leader to have " +
"received before considering a request complete. Allowed values: 0 for no acknowledgments, 1 " +
"for only the leader and -1 for the full ISR."),
new Field(TIMEOUT_KEY_NAME, INT32, "The time to await a response in ms."),
new Field(TOPIC_DATA_KEY_NAME, new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
/**
* The body of PRODUCE_REQUEST_V4 is the same as PRODUCE_REQUEST_V3.
* The version number is bumped up to indicate that the client supports KafkaStorageException.
* The KafkaStorageException will be translated to NotLeaderOrFollowerException in the response if version <= 3
*/
private static final Schema PRODUCE_REQUEST_V4 = PRODUCE_REQUEST_V3;
/**
* The body of the PRODUCE_REQUEST_V5 is the same as PRODUCE_REQUEST_V4.
* The version number is bumped since the PRODUCE_RESPONSE_V5 includes an additional partition level
* field: the log_start_offset.
*/
private static final Schema PRODUCE_REQUEST_V5 = PRODUCE_REQUEST_V4;
/**
* The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
*/
private static final Schema PRODUCE_REQUEST_V6 = PRODUCE_REQUEST_V5;
/**
* V7 bumped up to indicate ZStandard capability. (see KIP-110)
*/
private static final Schema PRODUCE_REQUEST_V7 = PRODUCE_REQUEST_V6;
/**
* V8 bumped up to add two new fields record_errors offset list and error_message to {@link org.apache.kafka.common.requests.ProduceResponse.PartitionResponse}
* (See KIP-467)
*/
private static final Schema PRODUCE_REQUEST_V8 = PRODUCE_REQUEST_V7;
public static Schema[] schemaVersions() {
return new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3,
PRODUCE_REQUEST_V4, PRODUCE_REQUEST_V5, PRODUCE_REQUEST_V6, PRODUCE_REQUEST_V7, PRODUCE_REQUEST_V8};
public static Builder forCurrentMagic(ProduceRequestData data) {
return forMagic(RecordBatch.CURRENT_MAGIC_VALUE, data);
}
public static class Builder extends AbstractRequest.Builder<ProduceRequest> {
private final short acks;
private final int timeout;
private final Map<TopicPartition, MemoryRecords> partitionRecords;
private final String transactionalId;
public static Builder forCurrentMagic(short acks,
int timeout,
Map<TopicPartition, MemoryRecords> partitionRecords) {
return forMagic(RecordBatch.CURRENT_MAGIC_VALUE, acks, timeout, partitionRecords, null);
}
public static Builder forMagic(byte magic,
short acks,
int timeout,
Map<TopicPartition, MemoryRecords> partitionRecords,
String transactionalId) {
// Message format upgrades correspond with a bump in the produce request version. Older
// message format versions are generally not supported by the produce request versions
// following the bump.
final short minVersion;
final short maxVersion;
if (magic < RecordBatch.MAGIC_VALUE_V2) {
minVersion = 2;
maxVersion = 2;
} else {
minVersion = 3;
maxVersion = ApiKeys.PRODUCE.latestVersion();
}
return new Builder(minVersion, maxVersion, acks, timeout, partitionRecords, transactionalId);
}
private final ProduceRequestData data;
public Builder(short minVersion,
short maxVersion,
short acks,
int timeout,
Map<TopicPartition, MemoryRecords> partitionRecords,
String transactionalId) {
ProduceRequestData data) {
super(ApiKeys.PRODUCE, minVersion, maxVersion);
this.acks = acks;
this.timeout = timeout;
this.partitionRecords = partitionRecords;
this.transactionalId = transactionalId;
this.data = data;
}
@Override
@@ -190,85 +88,82 @@ public class ProduceRequest extends AbstractRequest {
private ProduceRequest build(short version, boolean validate) {
if (validate) {
// Validate the given records first
for (MemoryRecords records : partitionRecords.values()) {
ProduceRequest.validateRecords(version, records);
}
data.topicData().forEach(tpd ->
tpd.partitionData().forEach(partitionProduceData ->
ProduceRequest.validateRecords(version, partitionProduceData.records())));
}
return new ProduceRequest(version, acks, timeout, partitionRecords, transactionalId);
return new ProduceRequest(data, version);
}
@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("(type=ProduceRequest")
.append(", acks=").append(acks)
.append(", timeout=").append(timeout)
.append(", partitionRecords=(").append(partitionRecords)
.append("), transactionalId='").append(transactionalId != null ? transactionalId : "")
.append(", acks=").append(data.acks())
.append(", timeout=").append(data.timeoutMs())
.append(", partitionRecords=(").append(data.topicData().stream().flatMap(d -> d.partitionData().stream()).collect(Collectors.toList()))
.append("), transactionalId='").append(data.transactionalId() != null ? data.transactionalId() : "")
.append("'");
return bld.toString();
}
}
/**
* We have to copy acks, timeout, transactionalId and partitionSizes from data since data may be reset to eliminate
* the reference to ByteBuffer, but that metadata is still useful.
*/
private final short acks;
private final int timeout;
private final String transactionalId;
private final Map<TopicPartition, Integer> partitionSizes;
// This is set to null by `clearPartitionRecords` to prevent unnecessary memory retention when a produce request is
// put in the purgatory (due to client throttling, it can take a while before the response is sent).
// Care should be taken in methods that use this field.
private volatile Map<TopicPartition, MemoryRecords> partitionRecords;
private boolean hasTransactionalRecords = false;
private boolean hasIdempotentRecords = false;
private volatile ProduceRequestData data;
// partitionSizes is lazily initialized since it is only used on the server side in production.
private volatile Map<TopicPartition, Integer> partitionSizes;
private ProduceRequest(short version, short acks, int timeout, Map<TopicPartition, MemoryRecords> partitionRecords, String transactionalId) {
public ProduceRequest(ProduceRequestData produceRequestData, short version) {
super(ApiKeys.PRODUCE, version);
this.acks = acks;
this.timeout = timeout;
this.transactionalId = transactionalId;
this.partitionRecords = partitionRecords;
this.partitionSizes = createPartitionSizes(partitionRecords);
for (MemoryRecords records : partitionRecords.values()) {
setFlags(records);
}
this.data = produceRequestData;
this.acks = data.acks();
this.timeout = data.timeoutMs();
this.transactionalId = data.transactionalId();
}
private static Map<TopicPartition, Integer> createPartitionSizes(Map<TopicPartition, MemoryRecords> partitionRecords) {
Map<TopicPartition, Integer> result = new HashMap<>(partitionRecords.size());
for (Map.Entry<TopicPartition, MemoryRecords> entry : partitionRecords.entrySet())
result.put(entry.getKey(), entry.getValue().sizeInBytes());
return result;
@Override
public Send toSend(String destination, RequestHeader header) {
return SendBuilder.buildRequestSend(destination, header, dataOrException());
}
public ProduceRequest(Struct struct, short version) {
super(ApiKeys.PRODUCE, version);
partitionRecords = new HashMap<>();
for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) {
Struct topicData = (Struct) topicDataObj;
String topic = topicData.get(TOPIC_NAME);
for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) {
Struct partitionResponse = (Struct) partitionResponseObj;
int partition = partitionResponse.get(PARTITION_ID);
MemoryRecords records = (MemoryRecords) partitionResponse.getRecords(RECORD_SET_KEY_NAME);
setFlags(records);
partitionRecords.put(new TopicPartition(topic, partition), records);
// visible for testing
Map<TopicPartition, Integer> partitionSizes() {
if (partitionSizes == null) {
// this method may be called by different thread (see the comment on data)
synchronized (this) {
if (partitionSizes == null) {
partitionSizes = new HashMap<>();
data.topicData().forEach(topicData ->
topicData.partitionData().forEach(partitionData ->
partitionSizes.compute(new TopicPartition(topicData.name(), partitionData.index()),
(ignored, previousValue) ->
partitionData.records().sizeInBytes() + (previousValue == null ? 0 : previousValue))
)
);
}
}
}
partitionSizes = createPartitionSizes(partitionRecords);
acks = struct.getShort(ACKS_KEY_NAME);
timeout = struct.getInt(TIMEOUT_KEY_NAME);
transactionalId = struct.getOrElse(NULLABLE_TRANSACTIONAL_ID, null);
return partitionSizes;
}
private void setFlags(MemoryRecords records) {
Iterator<MutableRecordBatch> iterator = records.batches().iterator();
MutableRecordBatch entry = iterator.next();
hasIdempotentRecords = hasIdempotentRecords || entry.hasProducerId();
hasTransactionalRecords = hasTransactionalRecords || entry.isTransactional();
/**
* @return the data; throws IllegalStateException if the data has been removed (to prevent unnecessary memory retention).
*/
public ProduceRequestData dataOrException() {
// Store it in a local variable to protect against concurrent updates
ProduceRequestData tmp = data;
if (tmp == null)
throw new IllegalStateException("The partition records are no longer available because clearPartitionRecords() has been invoked.");
return tmp;
}
/**
@@ -276,32 +171,7 @@ public class ProduceRequest extends AbstractRequest {
*/
@Override
public Struct toStruct() {
// Store it in a local variable to protect against concurrent updates
Map<TopicPartition, MemoryRecords> partitionRecords = partitionRecordsOrFail();
short version = version();
Struct struct = new Struct(ApiKeys.PRODUCE.requestSchema(version));
Map<String, Map<Integer, MemoryRecords>> recordsByTopic = CollectionUtils.groupPartitionDataByTopic(partitionRecords);
struct.set(ACKS_KEY_NAME, acks);
struct.set(TIMEOUT_KEY_NAME, timeout);
struct.setIfExists(NULLABLE_TRANSACTIONAL_ID, transactionalId);
List<Struct> topicDatas = new ArrayList<>(recordsByTopic.size());
for (Map.Entry<String, Map<Integer, MemoryRecords>> topicEntry : recordsByTopic.entrySet()) {
Struct topicData = struct.instance(TOPIC_DATA_KEY_NAME);
topicData.set(TOPIC_NAME, topicEntry.getKey());
List<Struct> partitionArray = new ArrayList<>();
for (Map.Entry<Integer, MemoryRecords> partitionEntry : topicEntry.getValue().entrySet()) {
MemoryRecords records = partitionEntry.getValue();
Struct part = topicData.instance(PARTITION_DATA_KEY_NAME)
.set(PARTITION_ID, partitionEntry.getKey())
.set(RECORD_SET_KEY_NAME, records);
partitionArray.add(part);
}
topicData.set(PARTITION_DATA_KEY_NAME, partitionArray.toArray());
topicDatas.add(topicData);
}
struct.set(TOPIC_DATA_KEY_NAME, topicDatas.toArray());
return struct;
return dataOrException().toStruct(version());
}
@Override
@@ -312,9 +182,9 @@ public class ProduceRequest extends AbstractRequest {
.append(",timeout=").append(timeout);
if (verbose)
bld.append(",partitionSizes=").append(Utils.mkString(partitionSizes, "[", "]", "=", ","));
bld.append(",partitionSizes=").append(Utils.mkString(partitionSizes(), "[", "]", "=", ","));
else
bld.append(",numPartitions=").append(partitionSizes.size());
bld.append(",numPartitions=").append(partitionSizes().size());
bld.append("}");
return bld.toString();
@@ -323,27 +193,31 @@ public class ProduceRequest extends AbstractRequest {
@Override
public ProduceResponse getErrorResponse(int throttleTimeMs, Throwable e) {
/* In case the producer doesn't actually want any response */
if (acks == 0)
return null;
if (acks == 0) return null;
Errors error = Errors.forException(e);
Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
ProduceResponse.PartitionResponse partitionResponse = new ProduceResponse.PartitionResponse(error);
for (TopicPartition tp : partitions())
responseMap.put(tp, partitionResponse);
return new ProduceResponse(responseMap, throttleTimeMs);
ProduceResponseData data = new ProduceResponseData().setThrottleTimeMs(throttleTimeMs);
partitionSizes().forEach((tp, ignored) -> {
ProduceResponseData.TopicProduceResponse tpr = data.responses().find(tp.topic());
if (tpr == null) {
tpr = new ProduceResponseData.TopicProduceResponse().setName(tp.topic());
data.responses().add(tpr);
}
tpr.partitionResponses().add(new ProduceResponseData.PartitionProduceResponse()
.setIndex(tp.partition())
.setRecordErrors(Collections.emptyList())
.setBaseOffset(INVALID_OFFSET)
.setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
.setLogStartOffset(INVALID_OFFSET)
.setErrorMessage(e.getMessage())
.setErrorCode(error.code()));
});
return new ProduceResponse(data);
}
@Override
public Map<Errors, Integer> errorCounts(Throwable e) {
Errors error = Errors.forException(e);
return Collections.singletonMap(error, partitions().size());
}
private Collection<TopicPartition> partitions() {
return partitionSizes.keySet();
return Collections.singletonMap(error, partitionSizes().size());
}
public short acks() {
@@ -358,49 +232,34 @@ public class ProduceRequest extends AbstractRequest {
return transactionalId;
}
public boolean hasTransactionalRecords() {
return hasTransactionalRecords;
}
public boolean hasIdempotentRecords() {
return hasIdempotentRecords;
}
/**
* Returns the partition records or throws IllegalStateException if clearPartitionRecords() has been invoked.
*/
public Map<TopicPartition, MemoryRecords> partitionRecordsOrFail() {
// Store it in a local variable to protect against concurrent updates
Map<TopicPartition, MemoryRecords> partitionRecords = this.partitionRecords;
if (partitionRecords == null)
throw new IllegalStateException("The partition records are no longer available because " +
"clearPartitionRecords() has been invoked.");
return partitionRecords;
}
public void clearPartitionRecords() {
partitionRecords = null;
// lazily initialize partitionSizes.
partitionSizes();
data = null;
}
public static void validateRecords(short version, MemoryRecords records) {
public static void validateRecords(short version, BaseRecords baseRecords) {
if (version >= 3) {
Iterator<MutableRecordBatch> iterator = records.batches().iterator();
if (!iterator.hasNext())
throw new InvalidRecordException("Produce requests with version " + version + " must have at least " +
"one record batch");
if (baseRecords instanceof Records) {
Records records = (Records) baseRecords;
Iterator<? extends RecordBatch> iterator = records.batches().iterator();
if (!iterator.hasNext())
throw new InvalidRecordException("Produce requests with version " + version + " must have at least " +
"one record batch");
MutableRecordBatch entry = iterator.next();
if (entry.magic() != RecordBatch.MAGIC_VALUE_V2)
throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " +
"contain record batches with magic version 2");
if (version < 7 && entry.compressionType() == CompressionType.ZSTD) {
throw new UnsupportedCompressionTypeException("Produce requests with version " + version + " are not allowed to " +
"use ZStandard compression");
RecordBatch entry = iterator.next();
if (entry.magic() != RecordBatch.MAGIC_VALUE_V2)
throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " +
"contain record batches with magic version 2");
if (version < 7 && entry.compressionType() == CompressionType.ZSTD) {
throw new UnsupportedCompressionTypeException("Produce requests with version " + version + " are not allowed to " +
"use ZStandard compression");
}
if (iterator.hasNext())
throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " +
"contain exactly one record batch");
}
if (iterator.hasNext())
throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " +
"contain exactly one record batch");
}
// Note that we do not do similar validation for older versions to ensure compatibility with
@@ -409,7 +268,7 @@ public class ProduceRequest extends AbstractRequest {
}
public static ProduceRequest parse(ByteBuffer buffer, short version) {
return new ProduceRequest(ApiKeys.PRODUCE.parseRequest(version, buffer), version);
return new ProduceRequest(new ProduceRequestData(new ByteBufferAccessor(buffer), version), version);
}
public static byte requiredMagicForVersion(short produceRequestVersion) {


@@ -17,183 +17,57 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.SendBuilder;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.CollectionUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
import static org.apache.kafka.common.protocol.types.Type.INT64;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* This wrapper supports both v0 and v1 of ProduceResponse.
* This wrapper supports both v0 and v8 of ProduceResponse.
*
* Possible error code:
*
* {@link Errors#CORRUPT_MESSAGE}
* {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
* {@link Errors#NOT_LEADER_OR_FOLLOWER}
* {@link Errors#MESSAGE_TOO_LARGE}
* {@link Errors#INVALID_TOPIC_EXCEPTION}
* {@link Errors#RECORD_LIST_TOO_LARGE}
* {@link Errors#NOT_ENOUGH_REPLICAS}
* {@link Errors#NOT_ENOUGH_REPLICAS_AFTER_APPEND}
* {@link Errors#INVALID_REQUIRED_ACKS}
* {@link Errors#TOPIC_AUTHORIZATION_FAILED}
* {@link Errors#UNSUPPORTED_FOR_MESSAGE_FORMAT}
* {@link Errors#INVALID_PRODUCER_EPOCH}
* {@link Errors#CLUSTER_AUTHORIZATION_FAILED}
* {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED}
* {@link Errors#INVALID_RECORD}
*/
public class ProduceResponse extends AbstractResponse {
private static final String RESPONSES_KEY_NAME = "responses";
// topic level field names
private static final String PARTITION_RESPONSES_KEY_NAME = "partition_responses";
public static final long INVALID_OFFSET = -1L;
private final ProduceResponseData data;
/**
* Possible error code:
*
* {@link Errors#CORRUPT_MESSAGE}
* {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
* {@link Errors#NOT_LEADER_OR_FOLLOWER}
* {@link Errors#MESSAGE_TOO_LARGE}
* {@link Errors#INVALID_TOPIC_EXCEPTION}
* {@link Errors#RECORD_LIST_TOO_LARGE}
* {@link Errors#NOT_ENOUGH_REPLICAS}
* {@link Errors#NOT_ENOUGH_REPLICAS_AFTER_APPEND}
* {@link Errors#INVALID_REQUIRED_ACKS}
* {@link Errors#TOPIC_AUTHORIZATION_FAILED}
* {@link Errors#UNSUPPORTED_FOR_MESSAGE_FORMAT}
* {@link Errors#INVALID_PRODUCER_EPOCH}
* {@link Errors#CLUSTER_AUTHORIZATION_FAILED}
* {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED}
* {@link Errors#INVALID_RECORD}
*/
private static final String BASE_OFFSET_KEY_NAME = "base_offset";
private static final String LOG_APPEND_TIME_KEY_NAME = "log_append_time";
private static final String LOG_START_OFFSET_KEY_NAME = "log_start_offset";
private static final String RECORD_ERRORS_KEY_NAME = "record_errors";
private static final String BATCH_INDEX_KEY_NAME = "batch_index";
private static final String BATCH_INDEX_ERROR_MESSAGE_KEY_NAME = "batch_index_error_message";
private static final String ERROR_MESSAGE_KEY_NAME = "error_message";
private static final Field.Int64 LOG_START_OFFSET_FIELD = new Field.Int64(LOG_START_OFFSET_KEY_NAME,
"The start offset of the log at the time this produce response was created", INVALID_OFFSET);
private static final Field.NullableStr BATCH_INDEX_ERROR_MESSAGE_FIELD = new Field.NullableStr(BATCH_INDEX_ERROR_MESSAGE_KEY_NAME,
"The error message of the record that caused the batch to be dropped");
private static final Field.NullableStr ERROR_MESSAGE_FIELD = new Field.NullableStr(ERROR_MESSAGE_KEY_NAME,
"The global error message summarizing the common root cause of the records that caused the batch to be dropped");
private static final Schema PRODUCE_RESPONSE_V0 = new Schema(
new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema(
TOPIC_NAME,
new Field(PARTITION_RESPONSES_KEY_NAME, new ArrayOf(new Schema(
PARTITION_ID,
ERROR_CODE,
new Field(BASE_OFFSET_KEY_NAME, INT64))))))));
private static final Schema PRODUCE_RESPONSE_V1 = new Schema(
new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema(
TOPIC_NAME,
new Field(PARTITION_RESPONSES_KEY_NAME, new ArrayOf(new Schema(
PARTITION_ID,
ERROR_CODE,
new Field(BASE_OFFSET_KEY_NAME, INT64))))))),
THROTTLE_TIME_MS);
/**
* PRODUCE_RESPONSE_V2 added a timestamp field in the per partition response status.
* The timestamp is log append time if the topic is configured to use log append time. Or it is NoTimestamp when create
* time is used for the topic.
*/
private static final Schema PRODUCE_RESPONSE_V2 = new Schema(
new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema(
TOPIC_NAME,
new Field(PARTITION_RESPONSES_KEY_NAME, new ArrayOf(new Schema(
PARTITION_ID,
ERROR_CODE,
new Field(BASE_OFFSET_KEY_NAME, INT64),
new Field(LOG_APPEND_TIME_KEY_NAME, INT64, "The timestamp returned by broker after appending " +
"the messages. If CreateTime is used for the topic, the timestamp will be -1. " +
"If LogAppendTime is used for the topic, the timestamp will be " +
"the broker local time when the messages are appended."))))))),
THROTTLE_TIME_MS);
private static final Schema PRODUCE_RESPONSE_V3 = PRODUCE_RESPONSE_V2;
/**
* The body of PRODUCE_RESPONSE_V4 is the same as PRODUCE_RESPONSE_V3.
* The version number is bumped up to indicate that the client supports KafkaStorageException.
* The KafkaStorageException will be translated to NotLeaderOrFollowerException in the response if version <= 3
*/
private static final Schema PRODUCE_RESPONSE_V4 = PRODUCE_RESPONSE_V3;
/**
* Add in the log_start_offset field to the partition response to filter out spurious OutOfOrderSequencExceptions
* on the client.
*/
public static final Schema PRODUCE_RESPONSE_V5 = new Schema(
new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema(
TOPIC_NAME,
new Field(PARTITION_RESPONSES_KEY_NAME, new ArrayOf(new Schema(
PARTITION_ID,
ERROR_CODE,
new Field(BASE_OFFSET_KEY_NAME, INT64),
new Field(LOG_APPEND_TIME_KEY_NAME, INT64, "The timestamp returned by broker after appending " +
"the messages. If CreateTime is used for the topic, the timestamp will be -1. " +
"If LogAppendTime is used for the topic, the timestamp will be the broker local " +
"time when the messages are appended."),
LOG_START_OFFSET_FIELD)))))),
THROTTLE_TIME_MS);
/**
* The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
*/
private static final Schema PRODUCE_RESPONSE_V6 = PRODUCE_RESPONSE_V5;
/**
* V7 bumped up to indicate ZStandard capability. (see KIP-110)
*/
private static final Schema PRODUCE_RESPONSE_V7 = PRODUCE_RESPONSE_V6;
/**
* V8 adds record_errors and error_message. (see KIP-467)
*/
public static final Schema PRODUCE_RESPONSE_V8 = new Schema(
new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema(
TOPIC_NAME,
new Field(PARTITION_RESPONSES_KEY_NAME, new ArrayOf(new Schema(
PARTITION_ID,
ERROR_CODE,
new Field(BASE_OFFSET_KEY_NAME, INT64),
new Field(LOG_APPEND_TIME_KEY_NAME, INT64, "The timestamp returned by broker after appending " +
"the messages. If CreateTime is used for the topic, the timestamp will be -1. " +
"If LogAppendTime is used for the topic, the timestamp will be the broker local " +
"time when the messages are appended."),
LOG_START_OFFSET_FIELD,
new Field(RECORD_ERRORS_KEY_NAME, new ArrayOf(new Schema(
new Field.Int32(BATCH_INDEX_KEY_NAME, "The batch index of the record " +
"that caused the batch to be dropped"),
BATCH_INDEX_ERROR_MESSAGE_FIELD
)), "The batch indices of records that caused the batch to be dropped"),
ERROR_MESSAGE_FIELD)))))),
THROTTLE_TIME_MS);
public static Schema[] schemaVersions() {
return new Schema[]{PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3,
PRODUCE_RESPONSE_V4, PRODUCE_RESPONSE_V5, PRODUCE_RESPONSE_V6, PRODUCE_RESPONSE_V7, PRODUCE_RESPONSE_V8};
public ProduceResponse(ProduceResponseData produceResponseData) {
this.data = produceResponseData;
}
private final Map<TopicPartition, PartitionResponse> responses;
private final int throttleTimeMs;
/**
* Constructor for Version 0
* @param responses Produced data grouped by topic-partition
*/
@Deprecated
public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
this(responses, DEFAULT_THROTTLE_TIME);
}
@@ -203,119 +77,88 @@ public class ProduceResponse extends AbstractResponse {
* @param responses Produced data grouped by topic-partition
* @param throttleTimeMs Time in milliseconds the response was throttled
*/
@Deprecated
public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs) {
this.responses = responses;
this.throttleTimeMs = throttleTimeMs;
}
/**
* Constructor from a {@link Struct}.
*/
public ProduceResponse(Struct struct) {
responses = new HashMap<>();
for (Object topicResponse : struct.getArray(RESPONSES_KEY_NAME)) {
Struct topicRespStruct = (Struct) topicResponse;
String topic = topicRespStruct.get(TOPIC_NAME);
for (Object partResponse : topicRespStruct.getArray(PARTITION_RESPONSES_KEY_NAME)) {
Struct partRespStruct = (Struct) partResponse;
int partition = partRespStruct.get(PARTITION_ID);
Errors error = Errors.forCode(partRespStruct.get(ERROR_CODE));
long offset = partRespStruct.getLong(BASE_OFFSET_KEY_NAME);
long logAppendTime = partRespStruct.hasField(LOG_APPEND_TIME_KEY_NAME) ?
partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME) : RecordBatch.NO_TIMESTAMP;
long logStartOffset = partRespStruct.getOrElse(LOG_START_OFFSET_FIELD, INVALID_OFFSET);
List<RecordError> recordErrors = Collections.emptyList();
if (partRespStruct.hasField(RECORD_ERRORS_KEY_NAME)) {
Object[] recordErrorsArray = partRespStruct.getArray(RECORD_ERRORS_KEY_NAME);
if (recordErrorsArray.length > 0) {
recordErrors = new ArrayList<>(recordErrorsArray.length);
for (Object indexAndMessage : recordErrorsArray) {
Struct indexAndMessageStruct = (Struct) indexAndMessage;
recordErrors.add(new RecordError(
indexAndMessageStruct.getInt(BATCH_INDEX_KEY_NAME),
indexAndMessageStruct.get(BATCH_INDEX_ERROR_MESSAGE_FIELD)
));
}
}
}
String errorMessage = partRespStruct.getOrElse(ERROR_MESSAGE_FIELD, null);
TopicPartition tp = new TopicPartition(topic, partition);
responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset, recordErrors, errorMessage));
}
}
this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
this(toData(responses, throttleTimeMs));
}
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.PRODUCE.responseSchema(version));
Map<String, Map<Integer, PartitionResponse>> responseByTopic = CollectionUtils.groupPartitionDataByTopic(responses);
List<Struct> topicDatas = new ArrayList<>(responseByTopic.size());
for (Map.Entry<String, Map<Integer, PartitionResponse>> entry : responseByTopic.entrySet()) {
Struct topicData = struct.instance(RESPONSES_KEY_NAME);
topicData.set(TOPIC_NAME, entry.getKey());
List<Struct> partitionArray = new ArrayList<>();
for (Map.Entry<Integer, PartitionResponse> partitionEntry : entry.getValue().entrySet()) {
PartitionResponse part = partitionEntry.getValue();
short errorCode = part.error.code();
// If producer sends ProduceRequest V3 or earlier, the client library is not guaranteed to recognize the error code
// for KafkaStorageException. In this case the client library will translate KafkaStorageException to
// UnknownServerException which is not retriable. We can ensure that producer will update metadata and retry
// by converting the KafkaStorageException to NotLeaderOrFollowerException in the response if ProduceRequest version <= 3
if (errorCode == Errors.KAFKA_STORAGE_ERROR.code() && version <= 3)
errorCode = Errors.NOT_LEADER_OR_FOLLOWER.code();
Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME)
.set(PARTITION_ID, partitionEntry.getKey())
.set(ERROR_CODE, errorCode)
.set(BASE_OFFSET_KEY_NAME, part.baseOffset);
partStruct.setIfExists(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime);
partStruct.setIfExists(LOG_START_OFFSET_FIELD, part.logStartOffset);
if (partStruct.hasField(RECORD_ERRORS_KEY_NAME)) {
List<Struct> recordErrors = Collections.emptyList();
if (!part.recordErrors.isEmpty()) {
recordErrors = new ArrayList<>();
for (RecordError indexAndMessage : part.recordErrors) {
Struct indexAndMessageStruct = partStruct.instance(RECORD_ERRORS_KEY_NAME)
.set(BATCH_INDEX_KEY_NAME, indexAndMessage.batchIndex)
.set(BATCH_INDEX_ERROR_MESSAGE_FIELD, indexAndMessage.message);
recordErrors.add(indexAndMessageStruct);
}
}
partStruct.set(RECORD_ERRORS_KEY_NAME, recordErrors.toArray());
}
partStruct.setIfExists(ERROR_MESSAGE_FIELD, part.errorMessage);
partitionArray.add(partStruct);
}
topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray());
topicDatas.add(topicData);
}
struct.set(RESPONSES_KEY_NAME, topicDatas.toArray());
struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
return struct;
protected Send toSend(String destination, ResponseHeader header, short apiVersion) {
return SendBuilder.buildResponseSend(destination, header, this.data, apiVersion);
}
private static ProduceResponseData toData(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs) {
ProduceResponseData data = new ProduceResponseData().setThrottleTimeMs(throttleTimeMs);
responses.forEach((tp, response) -> {
ProduceResponseData.TopicProduceResponse tpr = data.responses().find(tp.topic());
if (tpr == null) {
tpr = new ProduceResponseData.TopicProduceResponse().setName(tp.topic());
data.responses().add(tpr);
}
tpr.partitionResponses()
.add(new ProduceResponseData.PartitionProduceResponse()
.setIndex(tp.partition())
.setBaseOffset(response.baseOffset)
.setLogStartOffset(response.logStartOffset)
.setLogAppendTimeMs(response.logAppendTime)
.setErrorMessage(response.errorMessage)
.setErrorCode(response.error.code())
.setRecordErrors(response.recordErrors
.stream()
.map(e -> new ProduceResponseData.BatchIndexAndErrorMessage()
.setBatchIndex(e.batchIndex)
.setBatchIndexErrorMessage(e.message))
.collect(Collectors.toList())));
});
return data;
}
/**
* Visible for testing.
*/
@Override
public Struct toStruct(short version) {
return data.toStruct(version);
}
public ProduceResponseData data() {
return this.data;
}
/**
* This method is used by tests only.
* Refactor the tests that rely on this method, then remove it from production code.
* https://issues.apache.org/jira/browse/KAFKA-10697
*/
@Deprecated
public Map<TopicPartition, PartitionResponse> responses() {
return this.responses;
return data.responses()
.stream()
.flatMap(t -> t.partitionResponses()
.stream()
.map(p -> new AbstractMap.SimpleEntry<>(new TopicPartition(t.name(), p.index()),
new PartitionResponse(
Errors.forCode(p.errorCode()),
p.baseOffset(),
p.logAppendTimeMs(),
p.logStartOffset(),
p.recordErrors()
.stream()
.map(e -> new RecordError(e.batchIndex(), e.batchIndexErrorMessage()))
.collect(Collectors.toList()),
p.errorMessage()))))
.collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
}
@Override
public int throttleTimeMs() {
return this.throttleTimeMs;
return this.data.throttleTimeMs();
}
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> errorCounts = new HashMap<>();
responses.values().forEach(response ->
updateErrorCounts(errorCounts, response.error)
);
data.responses().forEach(t -> t.partitionResponses().forEach(p -> updateErrorCounts(errorCounts, Errors.forCode(p.errorCode()))));
return errorCounts;
}
@ -348,6 +191,24 @@ public class ProduceResponse extends AbstractResponse {
this.errorMessage = errorMessage;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PartitionResponse that = (PartitionResponse) o;
return baseOffset == that.baseOffset &&
logAppendTime == that.logAppendTime &&
logStartOffset == that.logStartOffset &&
error == that.error &&
Objects.equals(recordErrors, that.recordErrors) &&
Objects.equals(errorMessage, that.errorMessage);
}
@Override
public int hashCode() {
return Objects.hash(error, baseOffset, logAppendTime, logStartOffset, recordErrors, errorMessage);
}
@Override
public String toString() {
StringBuilder b = new StringBuilder();
@ -387,10 +248,23 @@ public class ProduceResponse extends AbstractResponse {
this.message = null;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RecordError that = (RecordError) o;
return batchIndex == that.batchIndex &&
Objects.equals(message, that.message);
}
@Override
public int hashCode() {
return Objects.hash(batchIndex, message);
}
}
public static ProduceResponse parse(ByteBuffer buffer, short version) {
return new ProduceResponse(ApiKeys.PRODUCE.responseSchema(version).read(buffer));
return new ProduceResponse(new ProduceResponseData(new ByteBufferAccessor(buffer), version));
}
@Override

@ -16,11 +16,16 @@
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.Iterator;
import java.util.Optional;
public final class RequestUtils {
@ -48,4 +53,41 @@ public final class RequestUtils {
buffer.rewind();
return buffer;
}
// visible for testing
public static boolean hasIdempotentRecords(ProduceRequest request) {
return flags(request).getKey();
}
// visible for testing
public static boolean hasTransactionalRecords(ProduceRequest request) {
return flags(request).getValue();
}
/**
* Get both the hasIdempotentRecords flag and the hasTransactionalRecords flag from a produce request.
* Note that both flags are computed in a single pass to avoid a duplicate loop and redundant record batch construction.
* @return an entry whose key is the "hasIdempotentRecords" flag and whose value is the "hasTransactionalRecords" flag
*/
public static AbstractMap.SimpleEntry<Boolean, Boolean> flags(ProduceRequest request) {
boolean hasIdempotentRecords = false;
boolean hasTransactionalRecords = false;
for (ProduceRequestData.TopicProduceData tpd : request.dataOrException().topicData()) {
for (ProduceRequestData.PartitionProduceData ppd : tpd.partitionData()) {
BaseRecords records = ppd.records();
if (records instanceof Records) {
Iterator<? extends RecordBatch> iterator = ((Records) records).batches().iterator();
if (iterator.hasNext()) {
RecordBatch batch = iterator.next();
hasIdempotentRecords = hasIdempotentRecords || batch.hasProducerId();
hasTransactionalRecords = hasTransactionalRecords || batch.isTransactional();
}
}
// return early
if (hasIdempotentRecords && hasTransactionalRecords)
return new AbstractMap.SimpleEntry<>(true, true);
}
}
return new AbstractMap.SimpleEntry<>(hasIdempotentRecords, hasTransactionalRecords);
}
}
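The pair returned by `flags` packs both booleans into one `AbstractMap.SimpleEntry`, so callers scan the record batches only once. A minimal usage sketch (the `describe` helper and its printout are illustrative, not part of the patch):

```java
import java.util.AbstractMap;

import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.RequestUtils;

public class FlagsUsageSketch {
    // `request` is assumed to be an already-built ProduceRequest.
    static void describe(ProduceRequest request) {
        // A single pass over the batches yields both flags.
        AbstractMap.SimpleEntry<Boolean, Boolean> flags = RequestUtils.flags(request);
        boolean hasIdempotentRecords = flags.getKey();       // key = "hasIdempotentRecords"
        boolean hasTransactionalRecords = flags.getValue();  // value = "hasTransactionalRecords"
        System.out.println("idempotent=" + hasIdempotentRecords
                + ", transactional=" + hasTransactionalRecords);
    }
}
```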

@ -33,21 +33,21 @@
"validVersions": "0-8",
"flexibleVersions": "none",
"fields": [
{ "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "0+", "entityType": "transactionalId",
{ "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "entityType": "transactionalId",
"about": "The transactional ID, or null if the producer is not transactional." },
{ "name": "Acks", "type": "int16", "versions": "0+",
"about": "The number of acknowledgments the producer requires the leader to have received before considering a request complete. Allowed values: 0 for no acknowledgments, 1 for only the leader and -1 for the full ISR." },
{ "name": "TimeoutMs", "type": "int32", "versions": "0+",
"about": "The timeout to await a response in miliseconds." },
{ "name": "Topics", "type": "[]TopicProduceData", "versions": "0+",
{ "name": "TopicData", "type": "[]TopicProduceData", "versions": "0+",
"about": "Each topic to produce to.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
"about": "The topic name." },
{ "name": "Partitions", "type": "[]PartitionProduceData", "versions": "0+",
{ "name": "PartitionData", "type": "[]PartitionProduceData", "versions": "0+",
"about": "Each partition to produce to.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
{ "name": "Index", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "Records", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
{ "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+",
"about": "The record data to be produced." }
]}
]}
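Since `Name` is now a `mapKey`, the generated `TopicProduceDataCollection` supports keyed lookup, mirroring the `find`/`add` calls that `ProduceResponse#toData` uses on the response side. A minimal sketch of populating the renamed fields (the topic name and record value are illustrative, and `find`/`add` are assumed to behave like the generated response collection):

```java
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;

public class ProduceRequestDataSketch {
    static ProduceRequestData buildSketch() {
        ProduceRequestData data = new ProduceRequestData()
                .setAcks((short) -1)
                .setTimeoutMs(30000);
        // "mapKey": true on Name allows lookup by topic name instead of a list scan.
        ProduceRequestData.TopicProduceData topic = data.topicData().find("demo-topic");
        if (topic == null) {
            topic = new ProduceRequestData.TopicProduceData().setName("demo-topic");
            data.topicData().add(topic);
        }
        // "type": "records" maps to BaseRecords, so MemoryRecords can be set directly.
        topic.partitionData().add(new ProduceRequestData.PartitionProduceData()
                .setIndex(0)
                .setRecords(MemoryRecords.withRecords(CompressionType.NONE,
                        new SimpleRecord("value".getBytes()))));
        return data;
    }
}
```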

@ -35,11 +35,11 @@
"fields": [
{ "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+",
"about": "Each produce response", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
"about": "The topic name" },
{ "name": "Partitions", "type": "[]PartitionProduceResponse", "versions": "0+",
{ "name": "PartitionResponses", "type": "[]PartitionProduceResponse", "versions": "0+",
"about": "Each partition that we produced to within the topic.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
{ "name": "Index", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
@ -60,7 +60,7 @@
"about": "The global error message summarizing the common root cause of the records that caused the batch to be dropped"}
]}
]},
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, "default": "0",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }
]
}
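On the response side the same `mapKey` change applies, and `ThrottleTimeMs` now carries an explicit default of 0 for versions that omit it. A minimal sketch of walking the generated data, mirroring the traversal the deprecated `ProduceResponse#responses()` performs (the `printErrors` helper is illustrative):

```java
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.protocol.Errors;

public class ProduceResponseDataSketch {
    static void printErrors(ProduceResponseData data) {
        // Nested forEach mirrors how errorCounts() walks topics and partitions.
        data.responses().forEach(topic ->
                topic.partitionResponses().forEach(partition ->
                        System.out.println(topic.name() + "-" + partition.index() + ": "
                                + Errors.forCode(partition.errorCode()))));
        // Defaults to 0 when the (pre-v1) wire format does not carry the field.
        System.out.println("throttleTimeMs=" + data.throttleTimeMs());
    }
}
```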

@ -25,9 +25,10 @@ import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.CommonFields;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.ApiVersionsResponse;
@ -154,8 +155,10 @@ public class NetworkClientTest {
client.poll(1, time.milliseconds());
assertTrue("The client should be ready", client.isReady(node, time.milliseconds()));
ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000,
Collections.emptyMap());
ProduceRequest.Builder builder = ProduceRequest.forCurrentMagic(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection())
.setAcks((short) 1)
.setTimeoutMs(1000));
ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true);
client.send(request, time.milliseconds());
assertEquals("There should be 1 in-flight request after send", 1,
@ -185,10 +188,9 @@ public class NetworkClientTest {
ProduceRequest.Builder builder = new ProduceRequest.Builder(
PRODUCE.latestVersion(),
PRODUCE.latestVersion(),
(short) 1,
1000,
Collections.emptyMap(),
null);
new ProduceRequestData()
.setAcks((short) 1)
.setTimeoutMs(1000));
TestCallbackHandler handler = new TestCallbackHandler();
ClientRequest request = networkClient.newClientRequest(node.idString(), builder, time.milliseconds(),
true, defaultRequestTimeoutMs, handler);
@ -198,8 +200,9 @@ public class NetworkClientTest {
ResponseHeader respHeader =
new ResponseHeader(request.correlationId(),
request.apiKey().responseHeaderVersion(PRODUCE.latestVersion()));
Struct resp = new Struct(PRODUCE.responseSchema(PRODUCE.latestVersion()));
resp.set("responses", new Object[0]);
Struct resp = new ProduceResponseData()
.setThrottleTimeMs(100)
.toStruct(PRODUCE.latestVersion());
Struct responseHeaderStruct = respHeader.toStruct();
int size = responseHeaderStruct.sizeOf() + resp.sizeOf();
ByteBuffer buffer = ByteBuffer.allocate(size);
@ -431,8 +434,10 @@ public class NetworkClientTest {
@Test
public void testRequestTimeout() {
awaitReady(client, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0
ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1,
1000, Collections.emptyMap());
ProduceRequest.Builder builder = ProduceRequest.forCurrentMagic(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection())
.setAcks((short) 1)
.setTimeoutMs(1000));
TestCallbackHandler handler = new TestCallbackHandler();
int requestTimeoutMs = defaultRequestTimeoutMs + 5000;
ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true,
@ -444,8 +449,10 @@ public class NetworkClientTest {
@Test
public void testDefaultRequestTimeout() {
awaitReady(client, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0
ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1,
1000, Collections.emptyMap());
ProduceRequest.Builder builder = ProduceRequest.forCurrentMagic(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection())
.setAcks((short) 1)
.setTimeoutMs(1000));
ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true);
assertEquals(defaultRequestTimeoutMs, request.requestTimeoutMs());
testRequestTimeout(request);
@ -498,10 +505,9 @@ public class NetworkClientTest {
ProduceRequest.Builder builder = new ProduceRequest.Builder(
PRODUCE.latestVersion(),
PRODUCE.latestVersion(),
(short) 1,
1000,
Collections.emptyMap(),
null);
new ProduceRequestData()
.setAcks((short) 1)
.setTimeoutMs(1000));
TestCallbackHandler handler = new TestCallbackHandler();
ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true,
defaultRequestTimeoutMs, handler);
@ -510,9 +516,9 @@ public class NetworkClientTest {
ResponseHeader respHeader =
new ResponseHeader(request.correlationId(),
request.apiKey().responseHeaderVersion(PRODUCE.latestVersion()));
Struct resp = new Struct(PRODUCE.responseSchema(PRODUCE.latestVersion()));
resp.set("responses", new Object[0]);
resp.set(CommonFields.THROTTLE_TIME_MS, 100);
Struct resp = new ProduceResponseData()
.setThrottleTimeMs(100)
.toStruct(PRODUCE.latestVersion());
Struct responseHeaderStruct = respHeader.toStruct();
int size = responseHeaderStruct.sizeOf() + resp.sizeOf();
ByteBuffer buffer = ByteBuffer.allocate(size);
@ -586,8 +592,10 @@ public class NetworkClientTest {
}
private int sendEmptyProduceRequest(String nodeId) {
ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000,
Collections.emptyMap());
ProduceRequest.Builder builder = ProduceRequest.forCurrentMagic(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection())
.setAcks((short) 1)
.setTimeoutMs(1000));
TestCallbackHandler handler = new TestCallbackHandler();
ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true,
defaultRequestTimeoutMs, handler);
@ -606,9 +614,9 @@ public class NetworkClientTest {
}
private void sendThrottledProduceResponse(int correlationId, int throttleMs) {
Struct resp = new Struct(PRODUCE.responseSchema(PRODUCE.latestVersion()));
resp.set("responses", new Object[0]);
resp.set(CommonFields.THROTTLE_TIME_MS, throttleMs);
Struct resp = new ProduceResponseData()
.setThrottleTimeMs(throttleMs)
.toStruct(PRODUCE.latestVersion());
sendResponse(new ResponseHeader(correlationId,
PRODUCE.responseHeaderVersion(PRODUCE.latestVersion())),
resp);

@ -26,8 +26,10 @@ import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.TransactionAbortedException;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.RequestUtils;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
@ -156,6 +158,15 @@ public class SenderTest {
this.metrics.close();
}
private static Map<TopicPartition, MemoryRecords> partitionRecords(ProduceRequest request) {
Map<TopicPartition, MemoryRecords> partitionRecords = new HashMap<>();
request.dataOrException().topicData().forEach(tpData -> tpData.partitionData().forEach(p -> {
TopicPartition tp = new TopicPartition(tpData.name(), p.index());
partitionRecords.put(tp, (MemoryRecords) p.records());
}));
return Collections.unmodifiableMap(partitionRecords);
}
@Test
public void testSimple() throws Exception {
long offset = 0;
@ -195,7 +206,7 @@ public class SenderTest {
if (request.version() != 2)
return false;
MemoryRecords records = request.partitionRecordsOrFail().get(tp0);
MemoryRecords records = partitionRecords(request).get(tp0);
return records != null &&
records.sizeInBytes() > 0 &&
records.hasMatchingMagic(RecordBatch.MAGIC_VALUE_V1);
@ -208,6 +219,7 @@ public class SenderTest {
assertEquals(offset, future.get().offset());
}
@SuppressWarnings("deprecation")
@Test
public void testDownConversionForMismatchedMagicValues() throws Exception {
// it can happen that we construct a record set with mismatching magic values (perhaps
@ -241,7 +253,7 @@ public class SenderTest {
if (request.version() != 2)
return false;
Map<TopicPartition, MemoryRecords> recordsMap = request.partitionRecordsOrFail();
Map<TopicPartition, MemoryRecords> recordsMap = partitionRecords(request);
if (recordsMap.size() != 2)
return false;
@ -262,6 +274,7 @@ public class SenderTest {
/*
* Send multiple requests. Verify that the client side quota metrics have the right values
*/
@SuppressWarnings("deprecation")
@Test
public void testQuotaMetrics() {
MockSelector selector = new MockSelector(time);
@ -284,8 +297,10 @@ public class SenderTest {
for (int i = 1; i <= 3; i++) {
int throttleTimeMs = 100 * i;
ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000,
Collections.emptyMap());
ProduceRequest.Builder builder = ProduceRequest.forCurrentMagic(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection())
.setAcks((short) 1)
.setTimeoutMs(1000));
ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true);
client.send(request, time.milliseconds());
client.poll(1, time.milliseconds());
@ -648,7 +663,7 @@ public class SenderTest {
client.respond(body -> {
ProduceRequest request = (ProduceRequest) body;
assertFalse(request.hasIdempotentRecords());
assertFalse(RequestUtils.hasIdempotentRecords(request));
return true;
}, produceResponse(tp0, -1L, Errors.TOPIC_AUTHORIZATION_FAILED, 0));
sender.runOnce();
@ -1806,9 +1821,9 @@ public class SenderTest {
void sendIdempotentProducerResponse(final int expectedSequence, TopicPartition tp, Errors responseError, long responseOffset, long logStartOffset) {
client.respond(body -> {
ProduceRequest produceRequest = (ProduceRequest) body;
assertTrue(produceRequest.hasIdempotentRecords());
assertTrue(RequestUtils.hasIdempotentRecords(produceRequest));
MemoryRecords records = produceRequest.partitionRecordsOrFail().get(tp0);
MemoryRecords records = partitionRecords(produceRequest).get(tp0);
Iterator<MutableRecordBatch> batchIterator = records.batches().iterator();
RecordBatch firstBatch = batchIterator.next();
assertFalse(batchIterator.hasNext());
@ -1829,8 +1844,7 @@ public class SenderTest {
// cluster authorization is a fatal error for the producer
Future<RecordMetadata> future = appendToAccumulator(tp0);
client.prepareResponse(
body -> body instanceof ProduceRequest &&
((ProduceRequest) body).hasIdempotentRecords(),
body -> body instanceof ProduceRequest && RequestUtils.hasIdempotentRecords((ProduceRequest) body),
produceResponse(tp0, -1, Errors.CLUSTER_AUTHORIZATION_FAILED, 0));
sender.runOnce();
@ -1858,8 +1872,7 @@ public class SenderTest {
sender.runOnce();
client.respond(
body -> body instanceof ProduceRequest &&
((ProduceRequest) body).hasIdempotentRecords(),
body -> body instanceof ProduceRequest && RequestUtils.hasIdempotentRecords((ProduceRequest) body),
produceResponse(tp0, -1, Errors.CLUSTER_AUTHORIZATION_FAILED, 0));
sender.runOnce();
@ -1871,8 +1884,7 @@ public class SenderTest {
// Should be fine if the second response eventually returns
client.respond(
body -> body instanceof ProduceRequest &&
((ProduceRequest) body).hasIdempotentRecords(),
body -> body instanceof ProduceRequest && RequestUtils.hasIdempotentRecords((ProduceRequest) body),
produceResponse(tp1, 0, Errors.NONE, 0));
sender.runOnce();
}
@ -1888,8 +1900,7 @@ public class SenderTest {
Future<RecordMetadata> future = appendToAccumulator(tp0);
client.prepareResponse(
body -> body instanceof ProduceRequest &&
((ProduceRequest) body).hasIdempotentRecords(),
body -> body instanceof ProduceRequest && RequestUtils.hasIdempotentRecords((ProduceRequest) body),
produceResponse(tp0, -1, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, 0));
sender.runOnce();
@ -1910,8 +1921,7 @@ public class SenderTest {
Future<RecordMetadata> future = appendToAccumulator(tp0);
client.prepareUnsupportedVersionResponse(
body -> body instanceof ProduceRequest &&
((ProduceRequest) body).hasIdempotentRecords());
body -> body instanceof ProduceRequest && RequestUtils.hasIdempotentRecords((ProduceRequest) body));
sender.runOnce();
assertFutureFailure(future, UnsupportedVersionException.class);
@ -1940,7 +1950,7 @@ public class SenderTest {
client.prepareResponse(body -> {
if (body instanceof ProduceRequest) {
ProduceRequest request = (ProduceRequest) body;
MemoryRecords records = request.partitionRecordsOrFail().get(tp0);
MemoryRecords records = partitionRecords(request).get(tp0);
Iterator<MutableRecordBatch> batchIterator = records.batches().iterator();
assertTrue(batchIterator.hasNext());
RecordBatch batch = batchIterator.next();
@ -2057,6 +2067,7 @@ public class SenderTest {
testSplitBatchAndSend(txnManager, producerIdAndEpoch, tp);
}
@SuppressWarnings("deprecation")
private void testSplitBatchAndSend(TransactionManager txnManager,
ProducerIdAndEpoch producerIdAndEpoch,
TopicPartition tp) throws Exception {
@ -2170,6 +2181,7 @@ public class SenderTest {
assertEquals(0, sender.inFlightBatches(tp0).size());
}
@SuppressWarnings("deprecation")
@Test
public void testInflightBatchesExpireOnDeliveryTimeout() throws InterruptedException {
long deliveryTimeoutMs = 1500L;
@ -2304,6 +2316,7 @@ public class SenderTest {
}
@SuppressWarnings("deprecation")
@Test
public void testExpiredBatchesInMultiplePartitions() throws Exception {
long deliveryTimeoutMs = 1500L;
@ -2601,7 +2614,7 @@ public class SenderTest {
return false;
ProduceRequest request = (ProduceRequest) body;
Map<TopicPartition, MemoryRecords> recordsMap = request.partitionRecordsOrFail();
Map<TopicPartition, MemoryRecords> recordsMap = partitionRecords(request);
MemoryRecords records = recordsMap.get(tp);
if (records == null)
return false;
@ -2637,6 +2650,7 @@ public class SenderTest {
null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future;
}
@SuppressWarnings("deprecation")
private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, long logStartOffset, String errorMessage) {
ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset,
RecordBatch.NO_TIMESTAMP, logStartOffset, Collections.emptyList(), errorMessage);
@ -2644,6 +2658,7 @@ public class SenderTest {
return new ProduceResponse(partResp, throttleTimeMs);
}
@SuppressWarnings("deprecation")
private ProduceResponse produceResponse(Map<TopicPartition, OffsetAndError> responses) {
Map<TopicPartition, ProduceResponse.PartitionResponse> partResponses = new LinkedHashMap<>();
for (Map.Entry<TopicPartition, OffsetAndError> entry : responses.entrySet()) {

@ -3356,7 +3356,16 @@ public class TransactionManagerTest {
private MockClient.RequestMatcher produceRequestMatcher(final long producerId, final short epoch, TopicPartition tp) {
return body -> {
ProduceRequest produceRequest = (ProduceRequest) body;
MemoryRecords records = produceRequest.partitionRecordsOrFail().get(tp);
MemoryRecords records = produceRequest.dataOrException().topicData()
.stream()
.filter(t -> t.name().equals(tp.topic()))
.findAny()
.get()
.partitionData()
.stream()
.filter(p -> p.index() == tp.partition())
.map(p -> (MemoryRecords) p.records())
.findAny().get();
assertNotNull(records);
Iterator<MutableRecordBatch> batchIterator = records.batches().iterator();
assertTrue(batchIterator.hasNext());
@ -3481,6 +3490,7 @@ public class TransactionManagerTest {
return produceResponse(tp, offset, error, throttleTimeMs, 10);
}
@SuppressWarnings("deprecation")
private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, int logStartOffset) {
ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP, logStartOffset);
Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = singletonMap(tp, resp);

@ -678,48 +678,47 @@ public final class MessageTest {
String errorMessage = "global error message";
testAllMessageRoundTrips(new ProduceResponseData()
.setResponses(singletonList(
new ProduceResponseData.TopicProduceResponse()
.setName(topicName)
.setPartitions(singletonList(
new ProduceResponseData.PartitionProduceResponse()
.setPartitionIndex(partitionIndex)
.setErrorCode(errorCode)
.setBaseOffset(baseOffset))))));
.setResponses(new ProduceResponseData.TopicProduceResponseCollection(singletonList(
new ProduceResponseData.TopicProduceResponse()
.setName(topicName)
.setPartitionResponses(singletonList(
new ProduceResponseData.PartitionProduceResponse()
.setIndex(partitionIndex)
.setErrorCode(errorCode)
.setBaseOffset(baseOffset)))).iterator())));
Supplier<ProduceResponseData> response =
() -> new ProduceResponseData()
.setResponses(singletonList(
new ProduceResponseData.TopicProduceResponse()
.setName(topicName)
.setPartitions(singletonList(
new ProduceResponseData.PartitionProduceResponse()
.setPartitionIndex(partitionIndex)
.setErrorCode(errorCode)
.setBaseOffset(baseOffset)
.setLogAppendTimeMs(logAppendTimeMs)
.setLogStartOffset(logStartOffset)
.setRecordErrors(singletonList(
new ProduceResponseData.BatchIndexAndErrorMessage()
.setBatchIndex(batchIndex)
.setBatchIndexErrorMessage(batchIndexErrorMessage)))
.setErrorMessage(errorMessage)))))
.setThrottleTimeMs(throttleTimeMs);
Supplier<ProduceResponseData> response = () -> new ProduceResponseData()
.setResponses(new ProduceResponseData.TopicProduceResponseCollection(singletonList(
new ProduceResponseData.TopicProduceResponse()
.setName(topicName)
.setPartitionResponses(singletonList(
new ProduceResponseData.PartitionProduceResponse()
.setIndex(partitionIndex)
.setErrorCode(errorCode)
.setBaseOffset(baseOffset)
.setLogAppendTimeMs(logAppendTimeMs)
.setLogStartOffset(logStartOffset)
.setRecordErrors(singletonList(
new ProduceResponseData.BatchIndexAndErrorMessage()
.setBatchIndex(batchIndex)
.setBatchIndexErrorMessage(batchIndexErrorMessage)))
.setErrorMessage(errorMessage)))).iterator()))
.setThrottleTimeMs(throttleTimeMs);
for (short version = 0; version <= ApiKeys.PRODUCE.latestVersion(); version++) {
ProduceResponseData responseData = response.get();
if (version < 8) {
responseData.responses().get(0).partitions().get(0).setRecordErrors(Collections.emptyList());
responseData.responses().get(0).partitions().get(0).setErrorMessage(null);
responseData.responses().iterator().next().partitionResponses().get(0).setRecordErrors(Collections.emptyList());
responseData.responses().iterator().next().partitionResponses().get(0).setErrorMessage(null);
}
if (version < 5) {
responseData.responses().get(0).partitions().get(0).setLogStartOffset(-1);
responseData.responses().iterator().next().partitionResponses().get(0).setLogStartOffset(-1);
}
if (version < 2) {
responseData.responses().get(0).partitions().get(0).setLogAppendTimeMs(-1);
responseData.responses().iterator().next().partitionResponses().get(0).setLogAppendTimeMs(-1);
}
if (version < 1) {

@ -18,7 +18,7 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
@ -30,10 +30,8 @@ import org.apache.kafka.common.record.TimestampType;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -50,30 +48,47 @@ public class ProduceRequestTest {
public void shouldBeFlaggedAsTransactionalWhenTransactionalRecords() throws Exception {
final MemoryRecords memoryRecords = MemoryRecords.withTransactionalRecords(0, CompressionType.NONE, 1L,
(short) 1, 1, 1, simpleRecord);
final ProduceRequest request = ProduceRequest.Builder.forCurrentMagic((short) -1,
10, Collections.singletonMap(new TopicPartition("topic", 1), memoryRecords)).build();
assertTrue(request.hasTransactionalRecords());
final ProduceRequest request = ProduceRequest.forCurrentMagic(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
new ProduceRequestData.TopicProduceData()
.setName("topic")
.setPartitionData(Collections.singletonList(
new ProduceRequestData.PartitionProduceData()
.setIndex(1)
.setRecords(memoryRecords)))).iterator()))
.setAcks((short) -1)
.setTimeoutMs(10)).build();
assertTrue(RequestUtils.hasTransactionalRecords(request));
}
@Test
public void shouldNotBeFlaggedAsTransactionalWhenNoRecords() throws Exception {
final ProduceRequest request = createNonIdempotentNonTransactionalRecords();
assertFalse(request.hasTransactionalRecords());
assertFalse(RequestUtils.hasTransactionalRecords(request));
}
@Test
public void shouldNotBeFlaggedAsIdempotentWhenRecordsNotIdempotent() throws Exception {
final ProduceRequest request = createNonIdempotentNonTransactionalRecords();
assertFalse(request.hasTransactionalRecords());
assertFalse(RequestUtils.hasTransactionalRecords(request));
}
@Test
public void shouldBeFlaggedAsIdempotentWhenIdempotentRecords() throws Exception {
final MemoryRecords memoryRecords = MemoryRecords.withIdempotentRecords(1, CompressionType.NONE, 1L,
(short) 1, 1, 1, simpleRecord);
final ProduceRequest request = ProduceRequest.Builder.forCurrentMagic((short) -1, 10,
Collections.singletonMap(new TopicPartition("topic", 1), memoryRecords)).build();
assertTrue(request.hasIdempotentRecords());
final ProduceRequest request = ProduceRequest.forCurrentMagic(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
new ProduceRequestData.TopicProduceData()
.setName("topic")
.setPartitionData(Collections.singletonList(
new ProduceRequestData.PartitionProduceData()
.setIndex(1)
.setRecords(memoryRecords)))).iterator()))
.setAcks((short) -1)
.setTimeoutMs(10)).build();
assertTrue(RequestUtils.hasIdempotentRecords(request));
}
@Test
@ -82,11 +97,14 @@ public class ProduceRequestTest {
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE,
TimestampType.CREATE_TIME, 0L);
builder.append(10L, null, "a".getBytes());
Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
produceData.put(new TopicPartition("test", 0), builder.build());
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(RecordBatch.MAGIC_VALUE_V1, (short) 1,
5000, produceData, null);
ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(RecordBatch.MAGIC_VALUE_V1,
new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
new ProduceRequestData.TopicProduceData().setName("test").setPartitionData(Collections.singletonList(
new ProduceRequestData.PartitionProduceData().setIndex(9).setRecords(builder.build()))))
.iterator()))
.setAcks((short) 1)
.setTimeoutMs(5000));
assertEquals(2, requestBuilder.oldestAllowedVersion());
assertEquals(2, requestBuilder.latestAllowedVersion());
}
@ -97,11 +115,14 @@ public class ProduceRequestTest {
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE,
CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
builder.append(10L, null, "a".getBytes());
Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
produceData.put(new TopicPartition("test", 0), builder.build());
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(RecordBatch.CURRENT_MAGIC_VALUE,
(short) 1, 5000, produceData, null);
ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(RecordBatch.CURRENT_MAGIC_VALUE,
new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
new ProduceRequestData.TopicProduceData().setName("test").setPartitionData(Collections.singletonList(
new ProduceRequestData.PartitionProduceData().setIndex(9).setRecords(builder.build()))))
.iterator()))
.setAcks((short) 1)
.setTimeoutMs(5000));
assertEquals(3, requestBuilder.oldestAllowedVersion());
assertEquals(ApiKeys.PRODUCE.latestVersion(), requestBuilder.latestAllowedVersion());
}
@ -120,17 +141,31 @@ public class ProduceRequestTest {
buffer.flip();
Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
produceData.put(new TopicPartition("test", 0), MemoryRecords.readableRecords(buffer));
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forCurrentMagic((short) 1, 5000, produceData);
ProduceRequest.Builder requestBuilder = ProduceRequest.forCurrentMagic(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
new ProduceRequestData.TopicProduceData()
.setName("test")
.setPartitionData(Collections.singletonList(
new ProduceRequestData.PartitionProduceData()
.setIndex(0)
.setRecords(MemoryRecords.readableRecords(buffer))))).iterator()))
.setAcks((short) 1)
.setTimeoutMs(5000));
assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder);
}
@Test
public void testV3AndAboveCannotHaveNoRecordBatches() {
Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
produceData.put(new TopicPartition("test", 0), MemoryRecords.EMPTY);
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forCurrentMagic((short) 1, 5000, produceData);
ProduceRequest.Builder requestBuilder = ProduceRequest.forCurrentMagic(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
new ProduceRequestData.TopicProduceData()
.setName("test")
.setPartitionData(Collections.singletonList(
new ProduceRequestData.PartitionProduceData()
.setIndex(0)
.setRecords(MemoryRecords.EMPTY)))).iterator()))
.setAcks((short) 1)
.setTimeoutMs(5000));
assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder);
}
@ -141,9 +176,16 @@ public class ProduceRequestTest {
TimestampType.NO_TIMESTAMP_TYPE, 0L);
builder.append(10L, null, "a".getBytes());
Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
produceData.put(new TopicPartition("test", 0), builder.build());
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forCurrentMagic((short) 1, 5000, produceData);
ProduceRequest.Builder requestBuilder = ProduceRequest.forCurrentMagic(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
new ProduceRequestData.TopicProduceData()
.setName("test")
.setPartitionData(Collections.singletonList(
new ProduceRequestData.PartitionProduceData()
.setIndex(0)
.setRecords(builder.build())))).iterator()))
.setAcks((short) 1)
.setTimeoutMs(5000));
assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder);
}
@ -154,9 +196,16 @@ public class ProduceRequestTest {
TimestampType.CREATE_TIME, 0L);
builder.append(10L, null, "a".getBytes());
Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
produceData.put(new TopicPartition("test", 0), builder.build());
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forCurrentMagic((short) 1, 5000, produceData);
ProduceRequest.Builder requestBuilder = ProduceRequest.forCurrentMagic(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
new ProduceRequestData.TopicProduceData()
.setName("test")
.setPartitionData(Collections.singletonList(new ProduceRequestData.PartitionProduceData()
.setIndex(0)
.setRecords(builder.build()))))
.iterator()))
.setAcks((short) 1)
.setTimeoutMs(5000));
assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder);
}
@ -167,17 +216,25 @@ public class ProduceRequestTest {
TimestampType.CREATE_TIME, 0L);
builder.append(10L, null, "a".getBytes());
Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
produceData.put(new TopicPartition("test", 0), builder.build());
ProduceRequestData produceData = new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
new ProduceRequestData.TopicProduceData()
.setName("test")
.setPartitionData(Collections.singletonList(new ProduceRequestData.PartitionProduceData()
.setIndex(0)
.setRecords(builder.build()))))
.iterator()))
.setAcks((short) 1)
.setTimeoutMs(1000);
// Can't create ProduceRequest instance with version within [3, 7)
for (short version = 3; version < 7; version++) {
ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(version, version, (short) 1, 5000, produceData, null);
ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(version, version, produceData);
assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder);
}
// Works fine with current version (>= 7)
ProduceRequest.Builder.forCurrentMagic((short) 1, 5000, produceData);
ProduceRequest.forCurrentMagic(produceData);
}
@Test
@ -192,16 +249,19 @@ public class ProduceRequestTest {
final MemoryRecords txnRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId,
producerEpoch, sequence, new SimpleRecord("bar".getBytes()));
final Map<TopicPartition, MemoryRecords> recordsByPartition = new LinkedHashMap<>();
recordsByPartition.put(new TopicPartition("foo", 0), txnRecords);
recordsByPartition.put(new TopicPartition("foo", 1), nonTxnRecords);
final ProduceRequest.Builder builder = ProduceRequest.Builder.forMagic(RecordVersion.current().value, (short) -1, 5000,
recordsByPartition, transactionalId);
ProduceRequest.Builder builder = ProduceRequest.forMagic(RecordBatch.CURRENT_MAGIC_VALUE,
new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(Arrays.asList(
new ProduceRequestData.TopicProduceData().setName("foo").setPartitionData(Collections.singletonList(
new ProduceRequestData.PartitionProduceData().setIndex(0).setRecords(txnRecords))),
new ProduceRequestData.TopicProduceData().setName("foo").setPartitionData(Collections.singletonList(
new ProduceRequestData.PartitionProduceData().setIndex(1).setRecords(nonTxnRecords))))
.iterator()))
.setAcks((short) -1)
.setTimeoutMs(5000));
final ProduceRequest request = builder.build();
assertTrue(request.hasTransactionalRecords());
assertTrue(request.hasIdempotentRecords());
assertTrue(RequestUtils.hasTransactionalRecords(request));
assertTrue(RequestUtils.hasIdempotentRecords(request));
}
@Test
@ -215,16 +275,20 @@ public class ProduceRequestTest {
final MemoryRecords txnRecords = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId,
producerEpoch, sequence, new SimpleRecord("bar".getBytes()));
final Map<TopicPartition, MemoryRecords> recordsByPartition = new LinkedHashMap<>();
recordsByPartition.put(new TopicPartition("foo", 0), txnRecords);
recordsByPartition.put(new TopicPartition("foo", 1), nonTxnRecords);
final ProduceRequest.Builder builder = ProduceRequest.Builder.forMagic(RecordVersion.current().value, (short) -1, 5000,
recordsByPartition, null);
ProduceRequest.Builder builder = ProduceRequest.forMagic(RecordVersion.current().value,
new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(Arrays.asList(
new ProduceRequestData.TopicProduceData().setName("foo").setPartitionData(Collections.singletonList(
new ProduceRequestData.PartitionProduceData().setIndex(0).setRecords(txnRecords))),
new ProduceRequestData.TopicProduceData().setName("foo").setPartitionData(Collections.singletonList(
new ProduceRequestData.PartitionProduceData().setIndex(1).setRecords(nonTxnRecords))))
.iterator()))
.setAcks((short) -1)
.setTimeoutMs(5000));
final ProduceRequest request = builder.build();
assertFalse(request.hasTransactionalRecords());
assertTrue(request.hasIdempotentRecords());
assertFalse(RequestUtils.hasTransactionalRecords(request));
assertTrue(RequestUtils.hasIdempotentRecords(request));
}
private void assertThrowsInvalidRecordExceptionForAllVersions(ProduceRequest.Builder builder) {
@ -244,8 +308,15 @@ public class ProduceRequestTest {
}
private ProduceRequest createNonIdempotentNonTransactionalRecords() {
final MemoryRecords memoryRecords = MemoryRecords.withRecords(CompressionType.NONE, simpleRecord);
return ProduceRequest.Builder.forCurrentMagic((short) -1, 10,
Collections.singletonMap(new TopicPartition("topic", 1), memoryRecords)).build();
return ProduceRequest.forCurrentMagic(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
new ProduceRequestData.TopicProduceData()
.setName("topic")
.setPartitionData(Collections.singletonList(new ProduceRequestData.PartitionProduceData()
.setIndex(1)
.setRecords(MemoryRecords.withRecords(CompressionType.NONE, simpleRecord)))))
.iterator()))
.setAcks((short) -1)
.setTimeoutMs(10)).build();
}
}

@ -18,6 +18,7 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
@ -31,10 +32,12 @@ import java.util.Map;
import static org.apache.kafka.common.protocol.ApiKeys.PRODUCE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class ProduceResponseTest {
@SuppressWarnings("deprecation")
@Test
public void produceResponseV5Test() {
Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
@ -64,6 +67,7 @@ public class ProduceResponseTest {
assertEquals(responseData, v5Response.responses());
}
@SuppressWarnings("deprecation")
@Test
public void produceResponseVersionTest() {
Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
@ -86,6 +90,7 @@ public class ProduceResponseTest {
assertEquals("Response data does not match", responseData, v2Response.responses());
}
@SuppressWarnings("deprecation")
@Test
public void produceResponseRecordErrorsTest() {
Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
@ -100,7 +105,7 @@ public class ProduceResponseTest {
ProduceResponse response = new ProduceResponse(responseData);
Struct struct = response.toStruct(ver);
assertEquals("Should use schema version " + ver, ApiKeys.PRODUCE.responseSchema(ver), struct.schema());
ProduceResponse.PartitionResponse deserialized = new ProduceResponse(struct).responses().get(tp);
ProduceResponse.PartitionResponse deserialized = new ProduceResponse(new ProduceResponseData(struct, ver)).responses().get(tp);
if (ver >= 8) {
assertEquals(1, deserialized.recordErrors.size());
assertEquals(3, deserialized.recordErrors.get(0).batchIndex);
@ -108,7 +113,7 @@ public class ProduceResponseTest {
assertEquals("Produce failed", deserialized.errorMessage);
} else {
assertEquals(0, deserialized.recordErrors.size());
assertEquals(null, deserialized.errorMessage);
assertNull(deserialized.errorMessage);
}
}
}

@ -131,6 +131,7 @@ import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection;
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic;
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
@ -625,10 +626,35 @@ public class RequestResponseTest {
builder.build((short) 0);
}
@Test
public void testPartitionSize() {
TopicPartition tp0 = new TopicPartition("test", 0);
TopicPartition tp1 = new TopicPartition("test", 1);
MemoryRecords records0 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2,
CompressionType.NONE, new SimpleRecord("woot".getBytes()));
MemoryRecords records1 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2,
CompressionType.NONE, new SimpleRecord("woot".getBytes()), new SimpleRecord("woot".getBytes()));
ProduceRequest request = ProduceRequest.forMagic(RecordBatch.MAGIC_VALUE_V2,
new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(Arrays.asList(
new ProduceRequestData.TopicProduceData().setName(tp0.topic()).setPartitionData(
Collections.singletonList(new ProduceRequestData.PartitionProduceData().setIndex(tp0.partition()).setRecords(records0))),
new ProduceRequestData.TopicProduceData().setName(tp1.topic()).setPartitionData(
Collections.singletonList(new ProduceRequestData.PartitionProduceData().setIndex(tp1.partition()).setRecords(records1))))
.iterator()))
.setAcks((short) 1)
.setTimeoutMs(5000)
.setTransactionalId("transactionalId"))
.build((short) 3);
assertEquals(2, request.partitionSizes().size());
assertEquals(records0.sizeInBytes(), (int) request.partitionSizes().get(tp0));
assertEquals(records1.sizeInBytes(), (int) request.partitionSizes().get(tp1));
}
@Test
public void produceRequestToStringTest() {
ProduceRequest request = createProduceRequest(ApiKeys.PRODUCE.latestVersion());
assertEquals(1, request.partitionRecordsOrFail().size());
assertEquals(1, request.dataOrException().topicData().size());
assertFalse(request.toString(false).contains("partitionSizes"));
assertTrue(request.toString(false).contains("numPartitions=1"));
assertTrue(request.toString(true).contains("partitionSizes"));
@ -636,8 +662,8 @@ public class RequestResponseTest {
request.clearPartitionRecords();
try {
request.partitionRecordsOrFail();
fail("partitionRecordsOrFail should fail after clearPartitionRecords()");
request.dataOrException();
fail("dataOrException should fail after clearPartitionRecords()");
} catch (IllegalStateException e) {
// OK
}
@ -649,10 +675,11 @@ public class RequestResponseTest {
assertFalse(request.toString(true).contains("numPartitions"));
}
@SuppressWarnings("deprecation")
@Test
public void produceRequestGetErrorResponseTest() {
ProduceRequest request = createProduceRequest(ApiKeys.PRODUCE.latestVersion());
Set<TopicPartition> partitions = new HashSet<>(request.partitionRecordsOrFail().keySet());
Set<TopicPartition> partitions = new HashSet<>(request.partitionSizes().keySet());
ProduceResponse errorResponse = (ProduceResponse) request.getErrorResponse(new NotEnoughReplicasException());
assertEquals(partitions, errorResponse.responses().keySet());
@ -1389,17 +1416,27 @@ public class RequestResponseTest {
return new OffsetFetchResponse(Errors.NONE, responseData);
}
@SuppressWarnings("deprecation")
private ProduceRequest createProduceRequest(int version) {
if (version < 2)
throw new IllegalArgumentException("Produce request version 2 is not supported");
byte magic = version == 2 ? RecordBatch.MAGIC_VALUE_V1 : RecordBatch.MAGIC_VALUE_V2;
MemoryRecords records = MemoryRecords.withRecords(magic, CompressionType.NONE, new SimpleRecord("woot".getBytes()));
Map<TopicPartition, MemoryRecords> produceData = Collections.singletonMap(new TopicPartition("test", 0), records);
return ProduceRequest.Builder.forMagic(magic, (short) 1, 5000, produceData, "transactionalId")
return ProduceRequest.forMagic(magic,
new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
new ProduceRequestData.TopicProduceData()
.setName("test")
.setPartitionData(Collections.singletonList(new ProduceRequestData.PartitionProduceData()
.setIndex(0)
.setRecords(records)))).iterator()))
.setAcks((short) 1)
.setTimeoutMs(5000)
.setTransactionalId(version >= 3 ? "transactionalId" : null))
.build((short) version);
}
@SuppressWarnings("deprecation")
private ProduceResponse createProduceResponse() {
Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE,
@ -1407,6 +1444,7 @@ public class RequestResponseTest {
return new ProduceResponse(responseData, 0);
}
@SuppressWarnings("deprecation")
private ProduceResponse createProduceResponseWithErrorMessage() {
Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE,

@ -90,6 +90,8 @@ import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.util.{Failure, Success, Try}
import kafka.coordinator.group.GroupOverview
import scala.annotation.nowarn
/**
* Logic to handle the various Kafka requests
*/
@ -566,7 +568,11 @@ class KafkaApis(val requestChannel: RequestChannel,
val produceRequest = request.body[ProduceRequest]
val numBytesAppended = request.header.toStruct.sizeOf + request.sizeOfBodyInBytes
if (produceRequest.hasTransactionalRecords) {
val (hasIdempotentRecords, hasTransactionalRecords) = {
val flags = RequestUtils.flags(produceRequest)
(flags.getKey, flags.getValue)
}
if (hasTransactionalRecords) {
val isAuthorizedTransactional = produceRequest.transactionalId != null &&
authorize(request.context, WRITE, TRANSACTIONAL_ID, produceRequest.transactionalId)
if (!isAuthorizedTransactional) {
@ -575,19 +581,25 @@ class KafkaApis(val requestChannel: RequestChannel,
}
// Note that authorization to a transactionalId implies ProducerId authorization
} else if (produceRequest.hasIdempotentRecords && !authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
} else if (hasIdempotentRecords && !authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
return
}
val produceRecords = produceRequest.partitionRecordsOrFail.asScala
val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]()
val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
val authorizedTopics = filterByAuthorized(request.context, WRITE, TOPIC, produceRecords)(_._1.topic)
// cache the result to avoid redundant authorization calls
val authorizedTopics = filterByAuthorized(request.context, WRITE, TOPIC,
produceRequest.dataOrException().topicData().asScala)(_.name())
for ((topicPartition, memoryRecords) <- produceRecords) {
produceRequest.dataOrException.topicData.forEach(topic => topic.partitionData.forEach { partition =>
val topicPartition = new TopicPartition(topic.name, partition.index)
// This caller assumes the type is MemoryRecords, and that is true with the current serialization.
// We cast the type to avoid a big change to the code base.
// https://issues.apache.org/jira/browse/KAFKA-10698
val memoryRecords = partition.records.asInstanceOf[MemoryRecords]
if (!authorizedTopics.contains(topicPartition.topic))
unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
else if (!metadataCache.contains(topicPartition))
@ -600,9 +612,13 @@ class KafkaApis(val requestChannel: RequestChannel,
case e: ApiException =>
invalidRequestResponses += topicPartition -> new PartitionResponse(Errors.forException(e))
}
}
})
// the callback for sending a produce response
// The construction of ProduceResponse is able to accept auto-generated protocol data, so
// KafkaApis#handleProduceRequest should pass the auto-generated protocol data directly to avoid the extra conversion.
// https://issues.apache.org/jira/browse/KAFKA-10730
@nowarn("cat=deprecation")
def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses ++ invalidRequestResponses
var errorInResponse = false

@ -35,7 +35,7 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
import org.apache.kafka.common.message.AddOffsetsToTxnRequestData
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ControlledShutdownRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, OffsetCommitRequestData, ProduceRequestData, SyncGroupRequestData}
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterableConfig, AlterableConfigCollection}
@ -45,7 +45,6 @@ import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.ListOffsetRequestData.{ListOffsetPartition, ListOffsetTopic}
import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState}
import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
import org.apache.kafka.common.message.{AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ControlledShutdownRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, OffsetCommitRequestData, SyncGroupRequestData}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, Records, SimpleRecord}
@ -152,6 +151,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
classOf[PrincipalBuilder].getName)
}
@nowarn("cat=deprecation")
val requestKeyToError = Map[ApiKeys, Nothing => Errors](
ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2),
ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => resp.responses.asScala.find(_._1 == tp).get._2.error),
@ -290,11 +290,18 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
new requests.MetadataRequest.Builder(List(topic).asJava, allowAutoTopicCreation).build()
}
private def createProduceRequest = {
requests.ProduceRequest.Builder.forCurrentMagic(1, 5000,
collection.mutable.Map(tp -> MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))).asJava).
build()
}
private def createProduceRequest =
requests.ProduceRequest.forCurrentMagic(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(
Collections.singletonList(new ProduceRequestData.TopicProduceData()
.setName(tp.topic()).setPartitionData(Collections.singletonList(
new ProduceRequestData.PartitionProduceData()
.setIndex(tp.partition())
.setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))))))
.iterator))
.setAcks(1.toShort)
.setTimeoutMs(5000))
.build()
private def createFetchRequest = {
val partitionMap = new util.LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData]

@ -20,22 +20,24 @@ package kafka.network
import java.io.IOException
import java.net.{InetAddress, Socket}
import java.util.Properties
import java.util.concurrent._
import java.util.{Collections, Properties}
import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.common.message.ProduceRequestData
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.{KafkaException, requests}
import org.junit.Assert._
import org.junit.{After, Before, Test}
import org.scalatest.Assertions.intercept
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
class DynamicConnectionQuotaTest extends BaseRequestTest {
@@ -265,12 +267,20 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
}
}
private def produceRequest: ProduceRequest = {
val topicPartition = new TopicPartition(topic, 0)
val memoryRecords = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes))
val partitionRecords = Map(topicPartition -> memoryRecords)
ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build()
}
private def produceRequest: ProduceRequest =
requests.ProduceRequest.forCurrentMagic(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(
Collections.singletonList(new ProduceRequestData.TopicProduceData()
.setName(topic)
.setPartitionData(Collections.singletonList(new ProduceRequestData.PartitionProduceData()
.setIndex(0)
.setRecords(MemoryRecords.withRecords(CompressionType.NONE,
new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes))))))
.iterator))
.setAcks((-1).toShort)
.setTimeoutMs(3000)
.setTransactionalId(null))
.build()
def connectionCount: Int = servers.head.socketServer.connectionCount(localAddress)
@@ -288,6 +298,7 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
}
}
@nowarn("cat=deprecation")
private def verifyConnection(socket: Socket): Unit = {
val produceResponse = sendAndReceive[ProduceResponse](produceRequest, socket)
assertEquals(1, produceResponse.responses.size)

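Note `.setAcks((-1).toShort)` above: `acks` is a `short` in the generated protocol, so Scala call sites that used to pass `-1` directly need the parenthesized conversion (an unparenthesized `-1.toShort` parses as `-(1.toShort)`, which widens back to `Int`).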

@@ -24,7 +24,7 @@ import java.nio.channels.{SelectionKey, SocketChannel}
import java.nio.charset.StandardCharsets
import java.util
import java.util.concurrent.{CompletableFuture, ConcurrentLinkedQueue, Executors, TimeUnit}
import java.util.{HashMap, Properties, Random}
import java.util.{Properties, Random}
import com.yammer.metrics.core.{Gauge, Meter}
import javax.net.ssl._
@@ -33,29 +33,26 @@ import kafka.security.CredentialProvider
import kafka.server.{KafkaConfig, ThrottledChannel}
import kafka.utils.Implicits._
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message.{SaslAuthenticateRequestData, SaslHandshakeRequestData, VoteRequestData}
import org.apache.kafka.common.message.{ProduceRequestData, SaslAuthenticateRequestData, SaslHandshakeRequestData, VoteRequestData}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ClientInformation
import org.apache.kafka.common.network.KafkaChannel.ChannelMuteState
import org.apache.kafka.common.network._
import org.apache.kafka.common.network.{ClientInformation, _}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.{AbstractRequest, ApiVersionsRequest, ProduceRequest, RequestHeader, SaslAuthenticateRequest, SaslHandshakeRequest, VoteRequest}
import org.apache.kafka.common.requests
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.utils.AppInfoParser
import org.apache.kafka.common.utils.{LogContext, MockTime, Time}
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, MockTime, Time}
import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
import org.apache.log4j.Level
import org.junit.Assert._
import org.junit._
import org.scalatest.Assertions.fail
import scala.jdk.CollectionConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
import scala.util.control.ControlThrowable
class SocketServerTest {
@@ -210,8 +207,12 @@ class SocketServerTest {
val clientId = ""
val ackTimeoutMs = 10000
val emptyRequest = ProduceRequest.Builder.forCurrentMagic(ack, ackTimeoutMs,
new HashMap[TopicPartition, MemoryRecords]()).build()
val emptyRequest = requests.ProduceRequest.forCurrentMagic(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection())
.setAcks(ack)
.setTimeoutMs(ackTimeoutMs)
.setTransactionalId(null))
.build()
val emptyHeader = new RequestHeader(ApiKeys.PRODUCE, emptyRequest.version, clientId, correlationId)
val byteBuffer = emptyRequest.serialize(emptyHeader)
byteBuffer.rewind()
@@ -897,8 +898,12 @@ class SocketServerTest {
val clientId = ""
val ackTimeoutMs = 10000
val ack = 0: Short
val emptyRequest = ProduceRequest.Builder.forCurrentMagic(ack, ackTimeoutMs,
new HashMap[TopicPartition, MemoryRecords]()).build()
val emptyRequest = requests.ProduceRequest.forCurrentMagic(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection())
.setAcks(ack)
.setTimeoutMs(ackTimeoutMs)
.setTransactionalId(null))
.build()
val emptyHeader = new RequestHeader(ApiKeys.PRODUCE, emptyRequest.version, clientId, correlationId)
val byteBuffer = emptyRequest.serialize(emptyHeader)
@@ -980,8 +985,12 @@ class SocketServerTest {
// ...and now send something to trigger the disconnection
val ackTimeoutMs = 10000
val ack = 0: Short
val emptyRequest = ProduceRequest.Builder.forCurrentMagic(ack, ackTimeoutMs,
new HashMap[TopicPartition, MemoryRecords]()).build()
val emptyRequest = requests.ProduceRequest.forCurrentMagic(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection())
.setAcks(ack)
.setTimeoutMs(ackTimeoutMs)
.setTransactionalId(null))
.build()
val emptyHeader = new RequestHeader(ApiKeys.PRODUCE, emptyRequest.version, clientId, correlationId)
sendApiRequest(socket, emptyRequest, emptyHeader)
// wait a little bit for the server-side disconnection to occur since it happens asynchronously

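SocketServerTest only needs syntactically valid produce requests to drive the network layer, so the degenerate case above is enough: a `ProduceRequestData` whose `TopicProduceDataCollection` is left empty, with no records attached at all.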

@@ -20,21 +20,23 @@ package kafka.server
import java.io.{DataInputStream, DataOutputStream}
import java.net.Socket
import java.nio.ByteBuffer
import java.util.Collections
import kafka.integration.KafkaServerTestHarness
import kafka.network.SocketServer
import kafka.utils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.ProduceRequestData
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.types.Type
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse, ResponseHeader}
import org.apache.kafka.common.requests.{ProduceResponse, ResponseHeader}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.{TopicPartition, requests}
import org.junit.Assert._
import org.junit.Test
import scala.jdk.CollectionConverters._
import scala.annotation.nowarn
class EdgeCaseRequestTest extends KafkaServerTestHarness {
@@ -108,6 +110,7 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
}
}
@nowarn("cat=deprecation")
@Test
def testProduceRequestWithNullClientId(): Unit = {
val topic = "topic"
@@ -119,8 +122,18 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
val (serializedBytes, responseHeaderVersion) = {
val headerBytes = requestHeaderBytes(ApiKeys.PRODUCE.id, version, null,
correlationId)
val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("message".getBytes))
val request = ProduceRequest.Builder.forCurrentMagic(1, 10000, Map(topicPartition -> records).asJava).build()
val request = requests.ProduceRequest.forCurrentMagic(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(
Collections.singletonList(new ProduceRequestData.TopicProduceData()
.setName(topicPartition.topic()).setPartitionData(Collections.singletonList(
new ProduceRequestData.PartitionProduceData()
.setIndex(topicPartition.partition())
.setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("message".getBytes))))))
.iterator))
.setAcks(1.toShort)
.setTimeoutMs(10000)
.setTransactionalId(null))
.build()
val byteBuffer = ByteBuffer.allocate(headerBytes.length + request.toStruct.sizeOf)
byteBuffer.put(headerBytes)
request.toStruct.writeTo(byteBuffer)


@@ -71,6 +71,7 @@ import org.easymock.{Capture, EasyMock, IAnswer, IArgumentMatcher}
import org.junit.Assert.{assertArrayEquals, assertEquals, assertNull, assertTrue}
import org.junit.{After, Test}
import scala.annotation.nowarn
import scala.collection.{Map, Seq, mutable}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
@@ -1232,6 +1233,7 @@ class KafkaApisTest {
}
}
@nowarn("cat=deprecation")
@Test
def shouldReplaceProducerFencedWithInvalidProducerEpochInProduceResponse(): Unit = {
val topic = "topic"
@@ -1245,10 +1247,17 @@ class KafkaApisTest {
val tp = new TopicPartition("topic", 0)
val produceRequest = ProduceRequest.Builder.forCurrentMagic(1, 5000,
collection.mutable.Map(tp -> MemoryRecords.withRecords(CompressionType.NONE,
new SimpleRecord("test".getBytes))).asJava)
.build(version.toShort)
val produceRequest = ProduceRequest.forCurrentMagic(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(
Collections.singletonList(new ProduceRequestData.TopicProduceData()
.setName(tp.topic()).setPartitionData(Collections.singletonList(
new ProduceRequestData.PartitionProduceData()
.setIndex(tp.partition())
.setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))))))
.iterator))
.setAcks(1.toShort)
.setTimeoutMs(5000))
.build(version.toShort)
val request = buildRequest(produceRequest)
EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),


@@ -18,13 +18,14 @@
package kafka.server
import java.nio.ByteBuffer
import java.util.Properties
import java.util.{Collections, Properties}
import kafka.log.LogConfig
import kafka.message.ZStdCompressionCodec
import kafka.metrics.KafkaYammerMetrics
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.ProduceRequestData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
@@ -32,6 +33,7 @@ import org.junit.Assert._
import org.junit.Test
import org.scalatest.Assertions.fail
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
/**
@@ -42,15 +44,24 @@ class ProduceRequestTest extends BaseRequestTest {
val metricsKeySet = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
@nowarn("cat=deprecation")
@Test
def testSimpleProduceRequest(): Unit = {
val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
def sendAndCheck(memoryRecords: MemoryRecords, expectedOffset: Long): ProduceResponse.PartitionResponse = {
val topicPartition = new TopicPartition("topic", partition)
val partitionRecords = Map(topicPartition -> memoryRecords)
val produceResponse = sendProduceRequest(leader,
ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build())
ProduceRequest.forCurrentMagic(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
new ProduceRequestData.TopicProduceData()
.setName(topicPartition.topic())
.setPartitionData(Collections.singletonList(new ProduceRequestData.PartitionProduceData()
.setIndex(topicPartition.partition())
.setRecords(memoryRecords)))).iterator))
.setAcks((-1).toShort)
.setTimeoutMs(3000)
.setTransactionalId(null)).build())
assertEquals(1, produceResponse.responses.size)
val (tp, partitionResponse) = produceResponse.responses.asScala.head
assertEquals(topicPartition, tp)
@@ -69,6 +80,7 @@
new SimpleRecord(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1)
}
@nowarn("cat=deprecation")
@Test
def testProduceWithInvalidTimestamp(): Unit = {
val topic = "topic"
@@ -89,8 +101,16 @@
val records = createRecords(RecordBatch.MAGIC_VALUE_V2, System.currentTimeMillis() - 1001L, CompressionType.GZIP)
val topicPartition = new TopicPartition("topic", partition)
val partitionRecords = Map(topicPartition -> records)
val produceResponse = sendProduceRequest(leader, ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build())
val produceResponse = sendProduceRequest(leader, ProduceRequest.forCurrentMagic(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
new ProduceRequestData.TopicProduceData()
.setName(topicPartition.topic())
.setPartitionData(Collections.singletonList(new ProduceRequestData.PartitionProduceData()
.setIndex(topicPartition.partition())
.setRecords(records)))).iterator))
.setAcks((-1).toShort)
.setTimeoutMs(3000)
.setTransactionalId(null)).build())
val (tp, partitionResponse) = produceResponse.responses.asScala.head
assertEquals(topicPartition, tp)
assertEquals(Errors.INVALID_TIMESTAMP, partitionResponse.error)
@@ -105,6 +125,7 @@
assertEquals("One or more records have been rejected due to invalid timestamp", partitionResponse.errorMessage)
}
@nowarn("cat=deprecation")
@Test
def testProduceToNonReplica(): Unit = {
val topic = "topic"
@@ -120,8 +141,16 @@
// Send the produce request to the non-replica
val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("key".getBytes, "value".getBytes))
val topicPartition = new TopicPartition("topic", partition)
val partitionRecords = Map(topicPartition -> records)
val produceRequest = ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build()
val produceRequest = ProduceRequest.forCurrentMagic(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
new ProduceRequestData.TopicProduceData()
.setName(topicPartition.topic())
.setPartitionData(Collections.singletonList(new ProduceRequestData.PartitionProduceData()
.setIndex(topicPartition.partition())
.setRecords(records)))).iterator))
.setAcks((-1).toShort)
.setTimeoutMs(3000)
.setTransactionalId(null)).build()
val produceResponse = sendProduceRequest(nonReplicaId, produceRequest)
assertEquals(1, produceResponse.responses.size)
@@ -136,6 +165,7 @@
}.getOrElse(fail(s"No leader elected for topic $topic"))
}
@nowarn("cat=deprecation")
@Test
def testCorruptLz4ProduceRequest(): Unit = {
val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
@@ -146,9 +176,16 @@
val lz4ChecksumOffset = 6
memoryRecords.buffer.array.update(DefaultRecordBatch.RECORD_BATCH_OVERHEAD + lz4ChecksumOffset, 0)
val topicPartition = new TopicPartition("topic", partition)
val partitionRecords = Map(topicPartition -> memoryRecords)
val produceResponse = sendProduceRequest(leader,
ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build())
val produceResponse = sendProduceRequest(leader, ProduceRequest.forCurrentMagic(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
new ProduceRequestData.TopicProduceData()
.setName(topicPartition.topic())
.setPartitionData(Collections.singletonList(new ProduceRequestData.PartitionProduceData()
.setIndex(topicPartition.partition())
.setRecords(memoryRecords)))).iterator))
.setAcks((-1).toShort)
.setTimeoutMs(3000)
.setTransactionalId(null)).build())
assertEquals(1, produceResponse.responses.size)
val (tp, partitionResponse) = produceResponse.responses.asScala.head
assertEquals(topicPartition, tp)
@@ -159,6 +196,7 @@
assertTrue(TestUtils.meterCount(s"${BrokerTopicStats.InvalidMessageCrcRecordsPerSec}") > 0)
}
@nowarn("cat=deprecation")
@Test
def testZSTDProduceRequest(): Unit = {
val topic = "topic"
@@ -172,11 +210,21 @@
val memoryRecords = MemoryRecords.withRecords(CompressionType.ZSTD,
new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes))
val topicPartition = new TopicPartition("topic", partition)
val partitionRecords = Map(topicPartition -> memoryRecords)
val partitionRecords = new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
new ProduceRequestData.TopicProduceData()
.setName("topic").setPartitionData(Collections.singletonList(
new ProduceRequestData.PartitionProduceData()
.setIndex(partition)
.setRecords(memoryRecords))))
.iterator))
.setAcks((-1).toShort)
.setTimeoutMs(3000)
.setTransactionalId(null)
// produce request with v7: works fine!
val res1 = sendProduceRequest(leader,
new ProduceRequest.Builder(7, 7, -1, 3000, partitionRecords.asJava, null).build())
new ProduceRequest.Builder(7, 7, partitionRecords).build())
val (tp1, partitionResponse1) = res1.responses.asScala.head
assertEquals(topicPartition, tp1)
assertEquals(Errors.NONE, partitionResponse1.error)
@@ -185,7 +233,7 @@
// produce request with v3: returns Errors.UNSUPPORTED_COMPRESSION_TYPE.
val res2 = sendProduceRequest(leader,
new ProduceRequest.Builder(3, 3, -1, 3000, partitionRecords.asJava, null)
new ProduceRequest.Builder(3, 3, partitionRecords)
.buildUnsafe(3))
val (tp2, partitionResponse2) = res2.responses.asScala.head
assertEquals(topicPartition, tp2)

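One subtlety in the ZSTD test above: the explicit `new ProduceRequest.Builder(minVersion, maxVersion, data)` form replaces the old six-argument builder because acks, the timeout, and the transactional id now travel inside `ProduceRequestData`, and `buildUnsafe` skips client-side version validation so the test can hand the broker a v3 request carrying ZSTD records. A condensed sketch (assuming `produceData` is a `ProduceRequestData` like `partitionRecords` above):

```scala
// v7 supports ZSTD, so the normal validated build succeeds.
val v7Request = new ProduceRequest.Builder(7, 7, produceData).build()
// v3 predates ZSTD; buildUnsafe bypasses client-side validation so the
// broker's own check answers with UNSUPPORTED_COMPRESSION_TYPE.
val v3Request = new ProduceRequest.Builder(3, 3, produceData).buildUnsafe(3)
```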

@@ -26,7 +26,6 @@ import kafka.security.authorizer.AclAuthorizer
import kafka.utils.TestUtils
import org.apache.kafka.common.acl._
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.message.AddOffsetsToTxnRequestData
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection
@@ -35,7 +34,7 @@ import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.ListOffsetRequestData.{ListOffsetPartition, ListOffsetTopic}
import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState}
import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
import org.apache.kafka.common.message._
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, _}
import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.ApiKeys
@@ -43,15 +42,15 @@ import org.apache.kafka.common.quota.ClientQuotaFilter
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.{PatternType, ResourceType => AdminResourceType}
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.security.auth._
import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils}
import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition}
import org.apache.kafka.common._
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
import org.junit.Assert._
import org.junit.{After, Before, Test}
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._
class RequestQuotaTest extends BaseRequestTest {
@@ -211,8 +210,16 @@ class RequestQuotaTest extends BaseRequestTest {
private def requestBuilder(apiKey: ApiKeys): AbstractRequest.Builder[_ <: AbstractRequest] = {
apiKey match {
case ApiKeys.PRODUCE =>
ProduceRequest.Builder.forCurrentMagic(1, 5000,
collection.mutable.Map(tp -> MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))).asJava)
requests.ProduceRequest.forCurrentMagic(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(
Collections.singletonList(new ProduceRequestData.TopicProduceData()
.setName(tp.topic()).setPartitionData(Collections.singletonList(
new ProduceRequestData.PartitionProduceData()
.setIndex(tp.partition())
.setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))))))
.iterator))
.setAcks(1.toShort)
.setTimeoutMs(5000))
case ApiKeys.FETCH =>
val partitionMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]


@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.producer;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 15)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class ProducerRequestBenchmark {
private static final int NUMBER_OF_PARTITIONS = 3;
private static final int NUMBER_OF_RECORDS = 3;
private static final List<ProduceRequestData.TopicProduceData> TOPIC_PRODUCE_DATA = Collections.singletonList(new ProduceRequestData.TopicProduceData()
.setName("tp")
.setPartitionData(IntStream.range(0, NUMBER_OF_PARTITIONS).mapToObj(partitionIndex -> new ProduceRequestData.PartitionProduceData()
.setIndex(partitionIndex)
.setRecords(MemoryRecords.withRecords(CompressionType.NONE, IntStream.range(0, NUMBER_OF_RECORDS)
.mapToObj(recordIndex -> new SimpleRecord(100, "hello0".getBytes(StandardCharsets.UTF_8)))
.collect(Collectors.toList())
.toArray(new SimpleRecord[0]))))
.collect(Collectors.toList()))
);
private static final ProduceRequestData PRODUCE_REQUEST_DATA = new ProduceRequestData()
.setTimeoutMs(100)
.setAcks((short) 1)
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(TOPIC_PRODUCE_DATA.iterator()));
private static ProduceRequest request() {
return ProduceRequest.forMagic(RecordBatch.CURRENT_MAGIC_VALUE, PRODUCE_REQUEST_DATA).build();
}
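// Shared instance: constructed once so that constructorErrorResponse and
// constructorStruct measure only the conversion cost, not request construction.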
private static final ProduceRequest REQUEST = request();
@Benchmark
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public ProduceRequest constructorProduceRequest() {
return request();
}
@Benchmark
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public ProduceResponse constructorErrorResponse() {
return REQUEST.getErrorResponse(0, Errors.INVALID_REQUEST.exception());
}
@Benchmark
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public Struct constructorStruct() {
return REQUEST.toStruct();
}
}


@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ProduceResponse;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import java.util.AbstractMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.kafka.common.protocol.ApiKeys.PRODUCE;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 15)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class ProducerResponseBenchmark {
private static final int NUMBER_OF_PARTITIONS = 3;
private static final int NUMBER_OF_RECORDS = 3;
private static final Map<TopicPartition, ProduceResponse.PartitionResponse> PARTITION_RESPONSE_MAP = IntStream.range(0, NUMBER_OF_PARTITIONS)
.mapToObj(partitionIndex -> new AbstractMap.SimpleEntry<>(
new TopicPartition("tp", partitionIndex),
new ProduceResponse.PartitionResponse(
Errors.NONE,
0,
0,
0,
IntStream.range(0, NUMBER_OF_RECORDS)
.mapToObj(ProduceResponse.RecordError::new)
.collect(Collectors.toList()))
))
.collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
/**
* The deprecated ProduceResponse(Map) constructor is still used by production code,
* so we benchmark it. See https://issues.apache.org/jira/browse/KAFKA-10730
*/
@SuppressWarnings("deprecation")
private static ProduceResponse response() {
return new ProduceResponse(PARTITION_RESPONSE_MAP);
}
private static final ProduceResponse RESPONSE = response();
@Benchmark
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public AbstractResponse constructorProduceResponse() {
return response();
}
@Benchmark
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public Struct constructorStruct() {
return RESPONSE.toStruct(PRODUCE.latestVersion());
}
}
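
Both benchmark classes live in the `jmh-benchmarks` module; assuming the module's standard runner script, they can be invoked with something like `./jmh-benchmarks/jmh.sh ProducerRequestBenchmark`.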