From 0eb9ac2bd080800f300a1bd28c75bf84e793757c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20GREFFIER?= Date: Wed, 31 Jul 2024 22:24:15 +0200 Subject: [PATCH] KAFKA-16448: Unify class cast exception handling for both key and value (#16736) Part of KIP-1033. Minor code cleanup. Reviewers: Matthias J. Sax --- .../internals/RecordCollectorImpl.java | 52 ++++++++----------- 1 file changed, 22 insertions(+), 30 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index de4afc2c924..a4dc0a68062 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -53,9 +53,11 @@ import org.apache.kafka.streams.processor.internals.metrics.TopicMetrics; import org.slf4j.Logger; +import java.text.MessageFormat; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -200,7 +202,8 @@ public class RecordCollectorImpl implements RecordCollector { try { keyBytes = keySerializer.serialize(topic, headers, key); } catch (final ClassCastException exception) { - throw createStreamsExceptionForKeyClassCastException( + throw createStreamsExceptionForClassCastException( + ProductionExceptionHandler.SerializationExceptionOrigin.KEY, topic, key, keySerializer, @@ -223,7 +226,8 @@ public class RecordCollectorImpl implements RecordCollector { try { valBytes = valueSerializer.serialize(topic, headers, value); } catch (final ClassCastException exception) { - throw createStreamsExceptionForValueClassCastException( + throw createStreamsExceptionForClassCastException( + ProductionExceptionHandler.SerializationExceptionOrigin.VALUE, topic, value, valueSerializer, @@ -335,39 +339,27 @@ public class RecordCollectorImpl implements RecordCollector { droppedRecordsSensor.record(); } - private StreamsException createStreamsExceptionForKeyClassCastException(final String topic, - final K key, - final Serializer keySerializer, - final ClassCastException exception) { - final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName(); - return new StreamsException( - String.format( - "ClassCastException while producing data to topic %s. " + - "The key serializer %s is not compatible to the actual key type: %s. " + - "Change the default key serde in StreamConfig or provide the correct key serde via method parameters " + - "(for example if using the DSL, `#to(String topic, Produced produced)` with " + - "`Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).", - topic, - keySerializer.getClass().getName(), - keyClass), - exception); - } - private StreamsException createStreamsExceptionForValueClassCastException(final String topic, - final V value, - final Serializer valueSerializer, - final ClassCastException exception) { - final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName(); + private StreamsException createStreamsExceptionForClassCastException(final ProductionExceptionHandler.SerializationExceptionOrigin origin, + final String topic, + final KV keyOrValue, + final Serializer keyOrValueSerializer, + final ClassCastException exception) { + final String keyOrValueClass = keyOrValue == null + ? String.format("unknown because %s is null", origin.toString().toLowerCase(Locale.ROOT)) : keyOrValue.getClass().getName(); + return new StreamsException( + MessageFormat.format( String.format( "ClassCastException while producing data to topic %s. " + - "The value serializer %s is not compatible to the actual value type: %s. " + - "Change the default value serde in StreamConfig or provide the correct value serde via method parameters " + - "(for example if using the DSL, `#to(String topic, Produced produced)` with " + - "`Produced.valueSerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).", + "The {0} serializer %s is not compatible to the actual {0} type: %s. " + + "Change the default {0} serde in StreamConfig or provide the correct {0} serde via method parameters " + + "(for example if using the DSL, `#to(String topic, Produced produced)` with " + + "`Produced.{0}Serde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).", topic, - valueSerializer.getClass().getName(), - valueClass), + keyOrValueSerializer.getClass().getName(), + keyOrValueClass), + origin.toString().toLowerCase(Locale.ROOT)), exception); }