diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java new file mode 100644 index 00000000000..ec1368ed801 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; +import org.apache.kafka.clients.consumer.internals.events.EventHandler; + +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * This class interfaces the KafkaConsumer and the background thread. It allows the caller to enqueue {@link ApplicationEvent} + * to be consumed by the background thread and poll {@linkBackgroundEvent} produced by the background thread. + */ +public class DefaultEventHandler implements EventHandler { + private final BlockingQueue applicationEvents; + private final BlockingQueue backgroundEvents; + + public DefaultEventHandler() { + this.applicationEvents = new LinkedBlockingQueue<>(); + this.backgroundEvents = new LinkedBlockingQueue<>(); + // TODO: a concreted implementation of how requests are being consumed, and how responses are being produced. + } + + @Override + public Optional poll() { + return Optional.ofNullable(backgroundEvents.poll()); + } + + @Override + public boolean isEmpty() { + return backgroundEvents.isEmpty(); + } + + @Override + public boolean add(ApplicationEvent event) { + try { + return applicationEvents.add(event); + } catch (IllegalStateException e) { + // swallow the capacity restriction exception + return false; + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java new file mode 100644 index 00000000000..0764a27faf8 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; +import org.apache.kafka.clients.consumer.internals.events.EventHandler; +import org.apache.kafka.common.utils.Time; + +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * This prototype consumer uses the EventHandler to process application events so that the network IO can be processed in a background thread. Visit + * this document + * for detail implementation. + */ +public abstract class PrototypeAsyncConsumer implements Consumer { + private final EventHandler eventHandler; + private final Time time; + + public PrototypeAsyncConsumer(final Time time, final EventHandler eventHandler) { + this.time = time; + this.eventHandler = eventHandler; + } + + /** + * poll implementation using {@link EventHandler}. + * 1. Poll for background events. If there's a fetch response event, process the record and return it. If it is + * another type of event, process it. + * 2. Send fetches if needed. + * If the timeout expires, return an empty ConsumerRecord. + * @param timeout timeout of the poll loop + * @return ConsumerRecord. It can be empty if time timeout expires. + */ + @Override + public ConsumerRecords poll(final Duration timeout) { + try { + do { + if (!eventHandler.isEmpty()) { + Optional backgroundEvent = eventHandler.poll(); + // processEvent() may process 3 types of event: + // 1. Errors + // 2. Callback Invocation + // 3. Fetch responses + // Errors will be handled or rethrown. + // Callback invocation will trigger callback function execution, which is blocking until completion. + // Successful fetch responses will be added to the completedFetches in the fetcher, which will then + // be processed in the collectFetches(). + backgroundEvent.ifPresent(event -> processEvent(event, timeout)); + } + // The idea here is to have the background thread sending fetches autonomously, and the fetcher + // uses the poll loop to retrieve successful fetchResponse and process them on the polling thread. + final Fetch fetch = collectFetches(); + if (!fetch.isEmpty()) { + return processFetchResults(fetch); + } + // We will wait for retryBackoffMs + } while (time.timer(timeout).notExpired()); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return ConsumerRecords.empty(); + } + + abstract void processEvent(BackgroundEvent backgroundEvent, Duration timeout); + abstract ConsumerRecords processFetchResults(Fetch fetch); + abstract Fetch collectFetches(); + + /** + * This method sends a commit event to the EventHandler and return. + */ + @Override + public void commitAsync() { + ApplicationEvent commitEvent = new CommitApplicationEvent(); + eventHandler.add(commitEvent); + } + + /** + * This method sends a commit event to the EventHandler and waits for the event to finish. + * @param timeout max wait time for the blocking operation. + */ + @Override + public void commitSync(Duration timeout) { + CommitApplicationEvent commitEvent = new CommitApplicationEvent(); + eventHandler.add(commitEvent); + + CompletableFuture commitFuture = commitEvent.commitFuture; + try { + commitFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + throw new org.apache.kafka.common.errors.TimeoutException("timeout"); + } catch (Exception e) { + // handle exception here + throw new RuntimeException(e); + } + } + + /** + * A stubbed ApplicationEvent for demonstration purpose + */ + private class CommitApplicationEvent extends ApplicationEvent { + // this is the stubbed commitAsyncEvents + CompletableFuture commitFuture = new CompletableFuture<>(); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java new file mode 100644 index 00000000000..0b2e5a901d5 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +/** + * This is the abstract definition of the events created by the KafkaConsumer API + */ +abstract public class ApplicationEvent { +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java new file mode 100644 index 00000000000..8870e179d87 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +/** + * This is the abstract definition of the events created by the background thread. + */ +abstract public class BackgroundEvent { +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventHandler.java new file mode 100644 index 00000000000..d9f5b9d065b --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventHandler.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.consumer.internals.events; + +import java.util.Optional; + +/** + * This class interfaces with the KafkaConsumer and the background thread. It allows the caller to enqueue events via + * the {@code add()} method and to retrieve events via the {@code poll()} method. + */ +public interface EventHandler { + /** + * Retrieves and removes a {@link BackgroundEvent}. Returns an empty Optional instance if there is nothing. + * @return an Optional of {@link BackgroundEvent} if the value is present. Otherwise, an empty Optional. + */ + Optional poll(); + + /** + * Check whether there are pending {@code BackgroundEvent} await to be consumed. + * @return true if there are no pending event + */ + boolean isEmpty(); + + /** + * Add an {@link ApplicationEvent} to the handler. The method returns true upon successful add; otherwise returns + * false. + * @param event An {@link ApplicationEvent} created by the polling thread. + * @return true upon successful add. + */ + boolean add(ApplicationEvent event); +}