MINOR: Re-add pageview demo to :streams:examples and remove dependency on :connect:json (#20239)
CI / build (push) Has been cancelled Details

With 4.0 release, we remove pageview demo because it depends on
`:connect:json` which requires JDK 17.  This PR removes the connect
dependency and adds a customized serializer and deserializer,  to make
pageview demo works with JDK 11.

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Jinhe Zhang 2025-07-25 14:06:12 -04:00 committed by GitHub
parent 46cb6cbcff
commit 73f195f062
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 67 additions and 7 deletions

View File

@ -47,7 +47,7 @@ ext {
gradleVersion = versions.gradle
minClientJavaVersion = 11
minNonClientJavaVersion = 17
modulesNeedingJava11 = [":clients", ":generator", ":streams", ":streams:test-utils", ":streams-scala", ":test-common:test-common-util"]
modulesNeedingJava11 = [":clients", ":generator", ":streams", ":streams:test-utils", ":streams:examples", ":streams-scala", ":test-common:test-common-util"]
buildVersionFileName = "kafka-version.properties"
@ -2917,10 +2917,10 @@ project(':streams:examples') {
}
dependencies {
// this dependency should be removed after we unify data API
implementation(project(':connect:json'))
implementation project(':streams')
implementation libs.slf4jApi
implementation libs.jacksonDatabind
implementation libs.jacksonAnnotations
testImplementation project(':streams:test-utils')
testImplementation project(':clients').sourceSets.test.output // for org.apache.kafka.test.IntegrationTest

View File

@ -17,12 +17,11 @@
package org.apache.kafka.streams.examples.pageview;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@ -35,10 +34,13 @@ import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
/**
@ -56,6 +58,64 @@ import java.util.Properties;
*/
public class PageViewUntypedDemo {
/**
* Custom JSON serializer for JsonNode objects using Jackson ObjectMapper.
*/
public static class JsonNodeSerializer implements Serializer<JsonNode> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
// No configuration needed
}
@Override
public byte[] serialize(final String topic, final JsonNode data) {
if (data == null) {
return null;
}
try {
return objectMapper.writeValueAsBytes(data);
} catch (final IOException e) {
throw new SerializationException("Error serializing JSON message", e);
}
}
@Override
public void close() {
// No resources to close
}
}
/**
* Custom JSON deserializer for JsonNode objects using Jackson ObjectMapper.
*/
public static class JsonNodeDeserializer implements Deserializer<JsonNode> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
// No configuration needed
}
@Override
public JsonNode deserialize(final String topic, final byte[] data) {
if (data == null) {
return null;
}
try {
return objectMapper.readTree(data);
} catch (final IOException e) {
throw new SerializationException("Error deserializing JSON message", e);
}
}
@Override
public void close() {
// No resources to close
}
}
public static void main(final String[] args) throws Exception {
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-untyped");
@ -68,8 +128,8 @@ public class PageViewUntypedDemo {
final StreamsBuilder builder = new StreamsBuilder();
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serializer<JsonNode> jsonSerializer = new JsonNodeSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonNodeDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);