diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java index 0adb8dbbe5a..3ec67ec8e87 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java @@ -28,8 +28,8 @@ import org.apache.kafka.streams.processor.internals.SerdeGetter; */ public class WrappingNullableUtils { - @SuppressWarnings({"unchecked", "rawtypes"}) - private static Deserializer prepareDeserializer(final Deserializer specificDeserializer, final ProcessorContext context, final boolean isKey, final String name) { + @SuppressWarnings({"unchecked", "resource"}) + private static Deserializer prepareDeserializer(final Deserializer specificDeserializer, final ProcessorContext context, final boolean isKey) { final Deserializer deserializerToUse; if (specificDeserializer == null) { @@ -41,8 +41,8 @@ public class WrappingNullableUtils { return deserializerToUse; } - @SuppressWarnings({"unchecked", "rawtypes"}) - private static Serializer prepareSerializer(final Serializer specificSerializer, final ProcessorContext context, final boolean isKey, final String name) { + @SuppressWarnings({"unchecked", "resource"}) + private static Serializer prepareSerializer(final Serializer specificSerializer, final ProcessorContext context, final boolean isKey) { final Serializer serializerToUse; if (specificSerializer == null) { serializerToUse = (Serializer) (isKey ? context.keySerde().serializer() : context.valueSerde().serializer()); @@ -53,7 +53,7 @@ public class WrappingNullableUtils { return serializerToUse; } - @SuppressWarnings({"rawtypes", "unchecked"}) + @SuppressWarnings("unchecked") private static Serde prepareSerde(final Serde specificSerde, final SerdeGetter getter, final boolean isKey) { final Serde serdeToUse; if (specificSerde == null) { @@ -62,25 +62,25 @@ public class WrappingNullableUtils { serdeToUse = specificSerde; } if (serdeToUse instanceof WrappingNullableSerde) { - ((WrappingNullableSerde) serdeToUse).setIfUnset(getter); + ((WrappingNullableSerde) serdeToUse).setIfUnset(getter); } return serdeToUse; } - public static Deserializer prepareKeyDeserializer(final Deserializer specificDeserializer, final ProcessorContext context, final String name) { - return prepareDeserializer(specificDeserializer, context, true, name); + public static Deserializer prepareKeyDeserializer(final Deserializer specificDeserializer, final ProcessorContext context) { + return prepareDeserializer(specificDeserializer, context, true); } - public static Deserializer prepareValueDeserializer(final Deserializer specificDeserializer, final ProcessorContext context, final String name) { - return prepareDeserializer(specificDeserializer, context, false, name); + public static Deserializer prepareValueDeserializer(final Deserializer specificDeserializer, final ProcessorContext context) { + return prepareDeserializer(specificDeserializer, context, false); } - public static Serializer prepareKeySerializer(final Serializer specificSerializer, final ProcessorContext context, final String name) { - return prepareSerializer(specificSerializer, context, true, name); + public static Serializer prepareKeySerializer(final Serializer specificSerializer, final ProcessorContext context) { + return prepareSerializer(specificSerializer, context, true); } - public static Serializer prepareValueSerializer(final Serializer specificSerializer, final ProcessorContext context, final String name) { - return prepareSerializer(specificSerializer, context, false, name); + public static Serializer prepareValueSerializer(final Serializer specificSerializer, final ProcessorContext context) { + return prepareSerializer(specificSerializer, context, false); } public static Serde prepareKeySerde(final Serde specificSerde, final SerdeGetter getter) { @@ -91,17 +91,15 @@ public class WrappingNullableUtils { return prepareSerde(specificSerde, getter, false); } - @SuppressWarnings({"rawtypes", "unchecked"}) public static void initNullableSerializer(final Serializer specificSerializer, final SerdeGetter getter) { if (specificSerializer instanceof WrappingNullableSerializer) { - ((WrappingNullableSerializer) specificSerializer).setIfUnset(getter); + ((WrappingNullableSerializer) specificSerializer).setIfUnset(getter); } } - @SuppressWarnings({"rawtypes", "unchecked"}) public static void initNullableDeserializer(final Deserializer specificDeserializer, final SerdeGetter getter) { if (specificDeserializer instanceof WrappingNullableDeserializer) { - ((WrappingNullableDeserializer) specificDeserializer).setIfUnset(getter); + ((WrappingNullableDeserializer) specificDeserializer).setIfUnset(getter); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index 779ec3a917a..d32cf2523e0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -56,19 +56,18 @@ public class SinkNode extends ProcessorNode { throw new UnsupportedOperationException("sink node does not allow addChild"); } - @SuppressWarnings({"unchecked", "rawtypes"}) @Override public void init(final InternalProcessorContext context) { super.init(context); this.context = context; try { - keySerializer = prepareKeySerializer(keySerializer, (InternalProcessorContext) context, this.name()); + keySerializer = prepareKeySerializer(keySerializer, context); } catch (ConfigException | StreamsException e) { throw new StreamsException(String.format("Failed to initialize key serdes for sink node %s", name()), e, context.taskId()); } try { - valSerializer = prepareValueSerializer(valSerializer, (InternalProcessorContext) context, this.name()); + valSerializer = prepareValueSerializer(valSerializer, context); } catch (final ConfigException | StreamsException e) { throw new StreamsException(String.format("Failed to initialize value serdes for sink node %s", name()), e, context.taskId()); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java index f45e9e293fd..b76a134b744 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java @@ -77,13 +77,13 @@ public class SourceNode extends ProcessorNode { this.context = context; try { - keyDeserializer = prepareKeyDeserializer(keyDeserializer, context, name()); + keyDeserializer = prepareKeyDeserializer(keyDeserializer, context); } catch (final ConfigException | StreamsException e) { throw new StreamsException(String.format("Failed to initialize key serdes for source node %s", name()), e, context.taskId()); } try { - valDeserializer = prepareValueDeserializer(valDeserializer, context, name()); + valDeserializer = prepareValueDeserializer(valDeserializer, context); } catch (final ConfigException | StreamsException e) { throw new StreamsException(String.format("Failed to initialize value serdes for source node %s", name()), e, context.taskId()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java index 4f8db686383..39516519b1a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java @@ -48,8 +48,8 @@ public class SinkNodeTest { new StaticTopicNameExtractor<>("any-output-topic"), anySerializer, anySerializer, null); // Used to verify that the correct exceptions are thrown if the compiler checks are bypassed - @SuppressWarnings({"unchecked", "rawtypes"}) - private final SinkNode illTypedSink = (SinkNode) sink; + @SuppressWarnings("unchecked") + private final SinkNode illTypedSink = (SinkNode) ((SinkNode) sink); private MockedStatic utilsMock; @BeforeEach @@ -77,7 +77,7 @@ public class SinkNodeTest { @Test public void shouldThrowStreamsExceptionOnUndefinedKeySerde() { - utilsMock.when(() -> WrappingNullableUtils.prepareKeySerializer(any(), any(), any())) + utilsMock.when(() -> WrappingNullableUtils.prepareKeySerializer(any(), any())) .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); final Throwable exception = assertThrows(StreamsException.class, () -> sink.init(context)); @@ -94,7 +94,7 @@ public class SinkNodeTest { @Test public void shouldThrowStreamsExceptionOnUndefinedValueSerde() { - utilsMock.when(() -> WrappingNullableUtils.prepareValueSerializer(any(), any(), any())) + utilsMock.when(() -> WrappingNullableUtils.prepareValueSerializer(any(), any())) .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG")); final Throwable exception = assertThrows(StreamsException.class, () -> sink.init(context)); @@ -111,7 +111,7 @@ public class SinkNodeTest { @Test public void shouldThrowStreamsExceptionWithExplicitErrorMessage() { - utilsMock.when(() -> WrappingNullableUtils.prepareKeySerializer(any(), any(), any())).thenThrow(new StreamsException("")); + utilsMock.when(() -> WrappingNullableUtils.prepareKeySerializer(any(), any())).thenThrow(new StreamsException("")); final Throwable exception = assertThrows(StreamsException.class, () -> sink.init(context)); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java index e0fa79fd450..a509d14c974 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java @@ -137,7 +137,7 @@ public class SourceNodeTest { final SourceNode node = new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer()); - utilsMock.when(() -> WrappingNullableUtils.prepareKeyDeserializer(any(), any(), any())) + utilsMock.when(() -> WrappingNullableUtils.prepareKeyDeserializer(any(), any())) .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); final Throwable exception = assertThrows(StreamsException.class, () -> node.init(context)); @@ -159,7 +159,7 @@ public class SourceNodeTest { final SourceNode node = new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer()); - utilsMock.when(() -> WrappingNullableUtils.prepareValueDeserializer(any(), any(), any())) + utilsMock.when(() -> WrappingNullableUtils.prepareValueDeserializer(any(), any())) .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG")); final Throwable exception = assertThrows(StreamsException.class, () -> node.init(context)); @@ -181,7 +181,7 @@ public class SourceNodeTest { final SourceNode node = new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer()); - utilsMock.when(() -> WrappingNullableUtils.prepareKeyDeserializer(any(), any(), any())).thenThrow(new StreamsException("")); + utilsMock.when(() -> WrappingNullableUtils.prepareKeyDeserializer(any(), any())).thenThrow(new StreamsException("")); final Throwable exception = assertThrows(StreamsException.class, () -> node.init(context));