() {
+ @Override
+ public void onSuccess(ClientResponse resp) {
+ successHandler.handle(fetchTarget, data, resp);
+ }
+
+ @Override
+ public void onFailure(RuntimeException e) {
+ errorHandler.handle(fetchTarget, data, e);
+ }
+ });
+
+ requestFutures.add(responseFuture);
+ }
+
+ return requestFutures;
+ }
}
\ No newline at end of file
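
The fragment above closes out a request manager's send path: each in-flight response future gets success/failure callbacks attached before being collected into a list for the caller. As a rough, self-contained illustration of that callback-collection pattern — not the actual Kafka classes; the response type and handlers here are hypothetical stand-ins — consider:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the onSuccess/onFailure wiring shown above.
public class CallbackCollectionSketch {
    public static void main(String[] args) {
        List<CompletableFuture<String>> requestFutures = new ArrayList<>();

        for (int i = 0; i < 3; i++) {
            CompletableFuture<String> responseFuture = new CompletableFuture<>();
            final int fetchTarget = i;

            // Equivalent of the anonymous handler: route the outcome to the
            // appropriate branch once the response arrives.
            responseFuture.whenComplete((resp, err) -> {
                if (err != null)
                    System.err.println("fetch " + fetchTarget + " failed: " + err);
                else
                    System.out.println("fetch " + fetchTarget + " succeeded: " + resp);
            });

            requestFutures.add(responseFuture);
            responseFuture.complete("response-" + i); // simulate network completion
        }
    }
}
```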
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
index 2e2ed9f2ab4..2172313054d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
@@ -18,6 +18,8 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
@@ -58,6 +60,7 @@ import java.util.Collections;
* See {@link HeartbeatRequestState} for more details.
*/
public class HeartbeatRequestManager implements RequestManager {
+
private final Logger logger;
private final Time time;
@@ -90,22 +93,22 @@ public class HeartbeatRequestManager implements RequestManager {
/**
* ErrorEventHandler allows the background thread to propagate errors back to the user
*/
- private final ErrorEventHandler nonRetriableErrorHandler;
+ private final BackgroundEventHandler backgroundEventHandler;
public HeartbeatRequestManager(
- final Time time,
final LogContext logContext,
+ final Time time,
final ConsumerConfig config,
final CoordinatorRequestManager coordinatorRequestManager,
final SubscriptionState subscriptions,
final MembershipManager membershipManager,
- final ErrorEventHandler nonRetriableErrorHandler) {
+ final BackgroundEventHandler backgroundEventHandler) {
this.coordinatorRequestManager = coordinatorRequestManager;
this.time = time;
this.logger = logContext.logger(getClass());
this.subscriptions = subscriptions;
this.membershipManager = membershipManager;
- this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+ this.backgroundEventHandler = backgroundEventHandler;
this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
@@ -122,7 +125,7 @@ public class HeartbeatRequestManager implements RequestManager {
final SubscriptionState subscriptions,
final MembershipManager membershipManager,
final HeartbeatRequestState heartbeatRequestState,
- final ErrorEventHandler nonRetriableErrorHandler) {
+ final BackgroundEventHandler backgroundEventHandler) {
this.logger = logContext.logger(this.getClass());
this.time = time;
this.subscriptions = subscriptions;
@@ -130,7 +133,7 @@ public class HeartbeatRequestManager implements RequestManager {
this.coordinatorRequestManager = coordinatorRequestManager;
this.heartbeatRequestState = heartbeatRequestState;
this.membershipManager = membershipManager;
- this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+ this.backgroundEventHandler = backgroundEventHandler;
}
/**
@@ -146,18 +149,14 @@ public class HeartbeatRequestManager implements RequestManager {
*/
@Override
public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
- if (!coordinatorRequestManager.coordinator().isPresent() || !membershipManager.shouldSendHeartbeat()) {
- return new NetworkClientDelegate.PollResult(
- Long.MAX_VALUE, Collections.emptyList());
- }
+ if (!coordinatorRequestManager.coordinator().isPresent() || !membershipManager.shouldSendHeartbeat())
+ return NetworkClientDelegate.PollResult.EMPTY;
// TODO: We will need to send a heartbeat response after partitions are revoked. This needs to be
// implemented either with or after the partition reconciliation logic.
- if (!heartbeatRequestState.canSendRequest(currentTimeMs)) {
- return new NetworkClientDelegate.PollResult(
- heartbeatRequestState.nextHeartbeatMs(currentTimeMs),
- Collections.emptyList());
- }
+ if (!heartbeatRequestState.canSendRequest(currentTimeMs))
+ return new NetworkClientDelegate.PollResult(heartbeatRequestState.nextHeartbeatMs(currentTimeMs));
+
this.heartbeatRequestState.onSendAttempt(currentTimeMs);
NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest();
return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request));
@@ -198,7 +197,7 @@ public class HeartbeatRequestManager implements RequestManager {
this.heartbeatRequestState.onFailedAttempt(responseTimeMs);
if (exception instanceof RetriableException) {
String message = String.format("GroupHeartbeatRequest failed because of the retriable exception. " +
- "Will retry in %s ms: {}",
+ "Will retry in %s ms: %s",
heartbeatRequestState.remainingBackoffMs(responseTimeMs),
exception.getMessage());
logger.debug(message);
@@ -223,12 +222,13 @@ public class HeartbeatRequestManager implements RequestManager {
final long currentTimeMs) {
Errors error = Errors.forCode(response.data().errorCode());
String errorMessage = response.data().errorMessage();
+ String message;
// TODO: upon encountering a fatal/fenced error, trigger onPartitionLost logic to give up the current
// assignments.
switch (error) {
case NOT_COORDINATOR:
// the manager should retry immediately when the coordinator node becomes available again
- String message = String.format("GroupHeartbeatRequest failed because the group coordinator %s is incorrect. " +
+ message = String.format("GroupHeartbeatRequest failed because the group coordinator %s is incorrect. " +
"Will attempt to find the coordinator again and retry",
coordinatorRequestManager.coordinator());
logInfo(message, response, currentTimeMs);
@@ -274,7 +274,7 @@ public class HeartbeatRequestManager implements RequestManager {
break;
case FENCED_MEMBER_EPOCH:
- message = String.format("GroupHeartbeatRequest failed because member epoch %s is invalid. " +
+ message = String.format("GroupHeartbeatRequest failed because member ID %s with epoch %s is invalid. " +
"Will abandon all partitions and rejoin the group",
membershipManager.memberId(), membershipManager.memberEpoch());
logInfo(message, response, currentTimeMs);
@@ -282,7 +282,7 @@ public class HeartbeatRequestManager implements RequestManager {
break;
case UNKNOWN_MEMBER_ID:
- message = String.format("GroupHeartbeatRequest failed because member id %s is invalid. " +
+ message = String.format("GroupHeartbeatRequest failed because member of unknown ID %s with epoch %s is invalid. " +
"Will abandon all partitions and rejoin the group",
membershipManager.memberId(), membershipManager.memberEpoch());
logInfo(message, response, currentTimeMs);
@@ -307,7 +307,7 @@ public class HeartbeatRequestManager implements RequestManager {
}
private void handleFatalFailure(Throwable error) {
- nonRetriableErrorHandler.handle(error);
+ backgroundEventHandler.add(new ErrorBackgroundEvent(error));
membershipManager.transitionToFailed();
}
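
The `handleFatalFailure` change above is the heart of this file's diff: instead of invoking an `ErrorEventHandler` directly, the manager now enqueues an `ErrorBackgroundEvent` that the application thread drains on its next `poll()`. A minimal sketch of that queue-based hand-off, assuming simplified event types rather than the real `BackgroundEventHandler` plumbing:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Minimal sketch of queue-based error propagation; the real
// BackgroundEventHandler/ErrorBackgroundEvent carry more state.
public class BackgroundErrorSketch {

    interface BackgroundEvent { }

    static class ErrorEvent implements BackgroundEvent {
        final Throwable error;
        ErrorEvent(Throwable error) { this.error = error; }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<BackgroundEvent> queue = new LinkedBlockingQueue<>();

        // Network thread side: a fatal error is enqueued rather than thrown,
        // mirroring backgroundEventHandler.add(new ErrorBackgroundEvent(error)).
        queue.add(new ErrorEvent(new RuntimeException("fatal heartbeat failure")));

        // Application thread side: the next poll() drains the queue and
        // surfaces the error to the user.
        BackgroundEvent event = queue.take();
        if (event instanceof ErrorEvent)
            System.out.println("propagated: " + ((ErrorEvent) event).error.getMessage());
    }
}
```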
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
index 2e61b0788f8..47485ed46ca 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
@@ -16,8 +16,10 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.clients.RequestCompletionHandler;
@@ -26,6 +28,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@@ -35,27 +38,29 @@ import org.slf4j.Logger;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collections;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
/**
* A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle network poll and send operations.
*/
public class NetworkClientDelegate implements AutoCloseable {
+
private final KafkaClient client;
private final Time time;
private final Logger log;
private final int requestTimeoutMs;
private final Queue<UnsentRequest> unsentRequests;
private final long retryBackoffMs;
- private final Set<Node> tryConnectNodes;
public NetworkClientDelegate(
final Time time,
@@ -68,9 +73,40 @@ public class NetworkClientDelegate implements AutoCloseable {
this.unsentRequests = new ArrayDeque<>();
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
- this.tryConnectNodes = new HashSet<>();
}
+ // Visible for testing
+ Queue<UnsentRequest> unsentRequests() {
+ return unsentRequests;
+ }
+
+ /**
+ * Check if the node is disconnected and unavailable for immediate reconnection (i.e. if it is in
+ * the reconnect backoff window following the disconnect).
+ *
+ * @param node {@link Node} to check for availability
+ * @see NetworkClientUtils#isUnavailable(KafkaClient, Node, Time)
+ */
+ public boolean isUnavailable(Node node) {
+ return NetworkClientUtils.isUnavailable(client, node, time);
+ }
+
+ /**
+ * Checks for an authentication error on a given node and throws the exception if it exists.
+ *
+ * @param node {@link Node} to check for a previous {@link AuthenticationException}; if found it is thrown
+ * @see NetworkClientUtils#maybeThrowAuthFailure(KafkaClient, Node)
+ */
+ public void maybeThrowAuthFailure(Node node) {
+ NetworkClientUtils.maybeThrowAuthFailure(client, node);
+ }
+
+ /**
+ * Initiate a connection if currently possible. This is only really useful for resetting
+ * the failed status of a socket.
+ *
+ * @param node The node to connect to
+ */
public void tryConnect(Node node) {
NetworkClientUtils.tryConnect(client, node, time);
}
@@ -81,7 +117,6 @@ public class NetworkClientDelegate implements AutoCloseable {
*
* @param timeoutMs timeout time
* @param currentTimeMs current time
- * @return a list of client response
*/
public void poll(final long timeoutMs, final long currentTimeMs) {
trySend(currentTimeMs);
@@ -119,8 +154,7 @@ public class NetworkClientDelegate implements AutoCloseable {
}
}
- private boolean doSend(final UnsentRequest r,
- final long currentTimeMs) {
+ boolean doSend(final UnsentRequest r, final long currentTimeMs) {
Node node = r.node.orElse(client.leastLoadedNode(currentTimeMs));
if (node == null || nodeUnavailable(node)) {
log.debug("No broker available to send the request: {}. Retrying.", r);
@@ -137,7 +171,7 @@ public class NetworkClientDelegate implements AutoCloseable {
return true;
}
- private void checkDisconnects(final long currentTimeMs) {
+ protected void checkDisconnects(final long currentTimeMs) {
// Check the connection of each unsent request's node; if the node is disconnected and cannot be reconnected, fail the request.
Iterator<UnsentRequest> iter = unsentRequests.iterator();
while (iter.hasNext()) {
@@ -190,26 +224,47 @@ public class NetworkClientDelegate implements AutoCloseable {
this.client.close();
}
+ public long addAll(PollResult pollResult) {
+ addAll(pollResult.unsentRequests);
+ return pollResult.timeUntilNextPollMs;
+ }
+
public void addAll(final List<UnsentRequest> requests) {
- requests.forEach(u -> {
- u.setTimer(this.time, this.requestTimeoutMs);
- });
- this.unsentRequests.addAll(requests);
+ if (!requests.isEmpty()) {
+ requests.forEach(ur -> ur.setTimer(time, requestTimeoutMs));
+ unsentRequests.addAll(requests);
+ }
}
public static class PollResult {
+
+ public static final long WAIT_FOREVER = Long.MAX_VALUE;
+ public static final PollResult EMPTY = new PollResult(WAIT_FOREVER);
public final long timeUntilNextPollMs;
public final List<UnsentRequest> unsentRequests;
- public PollResult(final long timeMsTillNextPoll, final List<UnsentRequest> unsentRequests) {
- this.timeUntilNextPollMs = timeMsTillNextPoll;
+ public PollResult(final long timeUntilNextPollMs, final List<UnsentRequest> unsentRequests) {
+ this.timeUntilNextPollMs = timeUntilNextPollMs;
this.unsentRequests = Collections.unmodifiableList(unsentRequests);
}
+
+ public PollResult(final List<UnsentRequest> unsentRequests) {
+ this(WAIT_FOREVER, unsentRequests);
+ }
+
+ public PollResult(final UnsentRequest unsentRequest) {
+ this(Collections.singletonList(unsentRequest));
+ }
+
+ public PollResult(final long timeUntilNextPollMs) {
+ this(timeUntilNextPollMs, Collections.emptyList());
+ }
}
+
public static class UnsentRequest {
private final AbstractRequest.Builder<?> requestBuilder;
private final FutureCompletionHandler handler;
- private Optional<Node> node; // empty if random node can be chosen
+ private final Optional<Node> node; // empty if random node can be chosen
private Timer timer;
public UnsentRequest(final AbstractRequest.Builder<?> requestBuilder,
@@ -227,6 +282,12 @@ public class NetworkClientDelegate implements AutoCloseable {
this.handler.future().whenComplete(callback);
}
+ public UnsentRequest(final AbstractRequest.Builder<?> requestBuilder,
+ final Node node,
+ final BiConsumer<ClientResponse, Throwable> callback) {
+ this(requestBuilder, Optional.of(node), callback);
+ }
+
public void setTimer(final Time time, final long requestTimeoutMs) {
this.timer = time.timer(requestTimeoutMs);
}
@@ -243,6 +304,10 @@ public class NetworkClientDelegate implements AutoCloseable {
return requestBuilder;
}
Optional<Node> node() {
+ return node;
+ }
+
@Override
public String toString() {
return "UnsentRequest{" +
@@ -291,4 +356,32 @@ public class NetworkClientDelegate implements AutoCloseable {
return future;
}
}
+
+ /**
+ * Creates a {@link Supplier} for deferred creation during invocation by
+ * {@link ConsumerNetworkThread}.
+ */
+ public static Supplier<NetworkClientDelegate> supplier(final Time time,
+ final LogContext logContext,
+ final ConsumerMetadata metadata,
+ final ConsumerConfig config,
+ final ApiVersions apiVersions,
+ final Metrics metrics,
+ final FetchMetricsManager fetchMetricsManager) {
+ return new CachedSupplier<NetworkClientDelegate>() {
+ @Override
+ protected NetworkClientDelegate create() {
+ KafkaClient client = ClientUtils.createNetworkClient(config,
+ metrics,
+ CONSUMER_METRIC_GROUP_PREFIX,
+ logContext,
+ apiVersions,
+ time,
+ CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION,
+ metadata,
+ fetchMetricsManager.throttleTimeSensor());
+ return new NetworkClientDelegate(time, config, logContext, client);
+ }
+ };
+ }
}
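
The new `PollResult` convenience constructors and `addAll(PollResult)` let request managers express "nothing to send, wake me in N ms" or "send these now" compactly, while the network thread both queues the requests and learns its next deadline in one call. A self-contained sketch of that shape, with `UnsentRequest` reduced to a plain string (names here are illustrative, not the real classes):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified PollResult mirroring the convenience constructors added above.
public class PollResultSketch {

    static final long WAIT_FOREVER = Long.MAX_VALUE;

    static class PollResult {
        final long timeUntilNextPollMs;
        final List<String> unsentRequests;

        PollResult(long timeUntilNextPollMs, List<String> unsentRequests) {
            this.timeUntilNextPollMs = timeUntilNextPollMs;
            this.unsentRequests = Collections.unmodifiableList(unsentRequests);
        }

        PollResult(List<String> unsentRequests) {
            this(WAIT_FOREVER, unsentRequests);
        }

        PollResult(long timeUntilNextPollMs) {
            this(timeUntilNextPollMs, Collections.emptyList());
        }
    }

    // Equivalent of NetworkClientDelegate.addAll(PollResult): queue the requests
    // and hand back the manager's next-poll deadline in one step.
    static long addAll(List<String> unsentQueue, PollResult result) {
        unsentQueue.addAll(result.unsentRequests);
        return result.timeUntilNextPollMs;
    }

    public static void main(String[] args) {
        List<String> unsentQueue = new ArrayList<>();
        long next = addAll(unsentQueue, new PollResult(Collections.singletonList("heartbeat")));
        System.out.println("queued=" + unsentQueue + ", nextPollMs=" + next);
    }
}
```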
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
index b7fdefeb0d1..9239811f7d6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.LogTruncationException;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -262,7 +263,7 @@ class OffsetFetcherUtils {
else if (strategy == OffsetResetStrategy.LATEST)
return ListOffsetsRequest.LATEST_TIMESTAMP;
else
- return null;
+ throw new NoOffsetForPartitionException(partition);
}
static Set<String> topicsForPartitions(Collection<TopicPartition> partitions) {
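
The `OffsetFetcherUtils` change above replaces a `null` sentinel with an eager `NoOffsetForPartitionException` when no reset strategy is configured, so callers no longer need a null check. A standalone sketch of the mapping, using stand-in constants and a generic exception in place of the Kafka types (the real sentinel timestamps are `ListOffsetsRequest.EARLIEST_TIMESTAMP`/`LATEST_TIMESTAMP`):

```java
// Sketch of the strategy-to-timestamp mapping after the change: the "no
// strategy" case now fails fast instead of returning null.
public class ResetTimestampSketch {

    enum OffsetResetStrategy { EARLIEST, LATEST, NONE }

    static final long EARLIEST_TIMESTAMP = -2L;
    static final long LATEST_TIMESTAMP = -1L;

    static long offsetResetStrategyTimestamp(String partition, OffsetResetStrategy strategy) {
        if (strategy == OffsetResetStrategy.EARLIEST)
            return EARLIEST_TIMESTAMP;
        else if (strategy == OffsetResetStrategy.LATEST)
            return LATEST_TIMESTAMP;
        else
            // Previously `return null;` — every caller then had to null-check.
            // The real code throws NoOffsetForPartitionException here.
            throw new IllegalStateException("No offset reset policy for partition " + partition);
    }

    public static void main(String[] args) {
        System.out.println(offsetResetStrategyTimestamp("topic-0", OffsetResetStrategy.EARLIEST));
    }
}
```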
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index a1cb8d54a97..700e2ab6e17 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -24,6 +24,8 @@ import org.apache.kafka.clients.consumer.LogTruncationException;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetData;
import org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetResult;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.IsolationLevel;
@@ -83,6 +85,7 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
private final Time time;
private final ApiVersions apiVersions;
private final NetworkClientDelegate networkClientDelegate;
+ private final BackgroundEventHandler backgroundEventHandler;
@SuppressWarnings("this-escape")
public OffsetsRequestManager(final SubscriptionState subscriptionState,
@@ -93,6 +96,7 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
final long requestTimeoutMs,
final ApiVersions apiVersions,
final NetworkClientDelegate networkClientDelegate,
+ final BackgroundEventHandler backgroundEventHandler,
final LogContext logContext) {
requireNonNull(subscriptionState);
requireNonNull(metadata);
@@ -100,6 +104,7 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
requireNonNull(time);
requireNonNull(apiVersions);
requireNonNull(networkClientDelegate);
+ requireNonNull(backgroundEventHandler);
requireNonNull(logContext);
this.metadata = metadata;
@@ -112,11 +117,12 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
this.requestTimeoutMs = requestTimeoutMs;
this.apiVersions = apiVersions;
this.networkClientDelegate = networkClientDelegate;
+ this.backgroundEventHandler = backgroundEventHandler;
this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata, subscriptionState,
time, retryBackoffMs, apiVersions);
// Register the cluster metadata update callback. Note this only relies on the
// requestsToRetry initialized above, and won't be invoked until all managers are
- // initialized and the background thread started.
+ // initialized and the network thread started.
this.metadata.addClusterUpdateListener(this);
}
@@ -127,10 +133,10 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
*/
@Override
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
- NetworkClientDelegate.PollResult pollResult =
- new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>(requestsToSend));
- this.requestsToSend.clear();
- return pollResult;
+ // Copy the outgoing request list and clear it.
+ List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>(requestsToSend);
+ requestsToSend.clear();
+ return new NetworkClientDelegate.PollResult(unsentRequests);
}
/**
@@ -188,7 +194,14 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
* this function (ex. {@link org.apache.kafka.common.errors.TopicAuthorizationException})
*/
public void resetPositionsIfNeeded() {
- Map<TopicPartition, Long> offsetResetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp();
+ Map<TopicPartition, Long> offsetResetTimestamps;
+
+ try {
+ offsetResetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp();
+ } catch (Exception e) {
+ backgroundEventHandler.add(new ErrorBackgroundEvent(e));
+ return;
+ }
if (offsetResetTimestamps.isEmpty())
return;
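
The rewritten `poll()` above uses a copy-then-clear drain so the returned `PollResult` holds an immutable snapshot while the manager's own queue keeps accepting new requests. Sketched in isolation, with requests reduced to strings:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the copy-then-clear drain in poll(): snapshot the pending
// requests into a fresh list before handing them out, so the returned view
// never aliases the manager's mutable queue.
public class DrainSketch {
    private final List<String> requestsToSend = new ArrayList<>();

    List<String> poll() {
        List<String> unsentRequests = new ArrayList<>(requestsToSend);
        requestsToSend.clear();
        return unsentRequests;
    }

    public static void main(String[] args) {
        DrainSketch manager = new DrainSketch();
        manager.requestsToSend.add("list-offsets");
        System.out.println(manager.poll()); // [list-offsets]
        System.out.println(manager.poll()); // []
    }
}
```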
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
index 195bee9ddd4..949616daa85 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
@@ -20,25 +20,33 @@ import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventProcessor;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
-import org.apache.kafka.clients.consumer.internals.events.EventHandler;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
@@ -48,11 +56,12 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
-import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
@@ -67,121 +76,246 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.refreshCommittedOffsets;
import static org.apache.kafka.common.utils.Utils.closeQuietly;
import static org.apache.kafka.common.utils.Utils.isBlank;
import static org.apache.kafka.common.utils.Utils.join;
import static org.apache.kafka.common.utils.Utils.propsToMap;
/**
- * This prototype consumer uses the EventHandler to process application
- * events so that the network IO can be processed in a background thread. Visit
- * this document
+ * This prototype consumer uses an {@link ApplicationEventHandler event handler} to process
+ * {@link ApplicationEvent application events} so that the network IO can be processed in a dedicated
+ * {@link ConsumerNetworkThread network thread}. Visit
+ * this document
* for implementation details.
*/
public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
- static final long DEFAULT_CLOSE_TIMEOUT_MS = 30 * 1000;
- private final LogContext logContext;
- private final EventHandler eventHandler;
+ private final ApplicationEventHandler applicationEventHandler;
private final Time time;
private final Optional<String> groupId;
- private final Logger log;
+ private final KafkaConsumerMetrics kafkaConsumerMetrics;
+ private Logger log;
+ private final String clientId;
+ private final BackgroundEventProcessor backgroundEventProcessor;
private final Deserializers<K, V> deserializers;
+
+ /**
+ * A thread-safe {@link FetchBuffer fetch buffer} for the results that are populated in the
+ * {@link ConsumerNetworkThread network thread} when the results are available. Because of the interaction
+ * of the fetch buffer in the application thread and the network I/O thread, this is shared between the
+ * two threads and is thus designed to be thread-safe.
+ */
+ private final FetchBuffer fetchBuffer;
+ private final FetchCollector<K, V> fetchCollector;
+ private final ConsumerInterceptors<K, V> interceptors;
+ private final IsolationLevel isolationLevel;
+
private final SubscriptionState subscriptions;
private final ConsumerMetadata metadata;
private final Metrics metrics;
+ private final long retryBackoffMs;
private final long defaultApiTimeoutMs;
+ private volatile boolean closed = false;
+ private final List<ConsumerPartitionAssignor> assignors;
- private WakeupTrigger wakeupTrigger = new WakeupTrigger();
- public PrototypeAsyncConsumer(Properties properties,
- Deserializer<K> keyDeserializer,
- Deserializer<V> valueDeserializer) {
+ // to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates
+ private boolean cachedSubscriptionHasAllFetchPositions;
+ private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
+
+ public PrototypeAsyncConsumer(final Properties properties,
+ final Deserializer<K> keyDeserializer,
+ final Deserializer<V> valueDeserializer) {
this(propsToMap(properties), keyDeserializer, valueDeserializer);
}
public PrototypeAsyncConsumer(final Map<String, Object> configs,
- final Deserializer<K> keyDeser,
- final Deserializer<V> valDeser) {
- this(new ConsumerConfig(appendDeserializerToConfig(configs, keyDeser, valDeser)), keyDeser, valDeser);
+ final Deserializer<K> keyDeserializer,
+ final Deserializer<V> valueDeserializer) {
+ this(new ConsumerConfig(appendDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
+ keyDeserializer,
+ valueDeserializer);
}
public PrototypeAsyncConsumer(final ConsumerConfig config,
final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer) {
- this.time = Time.SYSTEM;
- GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
- GroupRebalanceConfig.ProtocolType.CONSUMER);
- this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
- this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
- this.logContext = createLogContext(config, groupRebalanceConfig);
- this.log = logContext.logger(getClass());
- this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer);
- this.subscriptions = createSubscriptionState(config, logContext);
- this.metrics = createMetrics(config, time);
- List<ConsumerInterceptor<K, V>> interceptorList = configuredConsumerInterceptors(config);
- ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(
- metrics.reporters(),
- interceptorList,
- Arrays.asList(deserializers.keyDeserializer, deserializers.valueDeserializer));
- this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners);
- final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
- metadata.bootstrap(addresses);
- this.eventHandler = new DefaultEventHandler(
- config,
- groupRebalanceConfig,
- logContext,
- subscriptions,
- new ApiVersions(),
- this.metrics,
- clusterResourceListeners,
- null // this is coming from the fetcher, but we don't have one
- );
+ this(Time.SYSTEM, config, keyDeserializer, valueDeserializer);
}
- // Visible for testing
- PrototypeAsyncConsumer(Time time,
- LogContext logContext,
- ConsumerConfig config,
- SubscriptionState subscriptions,
- ConsumerMetadata metadata,
- EventHandler eventHandler,
- Metrics metrics,
- Optional<String> groupId,
- int defaultApiTimeoutMs) {
- this.time = time;
- this.logContext = logContext;
+ public PrototypeAsyncConsumer(final Time time,
+ final ConsumerConfig config,
+ final Deserializer<K> keyDeserializer,
+ final Deserializer<V> valueDeserializer) {
+ try {
+ GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
+ GroupRebalanceConfig.ProtocolType.CONSUMER);
+
+ this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
+ this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
+ LogContext logContext = createLogContext(config, groupRebalanceConfig);
+ this.log = logContext.logger(getClass());
+ groupId.ifPresent(groupIdStr -> {
+ if (groupIdStr.isEmpty()) {
+ log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release.");
+ }
+ });
+
+ log.debug("Initializing the Kafka consumer");
+ this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
+ this.time = time;
+ this.metrics = createMetrics(config, time);
+ this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+
+ List<ConsumerInterceptor<K, V>> interceptorList = configuredConsumerInterceptors(config);
+ this.interceptors = new ConsumerInterceptors<>(interceptorList);
+ this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer);
+ this.subscriptions = createSubscriptionState(config, logContext);
+ ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(metrics.reporters(),
+ interceptorList,
+ Arrays.asList(deserializers.keyDeserializer, deserializers.valueDeserializer));
+ this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners);
+ final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
+ metadata.bootstrap(addresses);
+
+ FetchMetricsManager fetchMetricsManager = createFetchMetricsManager(metrics);
+ FetchConfig fetchConfig = new FetchConfig(config);
+ this.isolationLevel = fetchConfig.isolationLevel;
+
+ ApiVersions apiVersions = new ApiVersions();
+ final BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
+ final BlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
+
+ // This FetchBuffer is shared between the application and network threads.
+ this.fetchBuffer = new FetchBuffer(logContext);
+ final Supplier<NetworkClientDelegate> networkClientDelegateSupplier = NetworkClientDelegate.supplier(time,
+ logContext,
+ metadata,
+ config,
+ apiVersions,
+ metrics,
+ fetchMetricsManager);
+ final Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(time,
+ logContext,
+ backgroundEventQueue,
+ metadata,
+ subscriptions,
+ fetchBuffer,
+ config,
+ groupRebalanceConfig,
+ apiVersions,
+ fetchMetricsManager,
+ networkClientDelegateSupplier);
+ final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext,
+ metadata,
+ applicationEventQueue,
+ requestManagersSupplier);
+ this.applicationEventHandler = new ApplicationEventHandler(logContext,
+ time,
+ applicationEventQueue,
+ applicationEventProcessorSupplier,
+ networkClientDelegateSupplier,
+ requestManagersSupplier);
+ this.backgroundEventProcessor = new BackgroundEventProcessor(logContext, backgroundEventQueue);
+ this.assignors = ConsumerPartitionAssignor.getAssignorInstances(
+ config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
+ config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))
+ );
+
+ // no coordinator will be constructed for the default (null) group id
+ if (!groupId.isPresent()) {
+ config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+ //config.ignore(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+ }
+
+ // The FetchCollector is only used on the application thread.
+ this.fetchCollector = new FetchCollector<>(logContext,
+ metadata,
+ subscriptions,
+ fetchConfig,
+ deserializers,
+ fetchMetricsManager,
+ time);
+
+ this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, CONSUMER_METRIC_GROUP_PREFIX);
+
+ config.logUnused();
+ AppInfoParser.registerAppInfo(CONSUMER_JMX_PREFIX, clientId, metrics, time.milliseconds());
+ log.debug("Kafka consumer initialized");
+ } catch (Throwable t) {
+ // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
+ // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
+ if (this.log != null) {
+ close(Duration.ZERO, true);
+ }
+ // now propagate the exception
+ throw new KafkaException("Failed to construct kafka consumer", t);
+ }
+ }
+
+ public PrototypeAsyncConsumer(LogContext logContext,
+ String clientId,
+ Deserializers<K, V> deserializers,
+ FetchBuffer fetchBuffer,
+ FetchCollector<K, V> fetchCollector,
+ ConsumerInterceptors<K, V> interceptors,
+ Time time,
+ ApplicationEventHandler applicationEventHandler,
+ BlockingQueue<BackgroundEvent> backgroundEventQueue,
+ Metrics metrics,
+ SubscriptionState subscriptions,
+ ConsumerMetadata metadata,
+ long retryBackoffMs,
+ int defaultApiTimeoutMs,
+ List<ConsumerPartitionAssignor> assignors,
+ String groupId) {
this.log = logContext.logger(getClass());
this.subscriptions = subscriptions;
- this.metadata = metadata;
+ this.clientId = clientId;
+ this.fetchBuffer = fetchBuffer;
+ this.fetchCollector = fetchCollector;
+ this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
+ this.interceptors = Objects.requireNonNull(interceptors);
+ this.time = time;
+ this.backgroundEventProcessor = new BackgroundEventProcessor(logContext, backgroundEventQueue);
this.metrics = metrics;
- this.groupId = groupId;
+ this.groupId = Optional.ofNullable(groupId);
+ this.metadata = metadata;
+ this.retryBackoffMs = retryBackoffMs;
this.defaultApiTimeoutMs = defaultApiTimeoutMs;
- this.deserializers = new Deserializers<>(config);
- this.eventHandler = eventHandler;
+ this.deserializers = deserializers;
+ this.applicationEventHandler = applicationEventHandler;
+ this.assignors = assignors;
+ this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
}
/**
- * poll implementation using {@link EventHandler}.
+ * poll implementation using {@link ApplicationEventHandler}.
* 1. Poll for background events. If there's a fetch response event, process the record and return it. If it is
* another type of event, process it.
* 2. Send fetches if needed.
@@ -193,76 +327,33 @@ public class PrototypeAsyncConsumer implements Consumer {
@Override
public ConsumerRecords<K, V> poll(final Duration timeout) {
Timer timer = time.timer(timeout);
+
try {
+ kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());
+
+ if (subscriptions.hasNoSubscriptionOrUserAssignment()) {
+ throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
+ }
+
do {
- if (!eventHandler.isEmpty()) {
- final Optional backgroundEvent = eventHandler.poll();
- // processEvent() may process 3 types of event:
- // 1. Errors
- // 2. Callback Invocation
- // 3. Fetch responses
- // Errors will be handled or rethrown.
- // Callback invocation will trigger callback function execution, which is blocking until completion.
- // Successful fetch responses will be added to the completedFetches in the fetcher, which will then
- // be processed in the collectFetches().
- backgroundEvent.ifPresent(event -> processEvent(event, timeout));
- }
+ updateAssignmentMetadataIfNeeded(timer);
+ final Fetch<K, V> fetch = pollForFetches(timer);
- updateFetchPositionsIfNeeded(timer);
-
- // The idea here is to have the background thread sending fetches autonomously, and the fetcher
- // uses the poll loop to retrieve successful fetchResponse and process them on the polling thread.
- final Fetch<K, V> fetch = collectFetches();
if (!fetch.isEmpty()) {
- return processFetchResults(fetch);
+ if (fetch.records().isEmpty()) {
+ log.trace("Returning empty records from `poll()` "
+ + "since the consumer's position has advanced for at least one topic partition");
+ }
+
+ return interceptors.onConsume(new ConsumerRecords<>(fetch.records()));
}
// We will wait for retryBackoffMs
- } while (time.timer(timeout).notExpired());
- } catch (final Exception e) {
- throw new RuntimeException(e);
+ } while (timer.notExpired());
+
+ return ConsumerRecords.empty();
+ } finally {
+ kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
}
- // TODO: Once we implement poll(), clear wakeupTrigger in a finally block: wakeupTrigger.clearActiveTask();
-
- return ConsumerRecords.empty();
- }
-
- /**
- * Set the fetch position to the committed position (if there is one) or reset it using the
- * offset reset policy the user has configured (if partitions require reset)
- *
- * @return true if the operation completed without timing out
- * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
- * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
- * defined
- */
- private boolean updateFetchPositionsIfNeeded(final Timer timer) {
- // Validate positions using the partition leader end offsets, to detect if any partition
- // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
- // request, retrieve the partition end offsets, and validate the current position against it.
- ValidatePositionsApplicationEvent validatePositionsEvent = new ValidatePositionsApplicationEvent();
- eventHandler.add(validatePositionsEvent);
-
- // Reset positions using committed offsets retrieved from the group coordinator, for any
- // partitions which do not have a valid position and are not awaiting reset. This will
- // trigger an OffsetFetch request and update positions with the offsets retrieved. This
- // will only do a coordinator lookup if there are partitions which have missing
- // positions, so a consumer with manually assigned partitions can avoid a coordinator
- // dependence by always ensuring that assigned partitions have an initial position.
- if (isCommittedOffsetsManagementEnabled() && !refreshCommittedOffsetsIfNeeded(timer))
- return false;
-
- // If there are partitions still needing a position and a reset policy is defined,
- // request reset using the default policy. If no reset strategy is defined and there
- // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception.
- subscriptions.resetInitializingPositions();
-
- // Reset positions using partition offsets retrieved from the leader, for any partitions
- // which are awaiting reset. This will trigger a ListOffset request, retrieve the
- // partition offsets according to the strategy (ex. earliest, latest), and update the
- // positions.
- ResetPositionsApplicationEvent resetPositionsEvent = new ResetPositionsApplicationEvent();
- eventHandler.add(resetPositionsEvent);
- return true;
}
/**
@@ -274,20 +365,6 @@ public class PrototypeAsyncConsumer implements Consumer {
commitSync(Duration.ofMillis(defaultApiTimeoutMs));
}
- private void processEvent(final BackgroundEvent backgroundEvent, final Duration timeout) {
- // stubbed class
- }
-
- private ConsumerRecords<K, V> processFetchResults(final Fetch<K, V> fetch) {
- // stubbed class
- return ConsumerRecords.empty();
- }
-
- private Fetch<K, V> collectFetches() {
- // stubbed class
- return Fetch.empty();
- }
-
/**
* This method sends a commit event to the EventHandler and returns.
*/
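
The rewritten `poll()` earlier in this file follows a timer-bounded loop: update assignment metadata, attempt a fetch, return through the interceptors on success, and otherwise retry until the caller's timeout expires, recording poll metrics either way. A compressed, hypothetical sketch of that control flow (the fake fetch source and names are illustrative only):

```java
import java.time.Duration;

// Timer-bounded poll loop, mirroring the rewritten poll() above.
public class PollLoopSketch {

    private int attempts = 0;

    // Stand-in for pollForFetches(timer); returns records on the third try.
    private String pollForFetches() {
        return ++attempts >= 3 ? "records" : null;
    }

    String poll(Duration timeout) {
        long deadlineMs = System.currentTimeMillis() + timeout.toMillis();
        // real code: kafkaConsumerMetrics.recordPollStart(...)
        try {
            do {
                String fetch = pollForFetches();
                if (fetch != null)
                    return fetch; // real code wraps via interceptors.onConsume(...)
            } while (System.currentTimeMillis() < deadlineMs);
            return "empty";
        } finally {
            // real code: kafkaConsumerMetrics.recordPollEnd(...)
        }
    }

    public static void main(String[] args) {
        System.out.println(new PollLoopSketch().poll(Duration.ofMillis(50)));
    }
}
```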
@@ -324,50 +401,96 @@ public class PrototypeAsyncConsumer implements Consumer {
// the task can only be woken up if the top level API call is commitSync
wakeupTrigger.setActiveTask(commitEvent.future());
}
- eventHandler.add(commitEvent);
+ applicationEventHandler.add(commitEvent);
return commitEvent.future();
}
@Override
public void seek(TopicPartition partition, long offset) {
- throw new KafkaException("method not implemented");
+ if (offset < 0)
+ throw new IllegalArgumentException("seek offset must not be a negative number");
+
+ log.info("Seeking to offset {} for partition {}", offset, partition);
+ SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
+ offset,
+ Optional.empty(), // This will ensure we skip validation
+ metadata.currentLeader(partition));
+ subscriptions.seekUnvalidated(partition, newPosition);
}
@Override
public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) {
- throw new KafkaException("method not implemented");
+ long offset = offsetAndMetadata.offset();
+ if (offset < 0) {
+ throw new IllegalArgumentException("seek offset must not be a negative number");
+ }
+
+ if (offsetAndMetadata.leaderEpoch().isPresent()) {
+ log.info("Seeking to offset {} for partition {} with epoch {}",
+ offset, partition, offsetAndMetadata.leaderEpoch().get());
+ } else {
+ log.info("Seeking to offset {} for partition {}", offset, partition);
+ }
+ Metadata.LeaderAndEpoch currentLeaderAndEpoch = metadata.currentLeader(partition);
+ SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
+ offsetAndMetadata.offset(),
+ offsetAndMetadata.leaderEpoch(),
+ currentLeaderAndEpoch);
+ updateLastSeenEpochIfNewer(partition, offsetAndMetadata);
+ subscriptions.seekUnvalidated(partition, newPosition);
}
@Override
public void seekToBeginning(Collection<TopicPartition> partitions) {
- throw new KafkaException("method not implemented");
+ if (partitions == null)
+ throw new IllegalArgumentException("Partitions collection cannot be null");
+
+ Collection<TopicPartition> parts = partitions.isEmpty() ? subscriptions.assignedPartitions() : partitions;
+ subscriptions.requestOffsetReset(parts, OffsetResetStrategy.EARLIEST);
}
@Override
public void seekToEnd(Collection<TopicPartition> partitions) {
- throw new KafkaException("method not implemented");
+ if (partitions == null)
+ throw new IllegalArgumentException("Partitions collection cannot be null");
+
+ Collection<TopicPartition> parts = partitions.isEmpty() ? subscriptions.assignedPartitions() : partitions;
+ subscriptions.requestOffsetReset(parts, OffsetResetStrategy.LATEST);
}
@Override
public long position(TopicPartition partition) {
- throw new KafkaException("method not implemented");
+ return position(partition, Duration.ofMillis(defaultApiTimeoutMs));
}
@Override
public long position(TopicPartition partition, Duration timeout) {
- throw new KafkaException("method not implemented");
+ if (!subscriptions.isAssigned(partition))
+ throw new IllegalStateException("You can only check the position for partitions assigned to this consumer.");
+
+ Timer timer = time.timer(timeout);
+ do {
+ SubscriptionState.FetchPosition position = subscriptions.validPosition(partition);
+ if (position != null)
+ return position.offset;
+
+ updateFetchPositions(timer);
+ } while (timer.notExpired());
+
+ throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the position " +
+ "for partition " + partition + " could be determined");
}
@Override
@Deprecated
public OffsetAndMetadata committed(TopicPartition partition) {
- throw new KafkaException("method not implemented");
+ return committed(partition, Duration.ofMillis(defaultApiTimeoutMs));
}
@Override
@Deprecated
public OffsetAndMetadata committed(TopicPartition partition, Duration timeout) {
- throw new KafkaException("method not implemented");
+ return committed(Collections.singleton(partition), timeout).get(partition);
}
@Override
@@ -386,7 +509,7 @@ public class PrototypeAsyncConsumer implements Consumer {
final OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(partitions);
wakeupTrigger.setActiveTask(event.future());
try {
- return eventHandler.addAndGet(event, time.timer(timeout));
+ return applicationEventHandler.addAndGet(event, time.timer(timeout));
} finally {
wakeupTrigger.clearActiveTask();
}
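
The `setActiveTask`/`clearActiveTask` bracketing around `addAndGet` above is what lets `wakeup()` interrupt a blocking `committed()` call: the awaited future is published before blocking and always cleared in a `finally` block. A minimal sketch of that trigger pattern, assuming a heavily simplified `WakeupTrigger`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the wakeup-trigger pattern used around blocking calls above.
public class WakeupTriggerSketch {
    private final AtomicReference<CompletableFuture<?>> activeTask = new AtomicReference<>();

    <T> T blockOn(CompletableFuture<T> future) {
        activeTask.set(future);              // wakeupTrigger.setActiveTask(event.future())
        try {
            return future.join();
        } finally {
            activeTask.set(null);            // wakeupTrigger.clearActiveTask()
        }
    }

    void wakeup() {
        CompletableFuture<?> task = activeTask.get();
        if (task != null)
            task.completeExceptionally(new RuntimeException("woken up"));
    }

    public static void main(String[] args) {
        WakeupTriggerSketch trigger = new WakeupTriggerSketch();
        CompletableFuture<String> future = CompletableFuture.completedFuture("offsets");
        System.out.println(trigger.blockOn(future));
    }
}
```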
@@ -401,12 +524,12 @@ public class PrototypeAsyncConsumer implements Consumer {
@Override
public Map<MetricName, ? extends Metric> metrics() {
- throw new KafkaException("method not implemented");
+ return Collections.unmodifiableMap(metrics.metrics());
}
@Override
public List<PartitionInfo> partitionsFor(String topic) {
- throw new KafkaException("method not implemented");
+ return partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs));
}
@Override
@@ -416,7 +539,7 @@ public class PrototypeAsyncConsumer implements Consumer {
@Override
public Map<String, List<PartitionInfo>> listTopics() {
- throw new KafkaException("method not implemented");
+ return listTopics(Duration.ofMillis(defaultApiTimeoutMs));
}
@Override
@@ -426,17 +549,23 @@ public class PrototypeAsyncConsumer implements Consumer {
@Override
public Set<TopicPartition> paused() {
- throw new KafkaException("method not implemented");
+ return Collections.unmodifiableSet(subscriptions.pausedPartitions());
}
@Override
public void pause(Collection<TopicPartition> partitions) {
- throw new KafkaException("method not implemented");
+ log.debug("Pausing partitions {}", partitions);
+ for (TopicPartition partition: partitions) {
+ subscriptions.pause(partition);
+ }
}
@Override
public void resume(Collection<TopicPartition> partitions) {
- throw new KafkaException("method not implemented");
+ log.debug("Resuming partitions {}", partitions);
+ for (TopicPartition partition: partitions) {
+ subscriptions.resume(partition);
+ }
}
@Override
@@ -469,7 +598,7 @@ public class PrototypeAsyncConsumer implements Consumer {
if (timeout.toMillis() == 0L)
return listOffsetsEvent.emptyResult();
- return eventHandler.addAndGet(listOffsetsEvent, time.timer(timeout));
+ return applicationEventHandler.addAndGet(listOffsetsEvent, time.timer(timeout));
}
@Override
@@ -502,20 +631,42 @@ public class PrototypeAsyncConsumer