KAFKA-17368 Add delivery count to kafka-console-share-consumer.sh (#16925)

Now that ConsumerRecord.deliveryCount() exists, enhance kafka-console-share-consumer.sh to exploit it. Added support to the DefaultMessageFormatter and the option print.delivery to the usage message for kafka-console-share-consumer.sh. Note that it was not added to kafka-console-consumer.sh even though the option would be recognised - this is because delivery with a consumer group does not count deliveries, and the result would include Delivery:NOT_PRESENT for all records if it was enabled - not really that useful with a consumer group.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Andrew Schofield 2024-08-19 23:19:36 +01:00 committed by GitHub
parent 3d868aacf2
commit 34475070e1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 28 additions and 5 deletions

View File

@ -75,6 +75,7 @@ public final class ConsoleShareConsumerOptions extends CommandDefaultOptions {
" print.timestamp=true|false\n" + " print.timestamp=true|false\n" +
" print.key=true|false\n" + " print.key=true|false\n" +
" print.offset=true|false\n" + " print.offset=true|false\n" +
" print.delivery=true|false\n" +
" print.partition=true|false\n" + " print.partition=true|false\n" +
" print.headers=true|false\n" + " print.headers=true|false\n" +
" print.value=true|false\n" + " print.value=true|false\n" +

View File

@ -42,6 +42,7 @@ class DefaultMessageFormatter implements MessageFormatter {
private boolean printValue = true; private boolean printValue = true;
private boolean printPartition = false; private boolean printPartition = false;
private boolean printOffset = false; private boolean printOffset = false;
private boolean printDelivery = false;
private boolean printHeaders = false; private boolean printHeaders = false;
private byte[] keySeparator = utfBytes("\t"); private byte[] keySeparator = utfBytes("\t");
private byte[] lineSeparator = utfBytes("\n"); private byte[] lineSeparator = utfBytes("\n");
@ -63,6 +64,9 @@ class DefaultMessageFormatter implements MessageFormatter {
if (configs.containsKey("print.offset")) { if (configs.containsKey("print.offset")) {
printOffset = getBoolProperty(configs, "print.offset"); printOffset = getBoolProperty(configs, "print.offset");
} }
if (configs.containsKey("print.delivery")) {
printDelivery = getBoolProperty(configs, "print.delivery");
}
if (configs.containsKey("print.partition")) { if (configs.containsKey("print.partition")) {
printPartition = getBoolProperty(configs, "print.partition"); printPartition = getBoolProperty(configs, "print.partition");
} }
@ -126,18 +130,24 @@ class DefaultMessageFormatter implements MessageFormatter {
} else { } else {
output.print("NO_TIMESTAMP"); output.print("NO_TIMESTAMP");
} }
writeSeparator(output, printOffset || printPartition || printHeaders || printKey || printValue); writeSeparator(output, printPartition || printOffset || printDelivery || printHeaders || printKey || printValue);
} }
if (printPartition) { if (printPartition) {
output.print("Partition:"); output.print("Partition:");
output.print(consumerRecord.partition()); output.print(consumerRecord.partition());
writeSeparator(output, printOffset || printHeaders || printKey || printValue); writeSeparator(output, printOffset || printDelivery || printHeaders || printKey || printValue);
} }
if (printOffset) { if (printOffset) {
output.print("Offset:"); output.print("Offset:");
output.print(consumerRecord.offset()); output.print(consumerRecord.offset());
writeSeparator(output, printDelivery || printHeaders || printKey || printValue);
}
if (printDelivery) {
output.print("Delivery:");
output.print(consumerRecord.deliveryCount().map(delivery -> Short.toString(delivery)).orElse("NOT_PRESENT"));
writeSeparator(output, printHeaders || printKey || printValue); writeSeparator(output, printHeaders || printKey || printValue);
} }

View File

@ -77,25 +77,37 @@ public class DefaultMessageFormatterTest {
formatter.writeTo(record, new PrintStream(out)); formatter.writeTo(record, new PrintStream(out));
assertEquals("NO_TIMESTAMP\tPartition:0\tOffset:123\tNO_HEADERS\tkey\tvalue\n", out.toString()); assertEquals("NO_TIMESTAMP\tPartition:0\tOffset:123\tNO_HEADERS\tkey\tvalue\n", out.toString());
configs.put("print.delivery", "true");
formatter.configure(configs);
out = new ByteArrayOutputStream();
formatter.writeTo(record, new PrintStream(out));
assertEquals("NO_TIMESTAMP\tPartition:0\tOffset:123\tDelivery:NOT_PRESENT\tNO_HEADERS\tkey\tvalue\n", out.toString());
RecordHeaders headers = new RecordHeaders(); RecordHeaders headers = new RecordHeaders();
headers.add("h1", "v1".getBytes()); headers.add("h1", "v1".getBytes());
headers.add("h2", "v2".getBytes()); headers.add("h2", "v2".getBytes());
record = new ConsumerRecord<>("topic", 0, 123, 123L, TimestampType.CREATE_TIME, -1, -1, "key".getBytes(), "value".getBytes(), record = new ConsumerRecord<>("topic", 0, 123, 123L, TimestampType.CREATE_TIME, -1, -1, "key".getBytes(), "value".getBytes(),
headers, Optional.empty()); headers, Optional.empty(), Optional.of((short) 1));
out = new ByteArrayOutputStream(); out = new ByteArrayOutputStream();
formatter.writeTo(record, new PrintStream(out)); formatter.writeTo(record, new PrintStream(out));
assertEquals("CreateTime:123\tPartition:0\tOffset:123\th1:v1,h2:v2\tkey\tvalue\n", out.toString()); assertEquals("CreateTime:123\tPartition:0\tOffset:123\tDelivery:1\th1:v1,h2:v2\tkey\tvalue\n", out.toString());
configs.put("print.value", "false"); configs.put("print.value", "false");
formatter.configure(configs); formatter.configure(configs);
out = new ByteArrayOutputStream(); out = new ByteArrayOutputStream();
formatter.writeTo(record, new PrintStream(out)); formatter.writeTo(record, new PrintStream(out));
assertEquals("CreateTime:123\tPartition:0\tOffset:123\th1:v1,h2:v2\tkey\n", out.toString()); assertEquals("CreateTime:123\tPartition:0\tOffset:123\tDelivery:1\th1:v1,h2:v2\tkey\n", out.toString());
configs.put("key.separator", "<sep>"); configs.put("key.separator", "<sep>");
formatter.configure(configs); formatter.configure(configs);
out = new ByteArrayOutputStream(); out = new ByteArrayOutputStream();
formatter.writeTo(record, new PrintStream(out)); formatter.writeTo(record, new PrintStream(out));
assertEquals("CreateTime:123<sep>Partition:0<sep>Offset:123<sep>Delivery:1<sep>h1:v1,h2:v2<sep>key\n", out.toString());
configs.put("print.delivery", "false");
formatter.configure(configs);
out = new ByteArrayOutputStream();
formatter.writeTo(record, new PrintStream(out));
assertEquals("CreateTime:123<sep>Partition:0<sep>Offset:123<sep>h1:v1,h2:v2<sep>key\n", out.toString()); assertEquals("CreateTime:123<sep>Partition:0<sep>Offset:123<sep>h1:v1,h2:v2<sep>key\n", out.toString());
configs.put("line.separator", "<end>"); configs.put("line.separator", "<end>");