MINOR: Use `PartitionResponse.errorMessage` in exceptions in KafkaProducer (#9450)

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Rajini Sivaram 2020-10-20 10:15:01 +01:00 committed by GitHub
parent e3d6344ed7
commit f8e3b84ec0
3 changed files with 43 additions and 15 deletions
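The change itself is small: when the Sender turns a produce response error into an exception, it now passes along the per-partition `errorMessage` returned by the broker instead of relying on the error's generic default text. A minimal, self-contained sketch of the behaviour this relies on (`Errors.exception(String)` falls back to the default message when given null, which is what the new testDefaultErrorMessage case asserts):

import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.protocol.Errors;

public class ErrorMessageSketch {
    public static void main(String[] args) {
        // Before: only the generic per-error default text was available.
        ApiException generic = Errors.INVALID_REQUEST.exception();
        System.out.println(generic.getMessage());

        // After: a broker-supplied PartitionResponse.errorMessage is used when present;
        // a null message falls back to the same default, so older brokers are unaffected.
        String brokerMessage = "One or more records have been rejected due to invalid timestamp";
        ApiException detailed = Errors.INVALID_REQUEST.exception(brokerMessage);
        System.out.println(detailed.getMessage());
    }
}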

Sender.java

@@ -616,7 +616,7 @@ public class Sender implements Runnable {
else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
else
- exception = error.exception();
+ exception = error.exception(response.errorMessage);
// tell the user the result of their request. We only adjust sequence numbers if the batch didn't exhaust
// its retries -- if it did, we don't know whether the sequence number was accepted or not, and
// thus it is not safe to reassign the sequence.
@@ -629,7 +629,7 @@ public class Sender implements Runnable {
batch.topicPartition);
} else {
log.warn("Received invalid metadata error in produce request on partition {} due to {}. Going " +
"to request metadata update now", batch.topicPartition, error.exception().toString());
"to request metadata update now", batch.topicPartition, error.exception(response.errorMessage).toString());
}
metadata.requestUpdate();
}
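For applications, nothing changes structurally; a failed send simply surfaces a more descriptive message. A hypothetical usage example (broker address, topic name, and serializers are illustrative placeholders, not part of this commit):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerCallbackSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, exception) -> {
                if (exception != null)
                    // With this change the message can include broker-supplied detail,
                    // e.g. "One or more records have been rejected due to invalid timestamp".
                    System.err.println("Send failed: " + exception.getMessage());
            });
            producer.flush();
        }
    }
}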

SenderTest.java

@@ -23,6 +23,7 @@ import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.producer.Callback;
+ import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.TransactionAbortedException;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.clients.producer.RecordMetadata;
@@ -93,6 +94,7 @@ import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -1726,7 +1728,7 @@ public class SenderTest {
assertFalse(batchIterator.hasNext());
assertEquals(expectedSequence, firstBatch.baseSequence());
return true;
- }, produceResponse(tp, responseOffset, responseError, 0, logStartOffset));
+ }, produceResponse(tp, responseOffset, responseError, 0, logStartOffset, null));
}
@Test
@@ -2129,7 +2131,7 @@ public class SenderTest {
time.sleep(deliveryTimeoutMs / 2); // expire the first batch only
- client.respond(produceResponse(tp0, 0L, Errors.NONE, 0, 0L));
+ client.respond(produceResponse(tp0, 0L, Errors.NONE, 0, 0L, null));
sender.runOnce(); // receive response (offset=0)
assertEquals(0, client.inFlightRequestCount());
assertEquals(0, sender.inFlightBatches(tp0).size());
@@ -2432,6 +2434,29 @@ public class SenderTest {
sender.runOnce();
}
+ @Test
+ public void testDefaultErrorMessage() throws Exception {
+ verifyErrorMessage(produceResponse(tp0, 0L, Errors.INVALID_REQUEST, 0), Errors.INVALID_REQUEST.message());
+ }
+ @Test
+ public void testCustomErrorMessage() throws Exception {
+ String errorMessage = "testCustomErrorMessage";
+ verifyErrorMessage(produceResponse(tp0, 0L, Errors.INVALID_REQUEST, 0, -1, errorMessage), errorMessage);
+ }
+ private void verifyErrorMessage(ProduceResponse response, String expectedMessage) throws Exception {
+ Future<RecordMetadata> future = appendToAccumulator(tp0, 0L, "key", "value");
+ sender.runOnce(); // connect
+ sender.runOnce(); // send produce request
+ client.respond(response);
+ sender.runOnce();
+ sender.runOnce();
+ ExecutionException e1 = assertThrows(ExecutionException.class, () -> future.get(5, TimeUnit.SECONDS));
+ assertEquals(InvalidRequestException.class, e1.getCause().getClass());
+ assertEquals(expectedMessage, e1.getCause().getMessage());
+ }
class AssertEndTxnRequestMatcher implements MockClient.RequestMatcher {
private TransactionResult requiredResult;
@@ -2527,8 +2552,9 @@ public class SenderTest {
null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future;
}
- private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, long logStartOffset) {
- ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP, logStartOffset);
+ private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, long logStartOffset, String errorMessage) {
+ ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset,
+     RecordBatch.NO_TIMESTAMP, logStartOffset, Collections.emptyList(), errorMessage);
Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp);
return new ProduceResponse(partResp, throttleTimeMs);
}
@@ -2544,7 +2570,7 @@ public class SenderTest {
}
private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) {
- return produceResponse(tp, offset, error, throttleTimeMs, -1L);
+ return produceResponse(tp, offset, error, throttleTimeMs, -1L, null);
}
private TransactionManager createTransactionManager() {

PlaintextProducerSendTest.scala

@@ -98,10 +98,11 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
val producer = createProducer(brokerList = brokerList)
try {
- producer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()
- fail("Should throw CorruptedRecordException")
- } catch {
- case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[InvalidTimestampException])
+ val e = intercept[ExecutionException] {
+ producer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()
+ }.getCause
+ assertTrue(e.isInstanceOf[InvalidTimestampException])
+ assertEquals("One or more records have been rejected due to invalid timestamp", e.getMessage)
} finally {
producer.close()
}
@@ -109,10 +110,11 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
// Test compressed messages.
val compressedProducer = createProducer(brokerList = brokerList, compressionType = "gzip")
try {
- compressedProducer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()
- fail("Should throw CorruptedRecordException")
- } catch {
- case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[InvalidTimestampException])
+ val e = intercept[ExecutionException] {
+ compressedProducer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()
+ }.getCause
+ assertTrue(e.isInstanceOf[InvalidTimestampException])
+ assertEquals("One or more records have been rejected due to invalid timestamp", e.getMessage)
} finally {
compressedProducer.close()
}
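The Scala test above is reworked from fail()-inside-try plus a catch clause into a single intercept, and it now asserts on the exact message rather than only the exception type, which it can do because the broker-supplied PartitionResponse.errorMessage now reaches the client exception. A rough JUnit 5 sketch of the same assertion pattern in Java (the CompletableFuture stand-in and class name are illustrative only):

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.common.errors.InvalidTimestampException;

public class InterceptPatternSketch {
    public static void main(String[] args) {
        // Stand-in for a produce future that the broker rejected with a detailed message.
        CompletableFuture<Long> future = new CompletableFuture<>();
        future.completeExceptionally(new InvalidTimestampException(
            "One or more records have been rejected due to invalid timestamp"));

        // One step replaces try { ...; fail(...) } catch { ... }: it fails if nothing is
        // thrown and returns the exception so its cause and message can be asserted on.
        ExecutionException e = assertThrows(ExecutionException.class, future::get);
        assertEquals(InvalidTimestampException.class, e.getCause().getClass());
        assertEquals("One or more records have been rejected due to invalid timestamp",
            e.getCause().getMessage());
    }
}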