From 73f195f06257c2e42ef6a1f87f118974d7d90830 Mon Sep 17 00:00:00 2001 From: Jinhe Zhang Date: Fri, 25 Jul 2025 14:06:12 -0400 Subject: [PATCH] MINOR: Re-add pageview demo to :streams:examples and remove dependency on :connect:json (#20239) With 4.0 release, we remove pageview demo because it depends on `:connect:json` which requires JDK 17. This PR removes the connect dependency and adds a customized serializer and deserializer, to make pageview demo works with JDK 11. Reviewers: Matthias J. Sax --- build.gradle | 6 +- .../pageview/PageViewUntypedDemo.java | 68 +++++++++++++++++-- 2 files changed, 67 insertions(+), 7 deletions(-) diff --git a/build.gradle b/build.gradle index f68b130f2de..5f9e81789cc 100644 --- a/build.gradle +++ b/build.gradle @@ -47,7 +47,7 @@ ext { gradleVersion = versions.gradle minClientJavaVersion = 11 minNonClientJavaVersion = 17 - modulesNeedingJava11 = [":clients", ":generator", ":streams", ":streams:test-utils", ":streams-scala", ":test-common:test-common-util"] + modulesNeedingJava11 = [":clients", ":generator", ":streams", ":streams:test-utils", ":streams:examples", ":streams-scala", ":test-common:test-common-util"] buildVersionFileName = "kafka-version.properties" @@ -2917,10 +2917,10 @@ project(':streams:examples') { } dependencies { - // this dependency should be removed after we unify data API - implementation(project(':connect:json')) implementation project(':streams') implementation libs.slf4jApi + implementation libs.jacksonDatabind + implementation libs.jacksonAnnotations testImplementation project(':streams:test-utils') testImplementation project(':clients').sourceSets.test.output // for org.apache.kafka.test.IntegrationTest diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java index 155dc1b46b3..6225793cde1 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java @@ -17,12 +17,11 @@ package org.apache.kafka.streams.examples.pageview; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.connect.json.JsonDeserializer; -import org.apache.kafka.connect.json.JsonSerializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; @@ -35,10 +34,13 @@ import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.TimeWindows; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; +import java.io.IOException; import java.time.Duration; +import java.util.Map; import java.util.Properties; /** @@ -56,6 +58,64 @@ import java.util.Properties; */ public class PageViewUntypedDemo { + /** + * Custom JSON serializer for JsonNode objects using Jackson ObjectMapper. + */ + public static class JsonNodeSerializer implements Serializer { + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public void configure(final Map configs, final boolean isKey) { + // No configuration needed + } + + @Override + public byte[] serialize(final String topic, final JsonNode data) { + if (data == null) { + return null; + } + try { + return objectMapper.writeValueAsBytes(data); + } catch (final IOException e) { + throw new SerializationException("Error serializing JSON message", e); + } + } + + @Override + public void close() { + // No resources to close + } + } + + /** + * Custom JSON deserializer for JsonNode objects using Jackson ObjectMapper. + */ + public static class JsonNodeDeserializer implements Deserializer { + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public void configure(final Map configs, final boolean isKey) { + // No configuration needed + } + + @Override + public JsonNode deserialize(final String topic, final byte[] data) { + if (data == null) { + return null; + } + try { + return objectMapper.readTree(data); + } catch (final IOException e) { + throw new SerializationException("Error deserializing JSON message", e); + } + } + + @Override + public void close() { + // No resources to close + } + } + public static void main(final String[] args) throws Exception { final Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-untyped"); @@ -68,8 +128,8 @@ public class PageViewUntypedDemo { final StreamsBuilder builder = new StreamsBuilder(); - final Serializer jsonSerializer = new JsonSerializer(); - final Deserializer jsonDeserializer = new JsonDeserializer(); + final Serializer jsonSerializer = new JsonNodeSerializer(); + final Deserializer jsonDeserializer = new JsonNodeDeserializer(); final Serde jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer); final Consumed consumed = Consumed.with(Serdes.String(), jsonSerde);