KAFKA-5815; add Printed class and KStream#print(printed)

Part of KIP-182
- Add `Printed` class and `KStream#print(Printed)`
- deprecate all other `print` and `writeAsText` methods

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3768 from dguy/kafka-5652-printed
Damian Guy 2017-09-08 18:22:04 +01:00
parent e16b9143df
commit 4769e3d92a
12 changed files with 365 additions and 84 deletions


@@ -1016,10 +1016,14 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
<pre class="brush: java;">
KStream&lt;byte[], String&gt; stream = ...;
stream.print();
// You can also override how and where the data is printed, e.g., to a file:
stream.print(Printed.toFile("stream.out"));
// Several variants of `print` exist, e.g., to override the default serdes for record keys
// and record values, set a prefix label for the output string, etc.
stream.print(Serdes.ByteArray(), Serdes.String());
// with a custom KeyValueMapper and label
stream.print(Printed.toSysOut()
.withLabel("my-stream")
.withKeyValueMapper((key, value) -> key + " -> " + value));
</pre>
</td>
</tr>


@@ -85,6 +85,12 @@
An example using this new API is shown in the <a href="/{{version}}/documentation/streams/quickstart">quickstart section</a>.
</p>
<p>
With the introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines">KIP-182</a>
you should no longer pass in a <code>Serde</code> to <code>KStream#print</code> operations.
If you can't rely on using <code>toString</code> to print your keys and values, you should instead provide a custom <code>KeyValueMapper</code> via the <code>Printed#withKeyValueMapper</code> call.
</p>
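<p>
    For example, a deprecated serde-based call could be migrated as follows (a minimal sketch; the stream from the example above and the formatting logic are illustrative):
</p>
<pre class="brush: java;">
// before (deprecated): serdes are used to deserialize byte[] keys and values
stream.print(Serdes.ByteArray(), Serdes.String());
// after: format the records explicitly with a KeyValueMapper
stream.print(Printed.&lt;byte[], String&gt;toSysOut()
    .withKeyValueMapper((key, value) -> new String(key) + ", " + value));
</pre>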
<p>
Windowed aggregations have moved from <code>KGroupedStream</code> to <code>WindowedKStream</code>.
You can now perform a windowed aggregation by, for example, using <code>KGroupedStream#windowedBy(Windows)#reduce(Reducer)</code>.
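A minimal sketch of the new call chain (the grouped stream, window size, and reducer are illustrative):
<pre class="brush: java;">
KGroupedStream&lt;String, Long&gt; grouped = ...;
KTable&lt;Windowed&lt;String&gt;, Long&gt; sums = grouped
    .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
    .reduce((v1, v2) -> v1 + v2);
</pre>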


@@ -274,7 +274,9 @@ public interface KStream<K, V> {
* <p>
* Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
* {@link Integer} etc. to get meaningful information.
* @deprecated use {@code print(Printed)}
*/
@Deprecated
void print();
/**
@@ -288,7 +290,9 @@ public interface KStream<K, V> {
* {@link Integer} etc. to get meaningful information.
*
* @param label the name used to label the key/value pairs printed to the console
* @deprecated use {@code print(Printed)}
*/
@Deprecated
void print(final String label);
/**
@@ -304,7 +308,9 @@
*
* @param keySerde key serde used to deserialize key if type is {@code byte[]},
* @param valSerde value serde used to deserialize value if type is {@code byte[]},
* @deprecated use {@code print(Printed)}
*/
@Deprecated
void print(final Serde<K> keySerde,
final Serde<V> valSerde);
@@ -320,7 +326,9 @@
* @param keySerde key serde used to deserialize key if type is {@code byte[]},
* @param valSerde value serde used to deserialize value if type is {@code byte[]},
* @param label the name used to label the key/value pairs printed to the console
* @deprecated use {@code print(Printed)}
*/
@Deprecated
void print(final Serde<K> keySerde,
final Serde<V> valSerde,
final String label);
@@ -344,7 +352,9 @@
* The KeyValueMapper's mapped value type must be {@code String}.
*
* @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
* @deprecated use {@code print(Printed)}
*/
@Deprecated
void print(final KeyValueMapper<? super K, ? super V, String> mapper);
/**
@@ -367,7 +377,9 @@
*
* @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
* @param label the name used to label the key/value pairs printed to the console
* @deprecated use {@code print(Printed)}
*/
@Deprecated
void print(final KeyValueMapper<? super K, ? super V, String> mapper, final String label);
/**
@@ -394,7 +406,9 @@
* @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
* @param keySerde a {@link Serde<K>} used to deserialize key if type is {@code byte[]}.
* @param valSerde a {@link Serde<V>} used to deserialize value if type is {@code byte[]}.
* @deprecated use {@code print(Printed)}
*/
@Deprecated
void print(final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, final Serde<V> valSerde);
/**
@@ -422,9 +436,17 @@
* @param keySerde a {@link Serde<K>} used to deserialize key if type is {@code byte[]}.
* @param valSerde a {@link Serde<V>} used to deserialize value if type is {@code byte[]}.
* @param label the name used to label the key/value pairs printed to the console
* @deprecated use {@code print(Printed)}
*/
@Deprecated
void print(final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, final Serde<V> valSerde, final String label);
/**
* Print the records of this KStream using the options provided by {@link Printed}.
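* <p>
* For example (an illustrative sketch):
* <pre>{@code
* stream.print(Printed.<K, V>toSysOut().withLabel("my-stream"));
* }</pre>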
* @param printed options for printing
*/
void print(final Printed<K, V> printed);
/**
* Write the records of this stream to a file at the given path.
* This function will use the generated name of the parent processor node to label the key/value pairs printed to
@@ -437,7 +459,9 @@
* {@link Integer} etc. to get meaningful information.
*
* @param filePath name of the file to write to
* @deprecated use {@code print(Printed)}
*/
@Deprecated
void writeAsText(final String filePath);
/**
@@ -452,7 +476,9 @@
*
* @param filePath name of the file to write to
* @param label the name used to label the key/value pairs written to the file
* @deprecated use {@code print(Printed)}
*/
@Deprecated
void writeAsText(final String filePath,
final String label);
@@ -470,7 +496,9 @@
* @param filePath name of the file to write to
* @param keySerde key serde used to deserialize key if type is {@code byte[]},
* @param valSerde value serde used to deserialize value if type is {@code byte[]},
* @deprecated use {@code print(Printed)}
*/
@Deprecated
void writeAsText(final String filePath,
final Serde<K> keySerde,
final Serde<V> valSerde);
@@ -489,7 +517,9 @@
* @param label the name used to label the key/value pairs written to the file
* @param keySerde key serde used to deserialize key if type is {@code byte[]},
* @param valSerde value serde used to deserialize value if type is {@code byte[]},
* @deprecated use {@code print(Printed)}
*/
@Deprecated
void writeAsText(final String filePath,
final String label,
final Serde<K> keySerde,
@@ -517,7 +547,9 @@
*
* @param filePath path of the file to write to.
* @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
* @deprecated use {@code print(Printed)}
*/
@Deprecated
void writeAsText(final String filePath, final KeyValueMapper<? super K, ? super V, String> mapper);
/**
@@ -543,7 +575,9 @@
* @param filePath path of the file to write to.
* @param label the name used to label records written to file.
* @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
* @deprecated use {@code print(Printed)}
*/
@Deprecated
void writeAsText(final String filePath, final String label, final KeyValueMapper<? super K, ? super V, String> mapper);
/**
@@ -573,7 +607,9 @@
* @param keySerde key serde used to deserialize key if type is {@code byte[]}.
* @param valSerde value serde used to deserialize value if type is {@code byte[]}.
* @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
* @deprecated use {@code print(Printed)}
*/
@Deprecated
void writeAsText(final String filePath, final Serde<K> keySerde, final Serde<V> valSerde, final KeyValueMapper<? super K, ? super V, String> mapper);
/**
@@ -604,6 +640,7 @@ public interface KStream<K, V> {
* @param keySerde key serde used to deserialize key if type is {@code byte[]}.
* @param valSerde value serde used to deserialize value if type is {@code byte[]}.
* @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
* @deprecated use {@code print(Printed)}
*/
void writeAsText(final String filePath, final String label, final Serde<K> keySerde, final Serde<V> valSerde, final KeyValueMapper<? super K, ? super V, String> mapper);


@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.errors.TopologyException;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
/**
* An object to define the options used when printing a {@link KStream}.
*
* @param <K> key type
* @param <V> value type
* @see KStream#print(Printed)
*/
public class Printed<K, V> {
protected final PrintWriter printWriter;
protected String label;
protected KeyValueMapper<? super K, ? super V, String> mapper = new KeyValueMapper<K, V, String>() {
@Override
public String apply(final K key, final V value) {
return String.format("%s, %s", key, value);
}
};
private Printed(final PrintWriter printWriter) {
this.printWriter = printWriter;
}
/**
* Copy constructor.
* @param printed instance of {@link Printed} to copy
*/
public Printed(final Printed<K, V> printed) {
this.printWriter = printed.printWriter;
this.label = printed.label;
this.mapper = printed.mapper;
}
/**
* Print the records of a {@link KStream} to a file.
*
* @param filePath path of the file
* @param <K> key type
* @param <V> value type
* @return a new Printed instance
*/
public static <K, V> Printed<K, V> toFile(final String filePath) {
Objects.requireNonNull(filePath, "filePath can't be null");
if (filePath.trim().isEmpty()) {
throw new TopologyException("filePath can't be an empty string");
}
try {
return new Printed<>(new PrintWriter(filePath, StandardCharsets.UTF_8.name()));
} catch (final FileNotFoundException | UnsupportedEncodingException e) {
throw new TopologyException("Unable to write stream to file at [" + filePath + "] " + e.getMessage());
}
}
/**
* Print the records of a {@link KStream} to system out.
*
* @param <K> key type
* @param <V> value type
* @return a new Printed instance
*/
public static <K, V> Printed<K, V> toSysOut() {
return new Printed<>((PrintWriter) null);
}
/**
* Print the records of a {@link KStream} with the provided label.
*
* @param label label to use
* @return this
*/
public Printed<K, V> withLabel(final String label) {
Objects.requireNonNull(label, "label can't be null");
this.label = label;
return this;
}
/**
* Print the records of a {@link KStream} with the provided {@link KeyValueMapper}.
* The provided KeyValueMapper's mapped value type must be {@code String}.
* <p>
* The example below shows how to customize output data.
* <pre>{@code
* final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
* @Override
* public String apply(Integer key, String value) {
* return String.format("(%d, %s)", key, value);
* }
* };
* }</pre>
*
* Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
* {@link Integer} etc. to get meaningful information.
*
* @param mapper mapper to use
* @return this
*/
public Printed<K, V> withKeyValueMapper(final KeyValueMapper<? super K, ? super V, String> mapper) {
Objects.requireNonNull(mapper, "mapper can't be null");
this.mapper = mapper;
return this;
}
}
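A typical end-to-end use of the new builder might look as follows (a hedged sketch, not part of this diff; the stream, types, and label are illustrative):

KStream<String, Long> stream = ...;
stream.print(Printed.<String, Long>toSysOut()
        .withLabel("words")
        .withKeyValueMapper(new KeyValueMapper<String, Long, String>() {
            @Override
            public String apply(final String key, final Long value) {
                // format each record as "key => value"
                return String.format("%s => %d", key, value);
            }
        }));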


@@ -20,7 +20,6 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
@@ -30,7 +29,7 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.PrintForeachAction;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TransformerSupplier;
@@ -43,11 +42,7 @@ import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.Stores;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Array;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
@@ -236,8 +231,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
final String label) {
Objects.requireNonNull(mapper, "mapper can't be null");
Objects.requireNonNull(label, "label can't be null");
String name = builder.newName(PRINTING_NAME);
builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction<>(null, mapper, label), keySerde, valSerde), this.name);
print(Printed.<K, V>toSysOut().withLabel(label).withKeyValueMapper(mapper));
}
@Override
public void print(final Printed<K, V> printed) {
Objects.requireNonNull(printed, "printed can't be null");
final PrintedInternal<K, V> printedInternal = new PrintedInternal<>(printed);
final String name = builder.newName(PRINTING_NAME);
builder.internalTopologyBuilder.addProcessor(name, printedInternal.build(this.name), this.name);
}
@Override
@@ -295,16 +297,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
Objects.requireNonNull(filePath, "filePath can't be null");
Objects.requireNonNull(label, "label can't be null");
Objects.requireNonNull(mapper, "mapper can't be null");
if (filePath.trim().isEmpty()) {
throw new TopologyException("filePath can't be an empty string");
}
final String name = builder.newName(PRINTING_NAME);
try {
PrintWriter printWriter = new PrintWriter(filePath, StandardCharsets.UTF_8.name());
builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction<>(printWriter, mapper, label), keySerde, valSerde), this.name);
} catch (FileNotFoundException | UnsupportedEncodingException e) {
throw new TopologyException("Unable to write stream to file at [" + filePath + "] " + e.getMessage());
}
print(Printed.<K, V>toFile(filePath).withKeyValueMapper(mapper).withLabel(label));
}
@Override


@@ -17,67 +17,30 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.PrintForeachAction;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
public class KStreamPrint<K, V> implements ProcessorSupplier<K, V> {
private final Serde<?> keySerde;
private final Serde<?> valueSerde;
private final ForeachAction<K, V> action;
public KStreamPrint(final ForeachAction<K, V> action, final Serde<?> keySerde, final Serde<?> valueSerde) {
public KStreamPrint(final ForeachAction<K, V> action) {
this.action = action;
this.keySerde = keySerde;
this.valueSerde = valueSerde;
}
@Override
public Processor<K, V> get() {
return new KStreamPrintProcessor(keySerde, valueSerde);
return new KStreamPrintProcessor();
}
private class KStreamPrintProcessor extends AbstractProcessor<K, V> {
private Serde<?> keySerde;
private Serde<?> valueSerde;
private ProcessorContext context;
public KStreamPrintProcessor(final Serde<?> keySerde, final Serde<?> valueSerde) {
this.keySerde = keySerde;
this.valueSerde = valueSerde;
}
@Override
public void init(ProcessorContext context) {
this.context = context;
if (keySerde == null) {
this.keySerde = context.keySerde();
}
if (valueSerde == null) {
this.valueSerde = context.valueSerde();
}
}
@Override
public void process(final K key, final V value) {
final K deKey = (K) maybeDeserialize(key, keySerde.deserializer());
final V deValue = (V) maybeDeserialize(value, valueSerde.deserializer());
action.apply(deKey, deValue);
action.apply(key, value);
}
private Object maybeDeserialize(final Object keyOrValue, final Deserializer<?> deserializer) {
if (keyOrValue instanceof byte[]) {
return deserializer.deserialize(this.context.topic(), (byte[]) keyOrValue);
}
return keyOrValue;
}
@Override
public void close() {
if (action instanceof PrintForeachAction) {


@@ -25,7 +25,6 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.PrintForeachAction;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
@@ -269,7 +268,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
final String label) {
Objects.requireNonNull(label, "label can't be null");
final String name = builder.newName(PRINTING_NAME);
builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(null, defaultKeyValueMapper, label), keySerde, valSerde), this.name);
builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(null, defaultKeyValueMapper, label)), this.name);
}
@SuppressWarnings("deprecation")
@@ -310,7 +309,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
String name = builder.newName(PRINTING_NAME);
try {
PrintWriter printWriter = new PrintWriter(filePath, StandardCharsets.UTF_8.name());
builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(printWriter, defaultKeyValueMapper, label), keySerde, valSerde), this.name);
builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(printWriter, defaultKeyValueMapper, label)), this.name);
} catch (final FileNotFoundException | UnsupportedEncodingException e) {
throw new TopologyException(String.format("Unable to write stream to file at [%s] %s", filePath, e.getMessage()));
}


@@ -14,7 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import java.io.PrintWriter;


@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.processor.ProcessorSupplier;
public class PrintedInternal<K, V> extends Printed<K, V> {
public PrintedInternal(final Printed<K, V> printed) {
super(printed);
}
/**
* Builds the {@link ProcessorSupplier} that will be used to print the records flowing through a {@link KStream}.
*
* @return the {@code ProcessorSupplier} to be used for printing
*/
public ProcessorSupplier<K, V> build(final String processorName) {
return new KStreamPrint<>(new PrintForeachAction<>(printWriter, mapper, label != null ? label : processorName));
}
}
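For reference, the DSL bridges from the public API to the processor roughly as in the KStreamImpl#print change above (a sketch; the variable names here are illustrative):

final ProcessorSupplier<K, V> supplier =
        new PrintedInternal<>(Printed.<K, V>toSysOut().withLabel("my-stream"))
                .build(processorName);
// the supplier is then registered with the topology under processorName
builder.internalTopologyBuilder.addProcessor(processorName, supplier, parentName);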


@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.internals.PrintedInternal;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
public class PrintedTest {
private final PrintStream originalSysOut = System.out;
private final ByteArrayOutputStream sysOut = new ByteArrayOutputStream();
private final Printed<String, Integer> sysOutPrinter = Printed.toSysOut();
@Before
public void before() {
System.setOut(new PrintStream(sysOut));
}
@After
public void after() {
// restore the real stdout rather than setting it to null
System.setOut(originalSysOut);
}
@Test
public void shouldCreateProcessorThatPrintsToFile() throws IOException {
final File file = TestUtils.tempFile();
final ProcessorSupplier<String, Integer> processorSupplier = new PrintedInternal<>(
Printed.<String, Integer>toFile(file.getPath()))
.build("processor");
final Processor<String, Integer> processor = processorSupplier.get();
processor.process("hi", 1);
processor.close();
try (final FileInputStream stream = new FileInputStream(file)) {
final byte[] data = new byte[stream.available()];
stream.read(data);
assertThat(new String(data, StandardCharsets.UTF_8.name()), equalTo("[processor]: hi, 1\n"));
}
}
@Test
public void shouldCreateProcessorThatPrintsToStdOut() throws UnsupportedEncodingException {
final ProcessorSupplier<String, Integer> supplier = new PrintedInternal<>(sysOutPrinter).build("processor");
supplier.get().process("good", 2);
assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[processor]: good, 2\n"));
}
@Test
public void shouldPrintWithLabel() throws UnsupportedEncodingException {
final Processor<String, Integer> processor = new PrintedInternal<>(sysOutPrinter.withLabel("label"))
.build("processor")
.get();
processor.process("hello", 3);
assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[label]: hello, 3\n"));
}
@Test
public void shouldPrintWithKeyValueMapper() throws UnsupportedEncodingException {
final Processor<String, Integer> processor = new PrintedInternal<>(sysOutPrinter.withKeyValueMapper(
new KeyValueMapper<String, Integer, String>() {
@Override
public String apply(final String key, final Integer value) {
return String.format("%s -> %d", key, value);
}
})).build("processor")
.get();
processor.process("hello", 1);
assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[processor]: hello -> 1\n"));
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerExceptionIfFilePathIsNull() {
Printed.toFile(null);
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerExceptionIfMapperIsNull() {
sysOutPrinter.withKeyValueMapper(null);
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerExceptionIfLabelIsNull() {
sysOutPrinter.withLabel(null);
}
@Test(expected = TopologyException.class)
public void shouldThrowTopologyExceptionIfFilePathIsEmpty() {
Printed.toFile("");
}
@Test(expected = TopologyException.class)
public void shouldThrowTopologyExceptionIfFilePathDoesntExist() {
Printed.toFile("/this/should/not/exist");
}
}


@@ -31,6 +31,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
@@ -415,6 +416,11 @@ public class KStreamImplTest {
null);
}
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnPrintIfPrintedIsNull() {
testStream.print((Printed) null);
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnThroughWhenProducedIsNull() {
@@ -426,7 +432,6 @@
testStream.to("topic", null);
}
@Test
public void shouldThrowNullPointerOnLeftJoinWithTableWhenJoinedIsNull() {
final KTable<String, String> table = builder.table("blah", consumed);


@@ -20,7 +20,6 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.PrintForeachAction;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -61,7 +60,7 @@ public class KStreamPrintTest {
}
};
kStreamPrint = new KStreamPrint<>(new PrintForeachAction<>(printWriter, mapper, "test-stream"), intSerd, stringSerd);
kStreamPrint = new KStreamPrint<>(new PrintForeachAction<>(printWriter, mapper, "test-stream"));
printProcessor = kStreamPrint.get();
ProcessorContext processorContext = EasyMock.createNiceMock(ProcessorContext.class);
@@ -85,22 +84,6 @@
doTest(inputRecords, expectedResult);
}
@Test
@SuppressWarnings("unchecked")
public void testPrintKeyValueStringBytesArray() {
// we don't have a topic name because we don't need it for the test at this level
final List<KeyValue<byte[], byte[]>> inputRecords = Arrays.asList(
new KeyValue<>(intSerd.serializer().serialize(null, 0), stringSerd.serializer().serialize(null, "zero")),
new KeyValue<>(intSerd.serializer().serialize(null, 1), stringSerd.serializer().serialize(null, "one")),
new KeyValue<>(intSerd.serializer().serialize(null, 2), stringSerd.serializer().serialize(null, "two")),
new KeyValue<>(intSerd.serializer().serialize(null, 3), stringSerd.serializer().serialize(null, "three")));
final String[] expectedResult = {"[test-stream]: 0, zero", "[test-stream]: 1, one", "[test-stream]: 2, two", "[test-stream]: 3, three"};
doTest(inputRecords, expectedResult);
}
private void assertFlushData(final String[] expectedResult, final ByteArrayOutputStream byteOutStream) {
final String[] flushOutDatas = new String(byteOutStream.toByteArray(), Charset.forName("UTF-8")).split("\\r*\\n");