diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java index dc07f6da3cb..7a5dc05b7a7 100644 --- a/examples/src/main/java/kafka/examples/Consumer.java +++ b/examples/src/main/java/kafka/examples/Consumer.java @@ -17,17 +17,26 @@ package kafka.examples; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; import java.time.Duration; +import java.util.Collection; import java.util.Collections; import java.util.Optional; import java.util.Properties; import java.util.concurrent.CountDownLatch; -public class Consumer extends Thread { +/** + * A simple consumer thread that demonstrate subscribe and poll use case. The thread subscribes to a topic, + * then runs a loop to poll new messages, and print the message out. The thread closes until the target {@code + * numMessageToConsume} is hit or catching an exception. + */ +public class Consumer extends Thread implements ConsumerRebalanceListener { private final KafkaConsumer consumer; private final String topic; private final String groupId; @@ -48,8 +57,6 @@ public class Consumer extends Thread { props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id)); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); - props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); - props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); if (readCommitted) { @@ -71,10 +78,14 @@ public class Consumer extends Thread { @Override public void run() { try { + System.out.println("Subscribe to:" + this.topic); + consumer.subscribe(Collections.singletonList(this.topic), this); do { doWork(); } while (messageRemaining > 0); System.out.println(groupId + " finished reading " + numMessageToConsume + " messages"); + } catch (WakeupException e) { + // swallow the wakeup } catch (Exception e) { System.out.println("Unexpected termination, exception thrown:" + e); } finally { @@ -82,7 +93,6 @@ public class Consumer extends Thread { } } public void doWork() { - consumer.subscribe(Collections.singletonList(this.topic)); ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord record : records) { System.out.println(groupId + " received message : from partition " + record.partition() + ", (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); @@ -91,6 +101,17 @@ public class Consumer extends Thread { } public void shutdown() { + this.consumer.close(); latch.countDown(); } + + @Override + public void onPartitionsRevoked(Collection partitions) { + System.out.println("Revoking partitions:" + partitions); + } + + @Override + public void onPartitionsAssigned(Collection partitions) { + System.out.println("Assigning partitions:" + partitions); + } } diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java index d6b6dea16aa..e649a7862c9 100644 --- a/examples/src/main/java/kafka/examples/Producer.java +++ b/examples/src/main/java/kafka/examples/Producer.java @@ -18,9 +18,9 @@ package kafka.examples; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer;