KAFKA-16448: Unify class cast exception handling for both key and value (#16736)

Part of KIP-1033. Minor code cleanup.

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Loïc GREFFIER 2024-07-31 22:24:15 +02:00 committed by GitHub
parent 8f2679bebf
commit 0eb9ac2bd0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 22 additions and 30 deletions

View File

@ -53,9 +53,11 @@ import org.apache.kafka.streams.processor.internals.metrics.TopicMetrics;
import org.slf4j.Logger;
import java.text.MessageFormat;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@ -200,7 +202,8 @@ public class RecordCollectorImpl implements RecordCollector {
try {
keyBytes = keySerializer.serialize(topic, headers, key);
} catch (final ClassCastException exception) {
throw createStreamsExceptionForKeyClassCastException(
throw createStreamsExceptionForClassCastException(
ProductionExceptionHandler.SerializationExceptionOrigin.KEY,
topic,
key,
keySerializer,
@ -223,7 +226,8 @@ public class RecordCollectorImpl implements RecordCollector {
try {
valBytes = valueSerializer.serialize(topic, headers, value);
} catch (final ClassCastException exception) {
throw createStreamsExceptionForValueClassCastException(
throw createStreamsExceptionForClassCastException(
ProductionExceptionHandler.SerializationExceptionOrigin.VALUE,
topic,
value,
valueSerializer,
@ -335,39 +339,27 @@ public class RecordCollectorImpl implements RecordCollector {
droppedRecordsSensor.record();
}
private <K> StreamsException createStreamsExceptionForKeyClassCastException(final String topic,
final K key,
final Serializer<K> keySerializer,
final ClassCastException exception) {
final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName();
return new StreamsException(
String.format(
"ClassCastException while producing data to topic %s. " +
"The key serializer %s is not compatible to the actual key type: %s. " +
"Change the default key serde in StreamConfig or provide the correct key serde via method parameters " +
"(for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with " +
"`Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).",
topic,
keySerializer.getClass().getName(),
keyClass),
exception);
}
private <V> StreamsException createStreamsExceptionForValueClassCastException(final String topic,
final V value,
final Serializer<V> valueSerializer,
final ClassCastException exception) {
final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName();
private <KV> StreamsException createStreamsExceptionForClassCastException(final ProductionExceptionHandler.SerializationExceptionOrigin origin,
final String topic,
final KV keyOrValue,
final Serializer<KV> keyOrValueSerializer,
final ClassCastException exception) {
final String keyOrValueClass = keyOrValue == null
? String.format("unknown because %s is null", origin.toString().toLowerCase(Locale.ROOT)) : keyOrValue.getClass().getName();
return new StreamsException(
MessageFormat.format(
String.format(
"ClassCastException while producing data to topic %s. " +
"The value serializer %s is not compatible to the actual value type: %s. " +
"Change the default value serde in StreamConfig or provide the correct value serde via method parameters " +
"(for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with " +
"`Produced.valueSerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).",
"The {0} serializer %s is not compatible to the actual {0} type: %s. " +
"Change the default {0} serde in StreamConfig or provide the correct {0} serde via method parameters " +
"(for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with " +
"`Produced.{0}Serde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).",
topic,
valueSerializer.getClass().getName(),
valueClass),
keyOrValueSerializer.getClass().getName(),
keyOrValueClass),
origin.toString().toLowerCase(Locale.ROOT)),
exception);
}