KAFKA-13113; Support unregistering Raft listeners (#11109)

This patch adds support for unregistering listeners from `RaftClient`.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Jason Gustafson <jason@confluent.io>
José Armando García Sancio, 2021-07-23 21:54:44 -07:00 (committed by GitHub)
commit 55d9acad65 (parent d4877982bc)
7 changed files with 188 additions and 53 deletions
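For orientation, here is a minimal usage sketch of the API this patch completes. The client construction and the MyListener class are hypothetical stand-ins, not part of the patch; only the register/unregister calls reflect the RaftClient interface touched here:

// Hypothetical listener implementation consuming commit/snapshot/leader events.
RaftClient.Listener<ApiMessageAndVersion> listener = new MyListener();
raftClient.register(listener);       // start receiving callbacks on the Raft IO thread
// ... listener handles handleCommit / handleSnapshot / handleLeaderChange ...
raftClient.unregister(listener);     // deliveries stop once the IO thread next polls
// A later registration must use a fresh instance so its events are
// distinguishable from those delivered before the unregister:
raftClient.register(new MyListener());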

checkstyle/suppressions.xml

@ -71,7 +71,7 @@
files="(ConsumerCoordinator|Fetcher|KafkaProducer|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator).java"/>
<suppress checks="JavaNCSS"
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/>
<suppress checks="NPathComplexity"
files="(ConsumerCoordinator|BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer|FetchSessionHandler).java"/>

LocalLogManager.java

@ -43,12 +43,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
@ -418,7 +419,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
/**
* The listener objects attached to this local log manager.
*/
private final List<MetaLogListenerData> listeners = new ArrayList<>();
private final Map<Listener<ApiMessageAndVersion>, MetaLogListenerData> listeners = new IdentityHashMap<>();
/**
* The current leader, as seen by this log manager.
@ -441,7 +442,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
try {
log.debug("Node {}: running log check.", nodeId);
int numEntriesFound = 0;
for (MetaLogListenerData listenerData : listeners) {
for (MetaLogListenerData listenerData : listeners.values()) {
while (true) {
// Load the snapshot if one is needed and we are not the leader
LeaderAndEpoch notifiedLeader = listenerData.notifiedLeader();
@ -526,7 +527,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
if (initialized && !shutdown) {
log.debug("Node {}: beginning shutdown.", nodeId);
resign(leader.epoch());
for (MetaLogListenerData listenerData : listeners) {
for (MetaLogListenerData listenerData : listeners.values()) {
listenerData.beginShutdown();
}
shared.unregisterLogManager(this);
@ -586,8 +587,12 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
"already been shut down.", nodeId);
future.complete(null);
} else if (initialized) {
log.info("Node {}: registered MetaLogListener.", nodeId);
listeners.add(new MetaLogListenerData(listener));
int id = System.identityHashCode(listener);
if (listeners.putIfAbsent(listener, new MetaLogListenerData(listener)) != null) {
log.error("Node {}: can't register because listener {} already exists", nodeId, id);
} else {
log.info("Node {}: registered MetaLogListener {}", nodeId, id);
}
shared.electLeaderIfNeeded();
scheduleLogCheck();
future.complete(null);
@ -608,6 +613,22 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
}
}
@Override
public void unregister(RaftClient.Listener<ApiMessageAndVersion> listener) {
eventQueue.append(() -> {
if (shutdown) {
log.info("Node {}: can't unregister because local log manager is shutdown", nodeId);
} else {
int id = System.identityHashCode(listener);
if (listeners.remove(listener) == null) {
log.error("Node {}: can't unregister because the listener {} doesn't exists", nodeId, id);
} else {
log.info("Node {}: unregistered MetaLogListener {}", nodeId, id);
}
}
});
}
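Both register and unregister mutate the listeners map only from inside eventQueue.append, so every change runs on the event-queue thread and the IdentityHashMap needs no locking. A minimal sketch of the same serialization idea, assuming a plain single-threaded executor in place of Kafka's event queue:

import java.util.IdentityHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ListenerRegistry {
    // Stand-in for the event queue: one thread applies every mutation in order.
    private final ExecutorService eventQueue = Executors.newSingleThreadExecutor();
    private final Map<Object, String> listeners = new IdentityHashMap<>();

    void register(Object listener) {
        // putIfAbsent returns non-null when the same instance is already registered.
        eventQueue.execute(() -> listeners.putIfAbsent(listener, "listenerData"));
    }

    void unregister(Object listener) {
        // remove returns null when the listener was never registered.
        eventQueue.execute(() -> listeners.remove(listener));
    }
}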
@Override
public Long scheduleAppend(int epoch, List<ApiMessageAndVersion> batch) {
return scheduleAtomicAppend(epoch, batch);
@ -664,7 +685,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
public List<RaftClient.Listener<ApiMessageAndVersion>> listeners() {
final CompletableFuture<List<RaftClient.Listener<ApiMessageAndVersion>>> future = new CompletableFuture<>();
eventQueue.append(() -> {
future.complete(listeners.stream().map(l -> l.listener).collect(Collectors.toList()));
future.complete(listeners.values().stream().map(l -> l.listener).collect(Collectors.toList()));
});
try {
return future.get();

KafkaRaftClient.java

@ -75,8 +75,8 @@ import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.SnapshotWriter;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -162,8 +162,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private final RequestManager requestManager;
private final RaftMetadataLogCleanerManager snapshotCleaner;
private final List<ListenerContext> listenerContexts = new ArrayList<>();
private final ConcurrentLinkedQueue<Listener<T>> pendingListeners = new ConcurrentLinkedQueue<>();
private final Map<Listener<T>, ListenerContext> listenerContexts = new IdentityHashMap<>();
private final ConcurrentLinkedQueue<Registration<T>> pendingRegistrations = new ConcurrentLinkedQueue<>();
/**
* Create a new instance.
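The switch from a List to an IdentityHashMap keys registrations by object identity (==) rather than equals(), which is what lets a duplicate registration of the same instance be detected while two equal-but-distinct listeners remain independently registered. A standalone illustration of the difference (not from the patch):

import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

public class IdentityDemo {
    public static void main(String[] args) {
        String a = new String("listener");
        String b = new String("listener"); // equals(a), but a different object

        Map<String, Integer> byEquals = new HashMap<>();
        byEquals.put(a, 1);
        byEquals.put(b, 2);                // replaces the entry for a

        Map<String, Integer> byIdentity = new IdentityHashMap<>();
        byIdentity.put(a, 1);
        byIdentity.put(b, 2);              // two independent entries

        System.out.println(byEquals.size());   // prints 1
        System.out.println(byIdentity.size()); // prints 2
    }
}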
@ -302,7 +302,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
private void updateListenersProgress(long highWatermark) {
for (ListenerContext listenerContext : listenerContexts) {
for (ListenerContext listenerContext : listenerContexts.values()) {
listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset -> {
if (nextExpectedOffset < log.startOffset() && nextExpectedOffset < highWatermark) {
SnapshotReader<T> snapshot = latestSnapshot().orElseThrow(() -> new IllegalStateException(
@ -335,7 +335,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
private void maybeFireHandleCommit(long baseOffset, int epoch, long appendTimestamp, int sizeInBytes, List<T> records) {
for (ListenerContext listenerContext : listenerContexts) {
for (ListenerContext listenerContext : listenerContexts.values()) {
listenerContext.nextExpectedOffset().ifPresent(nextOffset -> {
if (nextOffset == baseOffset) {
listenerContext.fireHandleCommit(baseOffset, epoch, appendTimestamp, sizeInBytes, records);
@ -345,13 +345,13 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
private void maybeFireLeaderChange(LeaderState<T> state) {
for (ListenerContext listenerContext : listenerContexts) {
for (ListenerContext listenerContext : listenerContexts.values()) {
listenerContext.maybeFireLeaderChange(quorum.leaderAndEpoch(), state.epochStartOffset());
}
}
private void maybeFireLeaderChange() {
for (ListenerContext listenerContext : listenerContexts) {
for (ListenerContext listenerContext : listenerContexts.values()) {
listenerContext.maybeFireLeaderChange(quorum.leaderAndEpoch());
}
}
@ -380,10 +380,17 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
@Override
public void register(Listener<T> listener) {
pendingListeners.add(listener);
pendingRegistrations.add(Registration.register(listener));
wakeup();
}
@Override
public void unregister(Listener<T> listener) {
pendingRegistrations.add(Registration.unregister(listener));
// No need to wake up the polling thread. This is a removal, so the update can be
// delayed until the polling thread wakes up for other reasons.
}
@Override
public LeaderAndEpoch leaderAndEpoch() {
return quorum.leaderAndEpoch();
@ -1023,6 +1030,10 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
return OptionalInt.of(leaderIdOrNil);
}
private static String listenerName(Listener<?> listener) {
return String.format("%s@%s", listener.getClass().getTypeName(), System.identityHashCode(listener));
}
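Note that %s renders the int returned by System.identityHashCode in decimal, unlike Object.toString's hex suffix, so the resulting names look like the following (class name hypothetical):

String name = String.format("%s@%s", listener.getClass().getTypeName(),
    System.identityHashCode(listener));
// e.g. "org.example.MyListener@366712642" (identity hash printed in decimal)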
private boolean handleFetchResponse(
RaftResponse.Inbound responseMetadata,
long currentTimeMs
@ -2106,10 +2117,14 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
private void pollListeners() {
// Register any listeners added since the last poll
while (!pendingListeners.isEmpty()) {
Listener<T> listener = pendingListeners.poll();
listenerContexts.add(new ListenerContext(listener));
// Apply all of the pending registrations
while (true) {
Registration<T> registration = pendingRegistrations.poll();
if (registration == null) {
break;
}
processRegistration(registration);
}
// Check listener progress to see if reads are expected
@ -2118,6 +2133,25 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
});
}
private void processRegistration(Registration<T> registration) {
Listener<T> listener = registration.listener();
Registration.Ops ops = registration.ops();
if (ops == Registration.Ops.REGISTER) {
if (listenerContexts.putIfAbsent(listener, new ListenerContext(listener)) != null) {
logger.error("Attempting to add a listener that already exists: {}", listenerName(listener));
} else {
logger.info("Registered the listener {}", listenerName(listener));
}
} else {
if (listenerContexts.remove(listener) == null) {
logger.error("Attempting to remove a listener that doesn't exists: {}", listenerName(listener));
} else {
logger.info("Unregistered the listener {}", listenerName(listener));
}
}
}
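processRegistration runs only on the single Raft IO thread, so the IdentityHashMap of contexts needs no synchronization; only the hand-off queue is concurrent. The while/poll/break drain in pollListeners above can equivalently be written more compactly, assuming the same fields:

Registration<T> registration;
while ((registration = pendingRegistrations.poll()) != null) {
    processRegistration(registration);
}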
private boolean maybeCompleteShutdown(long currentTimeMs) {
GracefulShutdown shutdown = this.shutdown.get();
if (shutdown == null) {
@ -2381,13 +2415,43 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
}
private static final class Registration<T> {
private final Ops ops;
private final Listener<T> listener;
private Registration(Ops ops, Listener<T> listener) {
this.ops = ops;
this.listener = listener;
}
private Ops ops() {
return ops;
}
private Listener<T> listener() {
return listener;
}
private enum Ops {
REGISTER, UNREGISTER
}
private static <T> Registration<T> register(Listener<T> listener) {
return new Registration<>(Ops.REGISTER, listener);
}
private static <T> Registration<T> unregister(Listener<T> listener) {
return new Registration<>(Ops.UNREGISTER, listener);
}
}
private final class ListenerContext implements CloseListener<BatchReader<T>> {
private final RaftClient.Listener<T> listener;
// This field is used only by the Raft IO thread
private LeaderAndEpoch lastFiredLeaderChange = new LeaderAndEpoch(OptionalInt.empty(), 0);
// These fields are visible to both the Raft IO thread and the listener
// and are protected through synchronization on this `ListenerContext` instance
// and are protected through synchronization on this ListenerContext instance
private BatchReader<T> lastSent = null;
private long nextOffset = 0;
@ -2399,7 +2463,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
* Get the last acked offset, which is one greater than the offset of the
* last record which was acked by the state machine.
*/
public synchronized long nextOffset() {
private synchronized long nextOffset() {
return nextOffset;
}
@ -2411,7 +2475,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
* we delay sending additional data until the state machine has read to the
* end and the last offset is determined.
*/
public synchronized OptionalLong nextExpectedOffset() {
private synchronized OptionalLong nextExpectedOffset() {
if (lastSent != null) {
OptionalLong lastSentOffset = lastSent.lastOffset();
if (lastSentOffset.isPresent()) {
@ -2428,7 +2492,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
* This API is used when the Listener needs to be notified of a new snapshot. This happens
* when the context's next offset is less than the log start offset.
*/
public void fireHandleSnapshot(SnapshotReader<T> reader) {
private void fireHandleSnapshot(SnapshotReader<T> reader) {
synchronized (this) {
nextOffset = reader.snapshotId().offset;
lastSent = null;
@ -2444,7 +2508,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
* know whether it has been committed. Rather than retaining the uncommitted
* data in memory, we let the state machine read the records from disk.
*/
public void fireHandleCommit(long baseOffset, Records records) {
private void fireHandleCommit(long baseOffset, Records records) {
fireHandleCommit(
RecordsBatchReader.of(
baseOffset,
@ -2464,7 +2528,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
* a nice optimization for the leader which is typically doing more work than all of the
* followers.
*/
public void fireHandleCommit(
private void fireHandleCommit(
long baseOffset,
int epoch,
long appendTimestamp,
@ -2476,8 +2540,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
fireHandleCommit(reader);
}
public String listenerName() {
return listener.getClass().getTypeName();
private String listenerName() {
return KafkaRaftClient.listenerName(listener);
}
private void fireHandleCommit(BatchReader<T> reader) {
@ -2493,7 +2557,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
listener.handleCommit(reader);
}
void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
private void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
if (shouldFireLeaderChange(leaderAndEpoch)) {
lastFiredLeaderChange = leaderAndEpoch;
logger.debug("Notifying listener {} of leader change {}", listenerName(), leaderAndEpoch);
@ -2512,7 +2576,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
}
void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch, long epochStartOffset) {
private void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch, long epochStartOffset) {
// If this node is becoming the leader, then we can fire `handleClaim` as soon
// as the listener has caught up to the start of the leader epoch. This guarantees
// that the state machine has seen the full committed state before it becomes

RaftClient.java

@ -89,15 +89,32 @@ public interface RaftClient<T> extends AutoCloseable {
void initialize();
/**
* Register a listener to get commit/leader notifications.
* Register a listener to get commit, snapshot and leader notifications.
*
* @param listener the listener
* The implementation of this interface assumes that each call to {@code register} uses
* a different {@code Listener} instance. If the same instance is used for multiple calls
* to this method, then only one {@code Listener} will be registered.
*
* @param listener the listener to register
*/
void register(Listener<T> listener);
/**
* Unregister a listener.
*
* To distinguish between events that happened before the call to {@code unregister} and
* events from a future call to {@code register}, different {@code Listener} instances must be used.
*
* If the {@code Listener} provided was never registered then the unregistration is ignored.
*
* @param listener the listener to unregister
*/
void unregister(Listener<T> listener);
/**
* Return the current {@link LeaderAndEpoch}.
* @return the current {@link LeaderAndEpoch}
*
* @return the current leader and epoch
*/
LeaderAndEpoch leaderAndEpoch();
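Taken together, the javadoc above implies the following behavior; client and MyListener are hypothetical stand-ins:

RaftClient.Listener<ApiMessageAndVersion> l = new MyListener();
client.register(l);
client.register(l);                 // no effect: the same instance registers only once
client.unregister(l);               // removes the registration
client.unregister(l);               // ignored: l is no longer registered
client.register(new MyListener());  // a fresh instance registers cleanly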

ValidOffsetAndEpoch.java

@ -22,7 +22,7 @@ public final class ValidOffsetAndEpoch {
final private Kind kind;
final private OffsetAndEpoch offsetAndEpoch;
ValidOffsetAndEpoch(Kind kind, OffsetAndEpoch offsetAndEpoch) {
private ValidOffsetAndEpoch(Kind kind, OffsetAndEpoch offsetAndEpoch) {
this.kind = kind;
this.offsetAndEpoch = offsetAndEpoch;
}
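Making the constructor private pushes callers to static factories. Their bodies are not shown in this diff; a sketch consistent with the call sites in the tests below (diverging, snapshot, kind) would be:

public static ValidOffsetAndEpoch diverging(OffsetAndEpoch offsetAndEpoch) {
    return new ValidOffsetAndEpoch(Kind.DIVERGING, offsetAndEpoch);
}

public static ValidOffsetAndEpoch snapshot(OffsetAndEpoch offsetAndEpoch) {
    return new ValidOffsetAndEpoch(Kind.SNAPSHOT, offsetAndEpoch);
}

public Kind kind() {
    return kind;
}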

KafkaRaftClientTest.java

@ -2491,6 +2491,47 @@ public class KafkaRaftClientTest {
assertEquals(9L, secondListener.claimedEpochStartOffset(epoch));
}
@Test
public void testReregistrationChangesListenerContext() throws Exception {
int localId = 0;
int otherNodeId = 1;
int epoch = 5;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
List<String> batch1 = Arrays.asList("1", "2", "3");
List<String> batch2 = Arrays.asList("4", "5", "6");
List<String> batch3 = Arrays.asList("7", "8", "9");
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.appendToLog(1, batch1)
.appendToLog(1, batch2)
.appendToLog(2, batch3)
.withUnknownLeader(epoch - 1)
.build();
context.becomeLeader();
context.client.poll();
assertEquals(10L, context.log.endOffset().offset);
// Let the initial listener catch up
context.advanceLocalLeaderHighWatermarkToLogEndOffset();
context.pollUntil(() -> OptionalLong.of(8).equals(context.listener.lastCommitOffset()));
// Register a second listener
RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(OptionalInt.of(localId));
context.client.register(secondListener);
context.pollUntil(() -> OptionalLong.of(8).equals(secondListener.lastCommitOffset()));
context.client.unregister(secondListener);
// Write to the log and show that the default listener gets updated...
assertEquals(10L, context.client.scheduleAppend(epoch, singletonList("a")));
context.client.poll();
context.advanceLocalLeaderHighWatermarkToLogEndOffset();
context.pollUntil(() -> OptionalLong.of(10).equals(context.listener.lastCommitOffset()));
// ... but the unregistered listener doesn't
assertEquals(OptionalLong.of(8), secondListener.lastCommitOffset());
}
@Test
public void testHandleCommitCallbackFiresAfterFollowerHighWatermarkAdvances() throws Exception {
int localId = 0;
@ -2677,5 +2718,4 @@ public class KafkaRaftClientTest {
private static KafkaMetric getMetric(final Metrics metrics, final String name) {
return metrics.metrics().get(metrics.metricName(name, "raft-metrics"));
}
}

MockLogTest.java

@ -766,7 +766,7 @@ public class MockLogTest {
appendBatch(numberOfRecords, epoch);
ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1);
assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.DIVERGING, new OffsetAndEpoch(log.endOffset().offset, epoch)),
assertEquals(ValidOffsetAndEpoch.diverging(new OffsetAndEpoch(log.endOffset().offset, epoch)),
resultOffsetAndEpoch);
}
@ -782,8 +782,7 @@ public class MockLogTest {
log.truncateToLatestSnapshot();
ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch - 1);
assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.SNAPSHOT, olderEpochSnapshotId),
resultOffsetAndEpoch);
assertEquals(ValidOffsetAndEpoch.snapshot(olderEpochSnapshotId), resultOffsetAndEpoch);
}
@Test
@ -798,8 +797,7 @@ public class MockLogTest {
log.truncateToLatestSnapshot();
ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset - 1, epoch);
assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.SNAPSHOT, olderEpochSnapshotId),
resultOffsetAndEpoch);
assertEquals(ValidOffsetAndEpoch.snapshot(olderEpochSnapshotId), resultOffsetAndEpoch);
}
@Test
@ -814,8 +812,7 @@ public class MockLogTest {
log.truncateToLatestSnapshot();
ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch);
assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.VALID, olderEpochSnapshotId),
resultOffsetAndEpoch);
assertEquals(ValidOffsetAndEpoch.Kind.VALID, resultOffsetAndEpoch.kind());
}
@Test
@ -835,8 +832,7 @@ public class MockLogTest {
// offset is not equal to oldest snapshot's offset
ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(100, 3);
assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.DIVERGING, new OffsetAndEpoch(20, 2)),
resultOffsetAndEpoch);
assertEquals(ValidOffsetAndEpoch.diverging(new OffsetAndEpoch(20, 2)), resultOffsetAndEpoch);
}
@Test
@ -854,8 +850,7 @@ public class MockLogTest {
// offset is not equal to oldest snapshot's offset
ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(100, 2);
assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.DIVERGING, olderEpochSnapshotId),
resultOffsetAndEpoch);
assertEquals(ValidOffsetAndEpoch.diverging(olderEpochSnapshotId), resultOffsetAndEpoch);
}
@Test
@ -866,7 +861,7 @@ public class MockLogTest {
appendBatch(numberOfRecords, epoch);
ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords + 1, epoch);
assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.DIVERGING, new OffsetAndEpoch(log.endOffset().offset, epoch)),
assertEquals(ValidOffsetAndEpoch.diverging(new OffsetAndEpoch(log.endOffset().offset, epoch)),
resultOffsetAndEpoch);
}
@ -879,8 +874,7 @@ public class MockLogTest {
appendBatch(numberOfRecords, epoch + 1);
ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(11, epoch);
assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.DIVERGING, new OffsetAndEpoch(10, epoch)),
resultOffsetAndEpoch);
assertEquals(ValidOffsetAndEpoch.diverging(new OffsetAndEpoch(10, epoch)), resultOffsetAndEpoch);
}
@Test
@ -891,8 +885,7 @@ public class MockLogTest {
appendBatch(numberOfRecords, epoch);
ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords - 1, epoch);
assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.VALID, new OffsetAndEpoch(numberOfRecords - 1, epoch)),
resultOffsetAndEpoch);
assertEquals(ValidOffsetAndEpoch.Kind.VALID, resultOffsetAndEpoch.kind());
}
private Optional<OffsetRange> readOffsets(long startOffset, Isolation isolation) {