mirror of https://github.com/apache/kafka.git
KAFKA-8729, pt 2: Add error_records and error_message to PartitionResponse (#7150)
As described in KIP-467, the updated ProduceResponse is:
```
Produce Response (Version: 8) => [responses] throttle_time_ms
  responses => topic [partition_responses]
    topic => STRING
    partition_responses => partition error_code base_offset log_append_time log_start_offset error_records error_message
      partition => INT32
      error_code => INT16
      base_offset => INT64
      log_append_time => INT64
      log_start_offset => INT64
      error_records => [INT32]  // new field: the relative offsets of the records that caused the error
      error_message => STRING   // new field: an error message the client can use for its own logging
  throttle_time_ms => INT32
```
with a new error code:
```
INVALID_RECORD(87, "This record has failed the validation on broker and hence be rejected.", InvalidRecordException::new);
```
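For illustration, here is a minimal client-side sketch of what the new fields carry. It is not part of the patch; it only assumes the classes touched here (the topic name and messages are made up) and mirrors the createProduceResponseWithErrorMessage helper added to RequestResponseTest below:
```java
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.ProduceResponse;

public class ErrorRecordsExample {
    public static void main(String[] args) {
        // One rejected record in the batch: the record at relative offset 0 failed
        // validation, and the partition also carries a summarizing error_message.
        Map<Integer, String> errorRecords = Collections.singletonMap(0, "record 0 failed validation");
        ProduceResponse.PartitionResponse partition = new ProduceResponse.PartitionResponse(
                Errors.INVALID_RECORD, -1L /* invalid base offset */, RecordBatch.NO_TIMESTAMP,
                -1L /* invalid log start offset */, errorRecords, "one or more records failed validation");

        ProduceResponse response = new ProduceResponse(
                Collections.singletonMap(new TopicPartition("test-topic", 0), partition), 0);

        // Read the new per-record and per-partition error information back out.
        response.responses().forEach((tp, part) -> {
            System.out.println(tp + ": error=" + part.error + ", errorMessage=" + part.errorMessage);
            part.errorRecords.forEach((relativeOffset, message) ->
                    System.out.println("  relative offset " + relativeOffset + ": " + message));
        });
    }
}
```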
Reviewers: Jason Gustafson <jason@confluent.io>, Magnus Edenhill <magnus@edenhill.se>, Guozhang Wang <wangguoz@gmail.com>
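As a quick sanity check of the new error-code mapping (a minimal sketch, assuming only the Errors enum and exception changes in this patch): code 87 should now resolve to the relocated, non-retriable InvalidRecordException.
```java
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.protocol.Errors;

public class InvalidRecordErrorCheck {
    public static void main(String[] args) {
        // Resolve the new error code introduced in Errors.java (87 => INVALID_RECORD).
        Errors error = Errors.forCode((short) 87);
        ApiException exception = error.exception();
        // InvalidRecordException now extends ApiException rather than CorruptRecordException,
        // so clients no longer treat it as a retriable corruption error.
        System.out.println(error.name() + " -> " + exception.getClass().getName()
                + ", retriable=" + (exception instanceof RetriableException));
    }
}
```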
Parent: 9fbb0de5fc
Commit: f6f24c4700
@@ -35,6 +35,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.RetriableException;
@@ -56,7 +57,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.BufferSupplier;
 import org.apache.kafka.common.record.ControlRecordType;
-import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.Records;
@@ -1422,7 +1422,7 @@ public class Fetcher<K, V> implements Closeable {
             if (checkCrcs && currentBatch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
                 try {
                     batch.ensureValid();
-                } catch (InvalidRecordException e) {
+                } catch (CorruptRecordException e) {
                     throw new KafkaException("Record batch for partition " + partition + " at offset " +
                             batch.baseOffset() + " is invalid, cause: " + e.getMessage());
                 }
@@ -1433,7 +1433,7 @@ public class Fetcher<K, V> implements Closeable {
             if (checkCrcs) {
                 try {
                     record.ensureValid();
-                } catch (InvalidRecordException e) {
+                } catch (CorruptRecordException e) {
                     throw new KafkaException("Record for partition " + partition + " at offset " + record.offset()
                             + " is invalid, cause: " + e.getMessage());
                 }
@@ -14,11 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.record;
+package org.apache.kafka.common;

-import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.ApiException;

-public class InvalidRecordException extends CorruptRecordException {
+public class InvalidRecordException extends ApiException {

     private static final long serialVersionUID = 1;
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.protocol;

+import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.BrokerNotAvailableException;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
@@ -315,7 +316,8 @@ public enum Errors {
     NO_REASSIGNMENT_IN_PROGRESS(85, "No partition reassignment is in progress.",
             NoReassignmentInProgressException::new),
     GROUP_SUBSCRIBED_TO_TOPIC(86, "Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.",
-            GroupSubscribedToTopicException::new);
+            GroupSubscribedToTopicException::new),
+    INVALID_RECORD(87, "This record has failed the validation on broker and hence be rejected.", InvalidRecordException::new);

     private static final Logger log = LoggerFactory.getLogger(Errors.class);
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.record;

+import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.header.Header;
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.record;

+import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.record;

+import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.utils.ByteUtils;
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.common.record;

+import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.ByteUtils;
@@ -144,11 +146,11 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {
     @Override
     public void ensureValid() {
         if (sizeInBytes() < RECORD_BATCH_OVERHEAD)
-            throw new InvalidRecordException("Record batch is corrupt (the size " + sizeInBytes() +
+            throw new CorruptRecordException("Record batch is corrupt (the size " + sizeInBytes() +
                     " is smaller than the minimum allowed overhead " + RECORD_BATCH_OVERHEAD + ")");

         if (!isValid())
-            throw new InvalidRecordException("Record is corrupt (stored crc = " + checksum()
+            throw new CorruptRecordException("Record is corrupt (stored crc = " + checksum()
                     + ", computed crc = " + computeChecksum() + ")");
     }
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.record;

+import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.record;

 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.common.utils.Checksums;
@@ -135,11 +136,11 @@ public final class LegacyRecord {
      */
     public void ensureValid() {
         if (sizeInBytes() < RECORD_OVERHEAD_V0)
-            throw new InvalidRecordException("Record is corrupt (crc could not be retrieved as the record is too "
+            throw new CorruptRecordException("Record is corrupt (crc could not be retrieved as the record is too "
                     + "small, size = " + sizeInBytes() + ")");

         if (!isValid())
-            throw new InvalidRecordException("Record is corrupt (stored crc = " + checksum()
+            throw new CorruptRecordException("Record is corrupt (stored crc = " + checksum()
                     + ", computed crc = " + computeChecksum() + ")");
     }
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.requests;

+import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -26,7 +27,6 @@ import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MutableRecordBatch;
 import org.apache.kafka.common.record.RecordBatch;
@@ -120,9 +120,15 @@ public class ProduceRequest extends AbstractRequest {
      */
     private static final Schema PRODUCE_REQUEST_V7 = PRODUCE_REQUEST_V6;

+    /**
+     * V8 bumped up to add two new fields error_records offset list and error_message to {@link org.apache.kafka.common.requests.ProduceResponse.PartitionResponse}
+     * (See KIP-467)
+     */
+    private static final Schema PRODUCE_REQUEST_V8 = PRODUCE_REQUEST_V7;
+
     public static Schema[] schemaVersions() {
         return new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3,
-            PRODUCE_REQUEST_V4, PRODUCE_REQUEST_V5, PRODUCE_REQUEST_V6, PRODUCE_REQUEST_V7};
+            PRODUCE_REQUEST_V4, PRODUCE_REQUEST_V5, PRODUCE_REQUEST_V6, PRODUCE_REQUEST_V7, PRODUCE_REQUEST_V8};
     }

     public static class Builder extends AbstractRequest.Builder<ProduceRequest> {
@@ -337,6 +343,7 @@ public class ProduceRequest extends AbstractRequest {
             case 5:
             case 6:
             case 7:
+            case 8:
                 return new ProduceResponse(responseMap, throttleTimeMs);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
@@ -434,6 +441,7 @@ public class ProduceRequest extends AbstractRequest {
             case 5:
             case 6:
             case 7:
+            case 8:
                 return RecordBatch.MAGIC_VALUE_V2;

             default:
@@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.CollectionUtils;

 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -53,28 +54,37 @@ public class ProduceResponse extends AbstractResponse {
     /**
      * Possible error code:
      *
-     * CORRUPT_MESSAGE (2)
-     * UNKNOWN_TOPIC_OR_PARTITION (3)
-     * NOT_LEADER_FOR_PARTITION (6)
-     * MESSAGE_TOO_LARGE (10)
-     * INVALID_TOPIC (17)
-     * RECORD_LIST_TOO_LARGE (18)
-     * NOT_ENOUGH_REPLICAS (19)
-     * NOT_ENOUGH_REPLICAS_AFTER_APPEND (20)
-     * INVALID_REQUIRED_ACKS (21)
-     * TOPIC_AUTHORIZATION_FAILED (29)
-     * UNSUPPORTED_FOR_MESSAGE_FORMAT (43)
-     * INVALID_PRODUCER_EPOCH (47)
-     * CLUSTER_AUTHORIZATION_FAILED (31)
-     * TRANSACTIONAL_ID_AUTHORIZATION_FAILED (53)
+     * {@link Errors#CORRUPT_MESSAGE}
+     * {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
+     * {@link Errors#NOT_LEADER_FOR_PARTITION}
+     * {@link Errors#MESSAGE_TOO_LARGE}
+     * {@link Errors#INVALID_TOPIC_EXCEPTION}
+     * {@link Errors#RECORD_LIST_TOO_LARGE}
+     * {@link Errors#NOT_ENOUGH_REPLICAS}
+     * {@link Errors#NOT_ENOUGH_REPLICAS_AFTER_APPEND}
+     * {@link Errors#INVALID_REQUIRED_ACKS}
+     * {@link Errors#TOPIC_AUTHORIZATION_FAILED}
+     * {@link Errors#UNSUPPORTED_FOR_MESSAGE_FORMAT}
+     * {@link Errors#INVALID_PRODUCER_EPOCH}
+     * {@link Errors#CLUSTER_AUTHORIZATION_FAILED}
+     * {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED}
+     * {@link Errors#INVALID_RECORD}
      */

     private static final String BASE_OFFSET_KEY_NAME = "base_offset";
     private static final String LOG_APPEND_TIME_KEY_NAME = "log_append_time";
     private static final String LOG_START_OFFSET_KEY_NAME = "log_start_offset";
+    private static final String ERROR_RECORDS_KEY_NAME = "error_records";
+    private static final String RELATIVE_OFFSET_KEY_NAME = "relative_offset";
+    private static final String RELATIVE_OFFSET_ERROR_MESSAGE_KEY_NAME = "relative_offset_error_message";
+    private static final String ERROR_MESSAGE_KEY_NAME = "error_message";

     private static final Field.Int64 LOG_START_OFFSET_FIELD = new Field.Int64(LOG_START_OFFSET_KEY_NAME,
             "The start offset of the log at the time this produce response was created", INVALID_OFFSET);
+    private static final Field.NullableStr RELATIVE_OFFSET_ERROR_MESSAGE_FIELD = new Field.NullableStr(RELATIVE_OFFSET_ERROR_MESSAGE_KEY_NAME,
+            "The error message of the record that caused the batch to be dropped");
+    private static final Field.NullableStr ERROR_MESSAGE_FIELD = new Field.NullableStr(ERROR_MESSAGE_KEY_NAME,
+            "The global error message summarizing the common root cause of the records that caused the batch to be dropped");

     private static final Schema PRODUCE_RESPONSE_V0 = new Schema(
             new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema(
@@ -149,9 +159,32 @@ public class ProduceResponse extends AbstractResponse {
      */
     private static final Schema PRODUCE_RESPONSE_V7 = PRODUCE_RESPONSE_V6;

+    /**
+     * V8 adds error_records and error_message. (see KIP-467)
+     */
+    public static final Schema PRODUCE_RESPONSE_V8 = new Schema(
+            new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema(
+                    TOPIC_NAME,
+                    new Field(PARTITION_RESPONSES_KEY_NAME, new ArrayOf(new Schema(
+                            PARTITION_ID,
+                            ERROR_CODE,
+                            new Field(BASE_OFFSET_KEY_NAME, INT64),
+                            new Field(LOG_APPEND_TIME_KEY_NAME, INT64, "The timestamp returned by broker after appending " +
+                                    "the messages. If CreateTime is used for the topic, the timestamp will be -1. " +
+                                    "If LogAppendTime is used for the topic, the timestamp will be the broker local " +
+                                    "time when the messages are appended."),
+                            LOG_START_OFFSET_FIELD,
+                            new Field(ERROR_RECORDS_KEY_NAME, new ArrayOf(new Schema(
+                                    new Field.Int32(RELATIVE_OFFSET_KEY_NAME, "The relative offset of the record " +
+                                            "that caused the batch to be dropped"),
+                                    RELATIVE_OFFSET_ERROR_MESSAGE_FIELD
+                            )), "The relative offsets of records that caused the batch to be dropped"),
+                            ERROR_MESSAGE_FIELD)))))),
+            THROTTLE_TIME_MS);
+
     public static Schema[] schemaVersions() {
         return new Schema[]{PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3,
-            PRODUCE_RESPONSE_V4, PRODUCE_RESPONSE_V5, PRODUCE_RESPONSE_V6, PRODUCE_RESPONSE_V7};
+            PRODUCE_RESPONSE_V4, PRODUCE_RESPONSE_V5, PRODUCE_RESPONSE_V6, PRODUCE_RESPONSE_V7, PRODUCE_RESPONSE_V8};
     }

     private final Map<TopicPartition, PartitionResponse> responses;
@@ -183,6 +216,7 @@ public class ProduceResponse extends AbstractResponse {
         for (Object topicResponse : struct.getArray(RESPONSES_KEY_NAME)) {
             Struct topicRespStruct = (Struct) topicResponse;
             String topic = topicRespStruct.get(TOPIC_NAME);
+
             for (Object partResponse : topicRespStruct.getArray(PARTITION_RESPONSES_KEY_NAME)) {
                 Struct partRespStruct = (Struct) partResponse;
                 int partition = partRespStruct.get(PARTITION_ID);
@@ -190,8 +224,20 @@ public class ProduceResponse extends AbstractResponse {
                 long offset = partRespStruct.getLong(BASE_OFFSET_KEY_NAME);
                 long logAppendTime = partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME);
                 long logStartOffset = partRespStruct.getOrElse(LOG_START_OFFSET_FIELD, INVALID_OFFSET);
+
+                Map<Integer, String> errorRecords = new HashMap<>();
+                if (partRespStruct.hasField(ERROR_RECORDS_KEY_NAME)) {
+                    for (Object recordOffsetAndMessage : partRespStruct.getArray(ERROR_RECORDS_KEY_NAME)) {
+                        Struct recordOffsetAndMessageStruct = (Struct) recordOffsetAndMessage;
+                        Integer relativeOffset = recordOffsetAndMessageStruct.getInt(RELATIVE_OFFSET_KEY_NAME);
+                        String relativeOffsetErrorMessage = recordOffsetAndMessageStruct.getOrElse(RELATIVE_OFFSET_ERROR_MESSAGE_FIELD, "");
+                        errorRecords.put(relativeOffset, relativeOffsetErrorMessage);
+                    }
+                }
+
+                String errorMessage = partRespStruct.getOrElse(ERROR_MESSAGE_FIELD, "");
                 TopicPartition tp = new TopicPartition(topic, partition);
-                responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset));
+                responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset, errorRecords, errorMessage));
             }
         }
         this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
@@ -223,6 +269,18 @@ public class ProduceResponse extends AbstractResponse {
                 if (partStruct.hasField(LOG_APPEND_TIME_KEY_NAME))
                     partStruct.set(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime);
                 partStruct.setIfExists(LOG_START_OFFSET_FIELD, part.logStartOffset);
+
+                List<Struct> errorRecords = new ArrayList<>();
+                for (Map.Entry<Integer, String> recordOffsetAndMessage : part.errorRecords.entrySet()) {
+                    Struct recordOffsetAndMessageStruct = partStruct.instance(ERROR_RECORDS_KEY_NAME)
+                            .set(RELATIVE_OFFSET_KEY_NAME, recordOffsetAndMessage.getKey())
+                            .setIfExists(RELATIVE_OFFSET_ERROR_MESSAGE_FIELD, recordOffsetAndMessage.getValue());
+                    errorRecords.add(recordOffsetAndMessageStruct);
+                }
+
+                partStruct.setIfExists(ERROR_RECORDS_KEY_NAME, errorRecords.toArray());
+
+                partStruct.setIfExists(ERROR_MESSAGE_FIELD, part.errorMessage);
                 partitionArray.add(partStruct);
             }
             topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray());
@@ -256,16 +314,28 @@ public class ProduceResponse extends AbstractResponse {
         public long baseOffset;
         public long logAppendTime;
         public long logStartOffset;
+        public Map<Integer, String> errorRecords;
+        public String errorMessage;

         public PartitionResponse(Errors error) {
             this(error, INVALID_OFFSET, RecordBatch.NO_TIMESTAMP, INVALID_OFFSET);
         }

         public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset) {
+            this(error, baseOffset, logAppendTime, logStartOffset, Collections.emptyMap(), null);
+        }
+
+        public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, Map<Integer, String> errorRecords) {
+            this(error, baseOffset, logAppendTime, logStartOffset, errorRecords, "");
+        }
+
+        public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, Map<Integer, String> errorRecords, String errorMessage) {
             this.error = error;
             this.baseOffset = baseOffset;
             this.logAppendTime = logAppendTime;
             this.logStartOffset = logStartOffset;
+            this.errorRecords = errorRecords;
+            this.errorMessage = errorMessage;
         }

         @Override
@@ -280,6 +350,10 @@ public class ProduceResponse extends AbstractResponse {
             b.append(logAppendTime);
             b.append(", logStartOffset: ");
             b.append(logStartOffset);
+            b.append(", errorRecords: ");
+            b.append(errorRecords);
+            b.append(", errorMessage: ");
+            b.append(errorMessage);
             b.append('}');
             return b.toString();
         }
@@ -28,7 +28,9 @@
   // Version 5 and 6 are the same as version 3.
   //
   // Starting in version 7, records can be produced using ZStandard compression. See KIP-110.
-  "validVersions": "0-7",
+  //
+  // Starting in Version 8, response has ErrorRecords and ErrorMessage. See KIP-467.
+  "validVersions": "0-8",
   "fields": [
     { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "0+", "entityType": "transactionalId",
       "about": "The transactional ID, or null if the producer is not transactional." },
@@ -27,7 +27,10 @@
   //
   // Version 5 added LogStartOffset to filter out spurious
   // OutOfOrderSequenceExceptions on the client.
-  "validVersions": "0-7",
+  //
+  // Version 8 added ErrorRecords and ErrorMessage to include information about
+  // records that cause the whole batch to be dropped
+  "validVersions": "0-8",
   "fields": [
     { "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+",
       "about": "Each produce response", "fields": [
@@ -44,7 +47,16 @@
       { "name": "LogAppendTimeMs", "type": "int64", "versions": "2+", "default": "-1", "ignorable": true,
         "about": "The timestamp returned by broker after appending the messages. If CreateTime is used for the topic, the timestamp will be -1. If LogAppendTime is used for the topic, the timestamp will be the broker local time when the messages are appended." },
       { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
-        "about": "The log start offset." }
+        "about": "The log start offset." },
+      { "name": "ErrorRecords", "type": "[]RelativeOffsetAndErrorMessage", "versions": "8+", "ignorable": true,
+        "about": "The relative offsets of records that caused the batch to be dropped", "fields": [
+        { "name": "RelativeOffset", "type": "int32", "versions": "8+",
+          "about": "The relative offset of the record that caused the batch to be dropped" },
+        { "name": "RelativeOffsetErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+",
+          "about": "The error message of the record that caused the batch to be dropped"}
+      ]},
+      { "name": "ErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+", "ignorable": true,
+        "about": "The global error message summarizing the common root cause of the records that caused the batch to be dropped"}
     ]}
   ]},
   { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
@@ -505,6 +505,78 @@ public final class MessageTest {
         }
     }

+    @Test
+    public void testProduceResponseVersions() throws Exception {
+        String topicName = "topic";
+        int partitionIndex = 0;
+        short errorCode = Errors.INVALID_TOPIC_EXCEPTION.code();
+        long baseOffset = 12L;
+        int throttleTimeMs = 1234;
+        long logAppendTimeMs = 1234L;
+        long logStartOffset = 1234L;
+        int relativeOffset = 0;
+        String relativeOffsetErrorMessage = "error message";
+        String errorMessage = "global error message";
+
+        testAllMessageRoundTrips(new ProduceResponseData()
+                .setResponses(singletonList(
+                        new ProduceResponseData.TopicProduceResponse()
+                                .setName(topicName)
+                                .setPartitions(singletonList(
+                                        new ProduceResponseData.PartitionProduceResponse()
+                                                .setPartitionIndex(partitionIndex)
+                                                .setErrorCode(errorCode)
+                                                .setBaseOffset(baseOffset))))));
+
+        Supplier<ProduceResponseData> response =
+            () -> new ProduceResponseData()
+                    .setResponses(singletonList(
+                            new ProduceResponseData.TopicProduceResponse()
+                                    .setName(topicName)
+                                    .setPartitions(singletonList(
+                                            new ProduceResponseData.PartitionProduceResponse()
+                                                    .setPartitionIndex(partitionIndex)
+                                                    .setErrorCode(errorCode)
+                                                    .setBaseOffset(baseOffset)
+                                                    .setLogAppendTimeMs(logAppendTimeMs)
+                                                    .setLogStartOffset(logStartOffset)
+                                                    .setErrorRecords(singletonList(
+                                                            new ProduceResponseData.RelativeOffsetAndErrorMessage()
+                                                                    .setRelativeOffset(relativeOffset)
+                                                                    .setRelativeOffsetErrorMessage(relativeOffsetErrorMessage)))
+                                                    .setErrorMessage(errorMessage)))))
+                    .setThrottleTimeMs(throttleTimeMs);
+
+        for (short version = 0; version <= ApiKeys.PRODUCE.latestVersion(); version++) {
+            ProduceResponseData responseData = response.get();
+
+            if (version < 8) {
+                responseData.responses().get(0).partitions().get(0).setErrorRecords(Collections.emptyList());
+                responseData.responses().get(0).partitions().get(0).setErrorMessage(null);
+            }
+
+            if (version < 5) {
+                responseData.responses().get(0).partitions().get(0).setLogStartOffset(-1);
+            }
+
+            if (version < 2) {
+                responseData.responses().get(0).partitions().get(0).setLogAppendTimeMs(-1);
+            }
+
+            if (version < 1) {
+                responseData.setThrottleTimeMs(0);
+            }
+
+            if (version >= 3 && version <= 4) {
+                testAllMessageRoundTripsBetweenVersions(version, (short) 4, responseData, responseData);
+            } else if (version >= 6 && version <= 7) {
+                testAllMessageRoundTripsBetweenVersions(version, (short) 7, responseData, responseData);
+            } else {
+                testEquivalentMessageRoundTrip(version, responseData);
+            }
+        }
+    }
+
     private void testAllMessageRoundTrips(Message message) throws Exception {
         testAllMessageRoundTripsFromVersion(message.lowestSupportedVersion(), message);
     }
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.record;

+import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.record.AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch;
 import org.apache.kafka.common.utils.Utils;
 import org.junit.Test;
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.record;

+import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.utils.CloseableIterator;
@@ -173,7 +175,7 @@ public class DefaultRecordBatchTest {
         assertEquals(actualSize, DefaultRecordBatch.sizeInBytes(Arrays.asList(records)));
     }

-    @Test(expected = InvalidRecordException.class)
+    @Test(expected = CorruptRecordException.class)
     public void testInvalidRecordSize() {
         MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
                 CompressionType.NONE, TimestampType.CREATE_TIME,
@@ -233,7 +235,7 @@ public class DefaultRecordBatchTest {
         }
     }

-    @Test(expected = InvalidRecordException.class)
+    @Test(expected = CorruptRecordException.class)
     public void testInvalidCrc() {
         MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
                 CompressionType.NONE, TimestampType.CREATE_TIME,
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.record;

+import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.utils.ByteBufferInputStream;
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.record;

+import org.apache.kafka.common.InvalidRecordException;
 import org.junit.Test;

 import java.nio.ByteBuffer;
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.record;

+import org.apache.kafka.common.errors.CorruptRecordException;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -93,7 +94,7 @@ public class LegacyRecordTest {
         try {
             copy.ensureValid();
             fail("Should fail the above test.");
-        } catch (InvalidRecordException e) {
+        } catch (CorruptRecordException e) {
             // this is good
         }
     }
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.record;

+import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.Utils;
 import org.junit.Test;
@@ -66,7 +68,7 @@ public class SimpleLegacyRecordTest {
     }

     /* This scenario can happen if the record size field is corrupt and we end up allocating a buffer that is too small */
-    @Test(expected = InvalidRecordException.class)
+    @Test(expected = CorruptRecordException.class)
     public void testIsValidWithTooSmallBuffer() {
         ByteBuffer buffer = ByteBuffer.allocate(2);
         LegacyRecord record = new LegacyRecord(buffer);
@@ -74,7 +76,7 @@ public class SimpleLegacyRecordTest {
         record.ensureValid();
     }

-    @Test(expected = InvalidRecordException.class)
+    @Test(expected = CorruptRecordException.class)
     public void testIsValidWithChecksumMismatch() {
         ByteBuffer buffer = ByteBuffer.allocate(4);
         // set checksum
@@ -17,10 +17,10 @@

 package org.apache.kafka.common.requests;

+import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.RecordBatch;
@@ -233,6 +233,7 @@ public class RequestResponseTest {
         checkRequest(createProduceRequest(3), true);
         checkErrorResponse(createProduceRequest(3), new UnknownServerException(), true);
         checkResponse(createProduceResponse(), 2, true);
+        checkResponse(createProduceResponseWithErrorMessage(), 8, true);
         checkRequest(createStopReplicaRequest(0, true), true);
         checkRequest(createStopReplicaRequest(0, false), true);
         checkErrorResponse(createStopReplicaRequest(0, true), new UnknownServerException(), true);
@@ -1118,6 +1119,13 @@ public class RequestResponseTest {
         return new ProduceResponse(responseData, 0);
     }

+    private ProduceResponse createProduceResponseWithErrorMessage() {
+        Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
+        responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE,
+                10000, RecordBatch.NO_TIMESTAMP, 100, Collections.singletonMap(0, "error message"), "global error message"));
+        return new ProduceResponse(responseData, 0);
+    }
+
     private StopReplicaRequest createStopReplicaRequest(int version, boolean deletePartitions) {
         Set<TopicPartition> partitions = Utils.mkSet(new TopicPartition("test", 0));
         return new StopReplicaRequest.Builder((short) version, 0, 1, 0, deletePartitions, partitions).build();
@@ -18,7 +18,7 @@ package org.apache.kafka.connect.util;

 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.record.InvalidRecordException;
+import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -22,11 +22,11 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
-import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -42,7 +42,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.requests.{EpochEndOffset, ListOffsetRequest}
 import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition}

 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
@@ -1356,7 +1356,7 @@ class Log(@volatile var dir: File,
       // check the validity of the message by checking CRC
       if (!batch.isValid) {
         brokerTopicStats.allTopicsStats.invalidMessageCrcRecordsPerSec.mark()
-        throw new InvalidRecordException(s"Record is corrupt (stored crc = ${batch.checksum()}) in topic partition $topicPartition.")
+        throw new CorruptRecordException(s"Record is corrupt (stored crc = ${batch.checksum()}) in topic partition $topicPartition.")
       }

       if (batch.maxTimestamp > maxTimestamp) {
@@ -26,6 +26,7 @@ import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.{FetchDataInfo, LogOffsetMetadata}
 import kafka.utils._
+import org.apache.kafka.common.InvalidRecordException
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset}
 import org.apache.kafka.common.record._
@@ -367,9 +368,9 @@ class LogSegment private[log] (val log: FileRecords,
         }
       }
     } catch {
-      case e: CorruptRecordException =>
-        warn("Found invalid messages in log segment %s at byte offset %d: %s."
-          .format(log.file.getAbsolutePath, validBytes, e.getMessage))
+      case e@ (_: CorruptRecordException | _: InvalidRecordException) =>
+        warn("Found invalid messages in log segment %s at byte offset %d: %s. %s"
+          .format(log.file.getAbsolutePath, validBytes, e.getMessage, e.getCause))
     }
     val truncated = log.sizeInBytes - validBytes
     if (truncated > 0)
@@ -23,9 +23,10 @@ import kafka.common.LongRef
 import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec}
 import kafka.server.BrokerTopicStats
 import kafka.utils.Logging
-import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
-import org.apache.kafka.common.record.{AbstractRecords, BufferSupplier, CompressionType, InvalidRecordException, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType}
+import org.apache.kafka.common.InvalidRecordException
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
+import org.apache.kafka.common.record.{AbstractRecords, BufferSupplier, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType}
 import org.apache.kafka.common.utils.Time

 import scala.collection.{Seq, mutable}
@@ -162,7 +163,7 @@ private[kafka] object LogValidator extends Logging {
       } catch {
         case e: InvalidRecordException =>
           brokerTopicStats.allTopicsStats.invalidMessageCrcRecordsPerSec.mark()
-          throw new InvalidRecordException(e.getMessage + s" in topic partition $topicPartition.")
+          throw new CorruptRecordException(e.getMessage + s" in topic partition $topicPartition.")
       }
     }
@@ -39,7 +39,7 @@ import java.util.function.Consumer

 import com.yammer.metrics.core.Gauge
 import kafka.log.LogAppendInfo
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{InvalidRecordException, TopicPartition}
 import org.apache.kafka.common.internals.PartitionStates
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
 import org.apache.kafka.common.requests._
@@ -331,7 +331,7 @@ abstract class AbstractFetcherThread(name: String,
                 }
               }
             } catch {
-              case ime: CorruptRecordException =>
+              case ime@( _: CorruptRecordException | _: InvalidRecordException) =>
                 // we log the error and continue. This ensures two things
                 // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread
                 //    down and cause other topic partition to also lag
@@ -31,7 +31,7 @@ import kafka.server.checkpoints.LeaderEpochCheckpointFile
 import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
 import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata}
 import kafka.utils._
-import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
@@ -25,6 +25,7 @@ import kafka.common.LongRef
 import kafka.message._
 import kafka.server.BrokerTopicStats
 import kafka.utils.TestUtils.meterCount
+import org.apache.kafka.common.InvalidRecordException
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
@@ -56,6 +56,7 @@ class ProduceRequestTest extends BaseRequestTest {
     assertEquals(Errors.NONE, partitionResponse.error)
     assertEquals(expectedOffset, partitionResponse.baseOffset)
     assertEquals(-1, partitionResponse.logAppendTime)
+    assertTrue(partitionResponse.errorRecords.isEmpty)
     partitionResponse
   }