mirror of https://github.com/apache/kafka.git
KAFKA-15865: Remove autocommit completion event (#14831)
There is no callback associated with autocommit, so I do not think we need this event. This closes KAFKA-15865. Reviewers: Bruno Cadonna <cadonna@apache.org>
This commit is contained in:
parent
d71d0639d9
commit
c0ec8131d8
|
|
@ -21,7 +21,6 @@ import org.apache.kafka.clients.consumer.CommitFailedException;
|
|||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
|
||||
import org.apache.kafka.clients.consumer.internals.events.AutoCommitCompletionBackgroundEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
|
||||
import org.apache.kafka.common.KafkaException;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
|
|
@ -234,8 +233,6 @@ public class CommitRequestManager implements RequestManager {
|
|||
return (response, throwable) -> {
|
||||
autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false));
|
||||
if (throwable == null) {
|
||||
// We need to notify the application thread to execute OffsetCommitCallback
|
||||
backgroundEventHandler.add(new AutoCommitCompletionBackgroundEvent());
|
||||
log.debug("Completed asynchronous auto-commit of offsets {}", allConsumedOffsets);
|
||||
} else if (throwable instanceof RetriableCommitFailedException) {
|
||||
log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}",
|
||||
|
|
|
|||
|
|
@ -1,23 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.clients.consumer.internals.events;
|
||||
|
||||
public class AutoCommitCompletionBackgroundEvent extends BackgroundEvent {
|
||||
public AutoCommitCompletionBackgroundEvent() {
|
||||
super(Type.AUTO_COMMIT_COMPLETION);
|
||||
}
|
||||
}
|
||||
|
|
@ -26,7 +26,7 @@ import java.util.Objects;
|
|||
public abstract class BackgroundEvent {
|
||||
|
||||
public enum Type {
|
||||
ERROR, AUTO_COMMIT_COMPLETION
|
||||
ERROR
|
||||
}
|
||||
|
||||
protected final Type type;
|
||||
|
|
|
|||
|
|
@ -62,9 +62,6 @@ public class BackgroundEventProcessor extends EventProcessor<BackgroundEvent> {
|
|||
case ERROR:
|
||||
process((ErrorBackgroundEvent) event);
|
||||
break;
|
||||
case AUTO_COMMIT_COMPLETION:
|
||||
process((AutoCommitCompletionBackgroundEvent) event);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Background event type " + event.type() + " was not expected");
|
||||
|
||||
|
|
@ -80,7 +77,4 @@ public class BackgroundEventProcessor extends EventProcessor<BackgroundEvent> {
|
|||
throw event.error();
|
||||
}
|
||||
|
||||
private void process(final AutoCommitCompletionBackgroundEvent event) {
|
||||
// TODO: invoke OffsetCommitCallback
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,7 +20,6 @@ import org.apache.kafka.clients.ClientResponse;
|
|||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
|
||||
import org.apache.kafka.clients.consumer.internals.events.AutoCommitCompletionBackgroundEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
|
||||
import org.apache.kafka.common.Node;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
|
|
@ -145,7 +144,6 @@ public class CommitRequestManagerTest {
|
|||
1,
|
||||
(short) 1,
|
||||
Errors.NONE)));
|
||||
verify(backgroundEventHandler).add(any(AutoCommitCompletionBackgroundEvent.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -228,9 +226,6 @@ public class CommitRequestManagerTest {
|
|||
1,
|
||||
(short) 1,
|
||||
Errors.NONE));
|
||||
|
||||
// The result was completed successfully, expecting an event sent to the application thread.
|
||||
verify(backgroundEventHandler).add(any(AutoCommitCompletionBackgroundEvent.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
Loading…
Reference in New Issue