From bf5e1f1cc0d7f7b1f54c879fbb5f415d30fa9c06 Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Thu, 11 Feb 2021 08:42:59 -0800 Subject: [PATCH] MINOR: add the MetaLogListener, LocalLogManager, and Controller interface. (#10106) Add MetaLogListener, LocalLogManager, and related classes. These classes are used by the KIP-500 controller and broker to interface with the Raft log. Also add the Controller interface. The implementation will be added in a separate PR. Reviewers: Ron Dagostino , David Arthur --- checkstyle/import-control.xml | 30 ++ .../apache/kafka/controller/Controller.java | 180 +++++++++ .../kafka/controller/ResultOrError.java | 84 ++++ .../kafka/metadata/BrokerHeartbeatReply.java | 80 ++++ .../kafka/metadata/BrokerRegistration.java | 153 +++++++ .../metadata/BrokerRegistrationReply.java | 50 +++ .../org/apache/kafka/metadata/FeatureMap.java | 67 ++++ .../kafka/metadata/FeatureMapAndEpoch.java | 64 +++ .../kafka/metalog/metalog/MetaLogLeader.java | 58 +++ .../metalog/metalog/MetaLogListener.java | 55 +++ .../kafka/metalog/metalog/MetaLogManager.java | 79 ++++ .../metadata/BrokerRegistrationTest.java | 78 ++++ .../apache/kafka/metalog/LocalLogManager.java | 378 ++++++++++++++++++ .../metalog/metalog/LocalLogManagerTest.java | 153 +++++++ .../metalog/LocalLogManagerTestEnv.java | 143 +++++++ .../metalog/MockMetaLogManagerListener.java | 77 ++++ 16 files changed, 1729 insertions(+) create mode 100644 metadata/src/main/java/org/apache/kafka/controller/Controller.java create mode 100644 metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java create mode 100644 metadata/src/main/java/org/apache/kafka/metadata/BrokerHeartbeatReply.java create mode 100644 metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java create mode 100644 metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationReply.java create mode 100644 metadata/src/main/java/org/apache/kafka/metadata/FeatureMap.java create mode 100644 
metadata/src/main/java/org/apache/kafka/metadata/FeatureMapAndEpoch.java create mode 100644 metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogLeader.java create mode 100644 metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogListener.java create mode 100644 metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogManager.java create mode 100644 metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java create mode 100644 metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java create mode 100644 metadata/src/test/java/org/apache/kafka/metalog/metalog/LocalLogManagerTest.java create mode 100644 metadata/src/test/java/org/apache/kafka/metalog/metalog/LocalLogManagerTestEnv.java create mode 100644 metadata/src/test/java/org/apache/kafka/metalog/metalog/MockMetaLogManagerListener.java diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 9cc432efd18..b6583701a57 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -192,6 +192,27 @@ + + + + + + + + + + + + + + + + + + + + + @@ -201,6 +222,15 @@ + + + + + + + + + diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java new file mode 100644 index 00000000000..0f6a54b1ad3 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.BrokerRegistrationRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.quota.ClientQuotaAlteration; +import org.apache.kafka.common.quota.ClientQuotaEntity; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistrationReply; +import org.apache.kafka.metadata.FeatureMapAndEpoch; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + + +public interface Controller extends AutoCloseable { + /** + * Change the in-sync replica sets for some partitions. + * + * @param request The AlterIsrRequest data. + * + * @return A future yielding the response. + */ + CompletableFuture alterIsr(AlterIsrRequestData request); + + /** + * Create a batch of topics. + * + * @param request The CreateTopicsRequest data. + * + * @return A future yielding the response. 
+ */ + CompletableFuture + createTopics(CreateTopicsRequestData request); + + /** + * Unregister a broker. + * + * @param brokerId The broker id to unregister. + * + * @return A future that is completed successfully when the broker is + * unregistered. + */ + CompletableFuture unregisterBroker(int brokerId); + + /** + * Describe the current configuration of various resources. + * + * @param resources A map from resources to the collection of config keys that we + * want to describe for each. If the collection is empty, then + * all configuration keys will be described. + * + * @return + */ + CompletableFuture>>> + describeConfigs(Map> resources); + + /** + * Elect new partition leaders. + * + * @param request The request. + * + * @return A future yielding the elect leaders response. + */ + CompletableFuture electLeaders(ElectLeadersRequestData request); + + /** + * Get the current finalized feature ranges for each feature. + * + * @return A future yielding the feature ranges. + */ + CompletableFuture finalizedFeatures(); + + /** + * Perform some incremental configuration changes. + * + * @param configChanges The changes. + * @param validateOnly True if we should validate the changes but not apply them. + * + * @return A future yielding a map from partitions to error results. + */ + CompletableFuture> incrementalAlterConfigs( + Map>> configChanges, + boolean validateOnly); + + /** + * Perform some configuration changes using the legacy API. + * + * @param newConfigs The new configuration maps to apply. + * @param validateOnly True if we should validate the changes but not apply them. + * + * @return A future yielding a map from partitions to error results. + */ + CompletableFuture> legacyAlterConfigs( + Map> newConfigs, boolean validateOnly); + + /** + * Process a heartbeat from a broker. + * + * @param request The broker heartbeat request. + * + * @return A future yielding a heartbeat reply. 
+ */ + CompletableFuture processBrokerHeartbeat( + BrokerHeartbeatRequestData request); + + /** + * Attempt to register the given broker. + * + * @param request The registration request. + * + * @return A future yielding a registration reply. + */ + CompletableFuture registerBroker( + BrokerRegistrationRequestData request); + + /** + * Wait for the given number of brokers to be registered and unfenced. + * This is for testing. + * + * @param minBrokers The minimum number of brokers to wait for. + * @return A future which is completed when the given number of brokers + * is reached. + */ + CompletableFuture waitForReadyBrokers(int minBrokers); + + /** + * Perform some client quota changes + * + * @param quotaAlterations The list of quotas to alter + * @param validateOnly True if we should validate the changes but not apply them. + * @return A future yielding a map of quota entities to error results. + */ + CompletableFuture> alterClientQuotas( + Collection quotaAlterations, boolean validateOnly + ); + + /** + * Begin shutting down, but don't block. You must still call close to clean up all + * resources. + */ + void beginShutdown(); + + /** + * If this controller is active, this is the non-negative controller epoch. + * Otherwise, this is -1. + */ + long curClaimEpoch(); + + /** + * Blocks until we have shut down and freed all resources. + */ + void close() throws InterruptedException; +} diff --git a/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java b/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java new file mode 100644 index 00000000000..82e2b49123f --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; + +import java.util.Objects; + + +class ResultOrError { + private final ApiError error; + private final T result; + + public ResultOrError(Errors error, String message) { + this(new ApiError(error, message)); + } + + public ResultOrError(ApiError error) { + Objects.requireNonNull(error); + this.error = error; + this.result = null; + } + + public ResultOrError(T result) { + this.error = null; + this.result = result; + } + + public boolean isError() { + return error != null; + } + + public boolean isResult() { + return error == null; + } + + public ApiError error() { + return error; + } + + public T result() { + return result; + } + + @Override + public boolean equals(Object o) { + if (o == null || (!o.getClass().equals(getClass()))) { + return false; + } + ResultOrError other = (ResultOrError) o; + return error.equals(other.error) && + Objects.equals(result, other.result); + } + + @Override + public int hashCode() { + return Objects.hash(error, result); + } + + @Override + public String toString() { + if (error.isSuccess()) { + return "ResultOrError(" + result + ")"; + } else { + return "ResultOrError(" + error + ")"; + } + } +} diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerHeartbeatReply.java 
b/metadata/src/main/java/org/apache/kafka/metadata/BrokerHeartbeatReply.java new file mode 100644 index 00000000000..5ab2a52dc0e --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerHeartbeatReply.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata; + +import java.util.Objects; + + +public class BrokerHeartbeatReply { + /** + * True if the heartbeat reply should tell the broker that it has caught up. + */ + private final boolean isCaughtUp; + + /** + * True if the heartbeat reply should tell the broker that it is fenced. + */ + private final boolean isFenced; + + /** + * True if the heartbeat reply should tell the broker that it should shut down. 
+ */ + private final boolean shouldShutDown; + + public BrokerHeartbeatReply(boolean isCaughtUp, + boolean isFenced, + boolean shouldShutDown) { + this.isCaughtUp = isCaughtUp; + this.isFenced = isFenced; + this.shouldShutDown = shouldShutDown; + } + + public boolean isCaughtUp() { + return isCaughtUp; + } + + public boolean isFenced() { + return isFenced; + } + + public boolean shouldShutDown() { + return shouldShutDown; + } + + @Override + public int hashCode() { + return Objects.hash(isCaughtUp, isFenced, shouldShutDown); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof BrokerHeartbeatReply)) return false; + BrokerHeartbeatReply other = (BrokerHeartbeatReply) o; + return other.isCaughtUp == isCaughtUp && + other.isFenced == isFenced && + other.shouldShutDown == shouldShutDown; + } + + @Override + public String toString() { + return "BrokerHeartbeatReply(isCaughtUp=" + isCaughtUp + + ", isFenced=" + isFenced + + ", shouldShutDown = " + shouldShutDown + + ")"; + } +} diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java new file mode 100644 index 00000000000..c2be061a10c --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata; + +import org.apache.kafka.common.Endpoint; +import org.apache.kafka.common.Uuid; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * An immutable class which represents broker registrations. + */ +public class BrokerRegistration { + private final int id; + private final long epoch; + private final Uuid incarnationId; + private final Map listeners; + private final Map supportedFeatures; + private final Optional rack; + private final boolean fenced; + + public BrokerRegistration(int id, + long epoch, + Uuid incarnationId, + List listeners, + Map supportedFeatures, + Optional rack, + boolean fenced) { + this.id = id; + this.epoch = epoch; + this.incarnationId = incarnationId; + Map listenersMap = new HashMap<>(); + for (Endpoint endpoint : listeners) { + listenersMap.put(endpoint.listenerName().get(), endpoint); + } + this.listeners = Collections.unmodifiableMap(listenersMap); + Objects.requireNonNull(supportedFeatures); + this.supportedFeatures = new HashMap<>(supportedFeatures); + Objects.requireNonNull(rack); + this.rack = rack; + this.fenced = fenced; + } + + public BrokerRegistration(int id, + long epoch, + Uuid incarnationId, + Map listeners, + Map supportedFeatures, + Optional rack, + boolean fenced) { + this.id = id; + this.epoch = epoch; + this.incarnationId = incarnationId; + this.listeners = new HashMap<>(listeners); + this.supportedFeatures = 
new HashMap<>(supportedFeatures); + this.rack = rack; + this.fenced = fenced; + } + + public int id() { + return id; + } + + public long epoch() { + return epoch; + } + + public Uuid incarnationId() { + return incarnationId; + } + + public Map listeners() { + return listeners; + } + + public Map supportedFeatures() { + return supportedFeatures; + } + + public Optional rack() { + return rack; + } + + public boolean fenced() { + return fenced; + } + + @Override + public int hashCode() { + return Objects.hash(id, epoch, incarnationId, listeners, supportedFeatures, + rack, fenced); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof BrokerRegistration)) return false; + BrokerRegistration other = (BrokerRegistration) o; + return other.id == id && + other.epoch == epoch && + other.incarnationId.equals(incarnationId) && + other.listeners.equals(listeners) && + other.supportedFeatures.equals(supportedFeatures) && + other.rack.equals(rack) && + other.fenced == fenced; + } + + @Override + public String toString() { + StringBuilder bld = new StringBuilder(); + bld.append("BrokerRegistration(id=").append(id); + bld.append(", epoch=").append(epoch); + bld.append(", incarnationId=").append(incarnationId); + bld.append(", listeners=[").append( + listeners.keySet().stream().sorted(). + map(n -> listeners.get(n).toString()). + collect(Collectors.joining(", "))); + bld.append("], supportedFeatures={").append( + supportedFeatures.entrySet().stream().sorted(). + map(e -> e.getKey() + ": " + e.getValue()). 
+ collect(Collectors.joining(", "))); + bld.append("}"); + bld.append(", rack=").append(rack); + bld.append(", fenced=").append(fenced); + bld.append(")"); + return bld.toString(); + } + + public BrokerRegistration cloneWithFencing(boolean fencing) { + return new BrokerRegistration(id, epoch, incarnationId, listeners, + supportedFeatures, rack, fencing); + } +} diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationReply.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationReply.java new file mode 100644 index 00000000000..40678edf644 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationReply.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * An immutable reply to a broker registration request, carrying the epoch
 * that was assigned to the broker.
 */
public class BrokerRegistrationReply {
    private final long epoch;

    public BrokerRegistrationReply(long epoch) {
        this.epoch = epoch;
    }

    /**
     * @return the broker epoch assigned by this registration.
     */
    public long epoch() {
        return epoch;
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof BrokerRegistrationReply) {
            return ((BrokerRegistrationReply) o).epoch == epoch;
        }
        return false;
    }

    @Override
    public int hashCode() {
        return Objects.hash(epoch);
    }

    @Override
    public String toString() {
        return "BrokerRegistrationReply(epoch=" + epoch + ")";
    }
}
+ */ + +package org.apache.kafka.metadata; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + + +/** + * A map of feature names to their supported versions. + */ +public class FeatureMap { + private final Map features; + + public FeatureMap(Map features) { + this.features = Collections.unmodifiableMap(new HashMap<>(features)); + } + + public Optional get(String name) { + return Optional.ofNullable(features.get(name)); + } + + public Map features() { + return features; + } + + @Override + public int hashCode() { + return features.hashCode(); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof FeatureMap)) return false; + FeatureMap other = (FeatureMap) o; + return features.equals(other.features); + } + + @Override + public String toString() { + StringBuilder bld = new StringBuilder(); + bld.append("{"); + bld.append(features.keySet().stream().sorted(). + map(k -> k + ": " + features.get(k)). + collect(Collectors.joining(", "))); + bld.append("}"); + return bld.toString(); + } +} diff --git a/metadata/src/main/java/org/apache/kafka/metadata/FeatureMapAndEpoch.java b/metadata/src/main/java/org/apache/kafka/metadata/FeatureMapAndEpoch.java new file mode 100644 index 00000000000..26096ea7a33 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/FeatureMapAndEpoch.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata; + +import java.util.Objects; + + +/** + * A map of feature names to their supported versions. + */ +public class FeatureMapAndEpoch { + private final FeatureMap map; + private final long epoch; + + public FeatureMapAndEpoch(FeatureMap map, long epoch) { + this.map = map; + this.epoch = epoch; + } + + public FeatureMap map() { + return map; + } + + public long epoch() { + return epoch; + } + + @Override + public int hashCode() { + return Objects.hash(map, epoch); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof FeatureMapAndEpoch)) return false; + FeatureMapAndEpoch other = (FeatureMapAndEpoch) o; + return map.equals(other.map) && epoch == other.epoch; + } + + @Override + public String toString() { + StringBuilder bld = new StringBuilder(); + bld.append("{"); + bld.append("map=").append(map.toString()); + bld.append(", epoch=").append(epoch); + bld.append("}"); + return bld.toString(); + } +} diff --git a/metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogLeader.java b/metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogLeader.java new file mode 100644 index 00000000000..2bf4f7c718b --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogLeader.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
/**
 * The current leader of the MetaLog: a node id plus the epoch in which it
 * was elected.
 */
public class MetaLogLeader {
    private final int nodeId;
    private final long epoch;

    public MetaLogLeader(int nodeId, long epoch) {
        this.nodeId = nodeId;
        this.epoch = epoch;
    }

    /**
     * @return the id of the leader node.
     */
    public int nodeId() {
        return nodeId;
    }

    /**
     * @return the epoch of this leadership term.
     */
    public long epoch() {
        return epoch;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof MetaLogLeader)) {
            return false;
        }
        MetaLogLeader that = (MetaLogLeader) o;
        return nodeId == that.nodeId && epoch == that.epoch;
    }

    @Override
    public int hashCode() {
        return Objects.hash(nodeId, epoch);
    }

    @Override
    public String toString() {
        return String.format("MetaLogLeader(nodeId=%d, epoch=%d)", nodeId, epoch);
    }
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metalog; + +import org.apache.kafka.common.protocol.ApiMessage; + +import java.util.List; + +/** + * Listeners receive notifications from the MetaLogManager. + */ +public interface MetaLogListener { + /** + * Called when the MetaLogManager commits some messages. + * + * @param lastOffset The last offset found in all the given messages. + * @param messages The messages. + */ + void handleCommits(long lastOffset, List messages); + + /** + * Called when a new leader is elected. + * + * @param leader The new leader id and epoch. + */ + default void handleNewLeader(MetaLogLeader leader) {} + + /** + * Called when the MetaLogManager has renounced the leadership. + * + * @param epoch The controller epoch that has ended. + */ + default void handleRenounce(long epoch) {} + + /** + * Called when the MetaLogManager has finished shutting down, and wants to tell its + * listener that it is safe to shut down as well. 
+ */ + default void beginShutdown() {} +} diff --git a/metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogManager.java b/metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogManager.java new file mode 100644 index 00000000000..67a6ca5385f --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogManager.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metalog; + +import org.apache.kafka.metadata.ApiMessageAndVersion; + +import java.util.List; + +/** + * The MetaLogManager handles storing metadata and electing leaders. + */ +public interface MetaLogManager { + + /** + * Start this meta log manager. + * The manager must be ready to accept incoming calls after this function returns. + * It is an error to initialize a MetaLogManager more than once. + */ + void initialize() throws Exception; + + /** + * Register the listener. The manager must be initialized already. + * The listener must be ready to accept incoming calls immediately. + * + * @param listener The listener to register. + */ + void register(MetaLogListener listener) throws Exception; + + /** + * Schedule a write to the log. 
+ * + * The write will be scheduled to happen at some time in the future. There is no + * error return or exception thrown if the write fails. Instead, the listener may + * regard the write as successful if and only if the MetaLogManager reaches the given + * offset before renouncing its leadership. The listener should determine this by + * monitoring the committed offsets. + * + * @param epoch The controller epoch. + * @param batch The batch of messages to write. + * + * @return The offset of the message. + */ + long scheduleWrite(long epoch, List batch); + + /** + * Renounce the leadership. + * + * @param epoch The epoch. If this does not match the current epoch, this + * call will be ignored. + */ + void renounce(long epoch); + + /** + * Returns the current leader. The active node may change immediately after this + * function is called, of course. + */ + MetaLogLeader leader(); + + /** + * Returns the node id. + */ + int nodeId(); + +} diff --git a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java new file mode 100644 index 00000000000..7a01c37e33c --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.metadata;

import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Timeout(value = 40)
public class BrokerRegistrationTest {
    // Three distinct registrations; ids 0 and 1 share a feature range, id 2 differs.
    private static final List<BrokerRegistration> REGISTRATIONS = Arrays.asList(
        new BrokerRegistration(0, 0, Uuid.fromString("pc1GhUlBS92cGGaKXl6ipw"),
            Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9090)),
            Collections.singletonMap("foo", new VersionRange((short) 1, (short) 2)),
            Optional.empty(), false),
        new BrokerRegistration(1, 0, Uuid.fromString("3MfdxWlNSn2UDYsmDP1pYg"),
            Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9091)),
            Collections.singletonMap("foo", new VersionRange((short) 1, (short) 2)),
            Optional.empty(), false),
        new BrokerRegistration(2, 0, Uuid.fromString("eY7oaG1RREie5Kk9uy1l6g"),
            Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9092)),
            Collections.singletonMap("foo", new VersionRange((short) 2, (short) 3)),
            Optional.empty(), false));

    @Test
    public void testValues() {
        assertEquals(0, REGISTRATIONS.get(0).id());
        assertEquals(1, REGISTRATIONS.get(1).id());
        assertEquals(2, REGISTRATIONS.get(2).id());
    }

    /**
     * Exercise equals() directly (rather than via assertEquals/assertNotEquals)
     * to verify both symmetry and reflexivity of the implementation.
     */
    @Test
    public void testEquals() {
        assertFalse(REGISTRATIONS.get(0).equals(REGISTRATIONS.get(1)));
        assertFalse(REGISTRATIONS.get(1).equals(REGISTRATIONS.get(0)));
        assertFalse(REGISTRATIONS.get(0).equals(REGISTRATIONS.get(2)));
        assertFalse(REGISTRATIONS.get(2).equals(REGISTRATIONS.get(0)));
        assertTrue(REGISTRATIONS.get(0).equals(REGISTRATIONS.get(0)));
        assertTrue(REGISTRATIONS.get(1).equals(REGISTRATIONS.get(1)));
        assertTrue(REGISTRATIONS.get(2).equals(REGISTRATIONS.get(2)));
    }

    @Test
    public void testToString() {
        assertEquals("BrokerRegistration(id=1, epoch=0, " +
            "incarnationId=3MfdxWlNSn2UDYsmDP1pYg, listeners=[Endpoint(" +
            "listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
            "host='localhost', port=9091)], supportedFeatures={foo: 1-2}, " +
            "rack=Optional.empty, fenced=false)",
            REGISTRATIONS.get(1).toString());
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.metalog;

import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

/**
 * The LocalLogManager is a test implementation that relies on the contents of memory.
 */
public final class LocalLogManager implements MetaLogManager, AutoCloseable {
    /**
     * An entry in the in-memory log: either a leader change marker or a batch of
     * metadata records.
     */
    interface LocalBatch {
        /** The number of log offsets this batch occupies. */
        int size();
    }

    /** A control batch recording a change of leadership. */
    static class LeaderChangeBatch implements LocalBatch {
        private final MetaLogLeader newLeader;

        LeaderChangeBatch(MetaLogLeader newLeader) {
            this.newLeader = newLeader;
        }

        @Override
        public int size() {
            // A leader change consumes exactly one offset in the log.
            return 1;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof LeaderChangeBatch)) return false;
            LeaderChangeBatch other = (LeaderChangeBatch) o;
            return other.newLeader.equals(newLeader);
        }

        @Override
        public int hashCode() {
            return Objects.hash(newLeader);
        }

        @Override
        public String toString() {
            return "LeaderChangeBatch(newLeader=" + newLeader + ")";
        }
    }

    /** A batch of metadata records appended by the leader. */
    static class LocalRecordBatch implements LocalBatch {
        private final List<ApiMessage> records;

        LocalRecordBatch(List<ApiMessage> records) {
            this.records = records;
        }

        @Override
        public int size() {
            return records.size();
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof LocalRecordBatch)) return false;
            LocalRecordBatch other = (LocalRecordBatch) o;
            return other.records.equals(records);
        }

        @Override
        public int hashCode() {
            return Objects.hash(records);
        }

        @Override
        public String toString() {
            return "LocalRecordBatch(records=" + records + ")";
        }
    }

    /**
     * The log state shared between all the LocalLogManagers participating in the same
     * simulated quorum.  All access is synchronized on this object.
     */
    public static class SharedLogData {
        private final Logger log = LoggerFactory.getLogger(SharedLogData.class);

        /** The registered log managers, keyed by node id. */
        private final HashMap<Integer, LocalLogManager> logManagers = new HashMap<>();

        /** The batches in the log, keyed by the offset of the last record in each batch. */
        private final TreeMap<Long, LocalBatch> batches = new TreeMap<>();

        /** The current leader; (-1, -1) means no leader has been elected. */
        private MetaLogLeader leader = new MetaLogLeader(-1, -1);

        /** The offset of the last record appended, or -1 if the log is empty. */
        private long prevOffset = -1;

        synchronized void registerLogManager(LocalLogManager logManager) {
            if (logManagers.put(logManager.nodeId(), logManager) != null) {
                throw new RuntimeException("Can't have multiple LocalLogManagers " +
                    "with id " + logManager.nodeId());
            }
            electLeaderIfNeeded();
        }

        synchronized void unregisterLogManager(LocalLogManager logManager) {
            if (!logManagers.remove(logManager.nodeId(), logManager)) {
                throw new RuntimeException("Log manager " + logManager.nodeId() +
                    " was not found.");
            }
        }

        /**
         * Append a batch on behalf of the given node, if it is still the leader for
         * the given epoch.
         *
         * @return the offset of the batch, or Long.MAX_VALUE if the node was not the
         *         current leader for the given epoch (the write is silently dropped,
         *         matching the MetaLogManager contract).
         */
        synchronized long tryAppend(int nodeId, long epoch, LocalBatch batch) {
            if (epoch != leader.epoch()) {
                log.trace("tryAppend(nodeId={}, epoch={}): the provided epoch does not " +
                    "match the current leader epoch of {}.", nodeId, epoch, leader.epoch());
                return Long.MAX_VALUE;
            }
            if (nodeId != leader.nodeId()) {
                log.trace("tryAppend(nodeId={}, epoch={}): the given node id does not " +
                    "match the current leader id of {}.", nodeId, leader.nodeId());
                return Long.MAX_VALUE;
            }
            log.trace("tryAppend(nodeId={}): appending {}.", nodeId, batch);
            long offset = append(batch);
            electLeaderIfNeeded();
            return offset;
        }

        /**
         * Unconditionally append a batch, notify all managers that new data is
         * available, and return the offset of the batch's last record.
         */
        synchronized long append(LocalBatch batch) {
            prevOffset += batch.size();
            log.debug("append(batch={}, prevOffset={})", batch, prevOffset);
            batches.put(prevOffset, batch);
            if (batch instanceof LeaderChangeBatch) {
                LeaderChangeBatch leaderChangeBatch = (LeaderChangeBatch) batch;
                leader = leaderChangeBatch.newLeader;
            }
            for (LocalLogManager logManager : logManagers.values()) {
                logManager.scheduleLogCheck();
            }
            return prevOffset;
        }

        /**
         * If there is no current leader and at least one manager is registered, pick a
         * random manager as the new leader and record a LeaderChangeBatch in the log.
         */
        synchronized void electLeaderIfNeeded() {
            if (leader.nodeId() != -1 || logManagers.isEmpty()) {
                return;
            }
            int nextLeaderIndex = ThreadLocalRandom.current().nextInt(logManagers.size());
            Iterator<Integer> iter = logManagers.keySet().iterator();
            Integer nextLeaderNode = null;
            for (int i = 0; i <= nextLeaderIndex; i++) {
                nextLeaderNode = iter.next();
            }
            MetaLogLeader newLeader = new MetaLogLeader(nextLeaderNode, leader.epoch() + 1);
            log.info("Elected new leader: {}.", newLeader);
            append(new LeaderChangeBatch(newLeader));
        }

        /**
         * Return the first batch whose last offset is strictly greater than the given
         * offset, or null if there is none.  The returned entry is a defensive copy.
         */
        synchronized Entry<Long, LocalBatch> nextBatch(long offset) {
            Entry<Long, LocalBatch> entry = batches.higherEntry(offset);
            if (entry == null) {
                return null;
            }
            return new SimpleImmutableEntry<>(entry.getKey(), entry.getValue());
        }
    }

    /** Tracks how far a registered listener has read in the shared log. */
    private static class MetaLogListenerData {
        // The offset of the last batch delivered to the listener, or -1 if none.
        private long offset = -1;
        private final MetaLogListener listener;

        MetaLogListenerData(MetaLogListener listener) {
            this.listener = listener;
        }
    }

    private final Logger log;

    private final int nodeId;

    private final SharedLogData shared;

    /** The single-threaded event queue on which all state mutations run. */
    private final EventQueue eventQueue;

    private boolean initialized = false;

    private boolean shutdown = false;

    /** Batches with a last offset above this value are not delivered to listeners. */
    private long maxReadOffset = Long.MAX_VALUE;

    private final List<MetaLogListenerData> listeners = new ArrayList<>();

    // Volatile so that leader() can be read without going through the event queue.
    private volatile MetaLogLeader leader = new MetaLogLeader(-1, -1);

    public LocalLogManager(LogContext logContext,
                           int nodeId,
                           SharedLogData shared,
                           String threadNamePrefix) {
        this.log = logContext.logger(LocalLogManager.class);
        this.nodeId = nodeId;
        this.shared = shared;
        this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, threadNamePrefix);
        shared.registerLogManager(this);
    }

    /**
     * Schedule an event that delivers any new batches from the shared log to every
     * registered listener, up to maxReadOffset.
     */
    private void scheduleLogCheck() {
        eventQueue.append(() -> {
            try {
                log.debug("Node {}: running log check.", nodeId);
                int numEntriesFound = 0;
                for (MetaLogListenerData listenerData : listeners) {
                    while (true) {
                        Entry<Long, LocalBatch> entry = shared.nextBatch(listenerData.offset);
                        if (entry == null) {
                            log.trace("Node {}: reached the end of the log after finding " +
                                "{} entries.", nodeId, numEntriesFound);
                            break;
                        }
                        long entryOffset = entry.getKey();
                        if (entryOffset > maxReadOffset) {
                            log.trace("Node {}: after {} entries, not reading the next " +
                                "entry because its offset is {}, and maxReadOffset is {}.",
                                nodeId, numEntriesFound, entryOffset, maxReadOffset);
                            break;
                        }
                        if (entry.getValue() instanceof LeaderChangeBatch) {
                            LeaderChangeBatch batch = (LeaderChangeBatch) entry.getValue();
                            log.trace("Node {}: handling LeaderChange to {}.",
                                nodeId, batch.newLeader);
                            listenerData.listener.handleNewLeader(batch.newLeader);
                            if (batch.newLeader.epoch() > leader.epoch()) {
                                leader = batch.newLeader;
                            }
                        } else if (entry.getValue() instanceof LocalRecordBatch) {
                            LocalRecordBatch batch = (LocalRecordBatch) entry.getValue();
                            log.trace("Node {}: handling LocalRecordBatch with offset {}.",
                                nodeId, entryOffset);
                            listenerData.listener.handleCommits(entryOffset, batch.records);
                        }
                        numEntriesFound++;
                        listenerData.offset = entryOffset;
                    }
                }
                log.trace("Completed log check for node " + nodeId);
            } catch (Exception e) {
                log.error("Exception while handling log check", e);
            }
        });
    }

    /**
     * Begin an asynchronous shutdown: renounce leadership, notify listeners, and
     * unregister from the shared log.  Idempotent; safe to call before initialize().
     */
    public void beginShutdown() {
        eventQueue.beginShutdown("beginShutdown", () -> {
            try {
                if (initialized && !shutdown) {
                    log.debug("Node {}: beginning shutdown.", nodeId);
                    renounce(leader.epoch());
                    for (MetaLogListenerData listenerData : listeners) {
                        listenerData.listener.beginShutdown();
                    }
                    shared.unregisterLogManager(this);
                }
            } catch (Exception e) {
                log.error("Unexpected exception while sending beginShutdown callbacks", e);
            }
            shutdown = true;
        });
    }

    @Override
    public void close() throws InterruptedException {
        log.debug("Node {}: closing.", nodeId);
        beginShutdown();
        eventQueue.close();
    }

    @Override
    public void initialize() throws Exception {
        eventQueue.append(() -> {
            log.debug("initialized local log manager for node " + nodeId);
            initialized = true;
        });
    }

    @Override
    public void register(MetaLogListener listener) throws Exception {
        CompletableFuture<Void> future = new CompletableFuture<>();
        eventQueue.append(() -> {
            if (shutdown) {
                // Registering after shutdown is a no-op rather than an error.
                log.info("Node {}: can't register because local log manager has " +
                    "already been shut down.", nodeId);
                future.complete(null);
            } else if (initialized) {
                log.info("Node {}: registered MetaLogListener.", nodeId);
                listeners.add(new MetaLogListenerData(listener));
                shared.electLeaderIfNeeded();
                scheduleLogCheck();
                future.complete(null);
            } else {
                log.info("Node {}: can't register because local log manager has not " +
                    "been initialized.", nodeId);
                future.completeExceptionally(new RuntimeException(
                    "LocalLogManager was not initialized."));
            }
        });
        future.get();
    }

    @Override
    public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
        // NOTE(review): the epoch parameter is ignored; the manager's current view of
        // the leader epoch is used instead. tryAppend still rejects the write if this
        // node is not the leader -- confirm this is the intended semantics.
        return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch(
            batch.stream().map(r -> r.message()).collect(Collectors.toList())));
    }

    @Override
    public void renounce(long epoch) {
        MetaLogLeader curLeader = leader;
        MetaLogLeader nextLeader = new MetaLogLeader(-1, curLeader.epoch() + 1);
        shared.tryAppend(nodeId, curLeader.epoch(), new LeaderChangeBatch(nextLeader));
    }

    @Override
    public MetaLogLeader leader() {
        return leader;
    }

    @Override
    public int nodeId() {
        return nodeId;
    }

    /**
     * Return a snapshot of the registered listeners, taken on the event queue thread
     * to avoid racing with register().
     */
    public List<MetaLogListener> listeners() {
        final CompletableFuture<List<MetaLogListener>> future = new CompletableFuture<>();
        eventQueue.append(() -> {
            future.complete(listeners.stream().map(l -> l.listener).collect(Collectors.toList()));
        });
        try {
            return future.get();
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Set the highest offset listeners are allowed to read, then re-run the log check.
     * Blocks until the change has been applied on the event queue thread.
     */
    public void setMaxReadOffset(long maxReadOffset) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        eventQueue.append(() -> {
            log.trace("Node {}: set maxReadOffset to {}.", nodeId, maxReadOffset);
            this.maxReadOffset = maxReadOffset;
            scheduleLogCheck();
            future.complete(null);
        });
        try {
            future.get();
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.metalog;

import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.kafka.metalog.MockMetaLogManagerListener.COMMIT;
import static org.apache.kafka.metalog.MockMetaLogManagerListener.LAST_COMMITTED_OFFSET;
import static org.apache.kafka.metalog.MockMetaLogManagerListener.SHUTDOWN;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

@Timeout(value = 40)
public class LocalLogManagerTest {
    private static final Logger log = LoggerFactory.getLogger(LocalLogManagerTest.class);

    /**
     * Test creating a LocalLogManager and closing it.
     */
    @Test
    public void testCreateAndClose() throws Exception {
        try (LocalLogManagerTestEnv env =
                 LocalLogManagerTestEnv.createWithMockListeners(1)) {
            env.close();
            assertNull(env.firstError.get());
        }
    }

    /**
     * Test that the local log manager will claim leadership.
     */
    @Test
    public void testClaimsLeadership() throws Exception {
        try (LocalLogManagerTestEnv env =
                 LocalLogManagerTestEnv.createWithMockListeners(1)) {
            // With a single node, the first election is deterministic: node 0, epoch 0.
            assertEquals(new MetaLogLeader(0, 0), env.waitForLeader());
            env.close();
            assertNull(env.firstError.get());
        }
    }

    /**
     * Test that we can pass leadership back and forth between log managers.
     */
    @Test
    public void testPassLeadership() throws Exception {
        try (LocalLogManagerTestEnv env =
                 LocalLogManagerTestEnv.createWithMockListeners(3)) {
            MetaLogLeader first = env.waitForLeader();
            MetaLogLeader cur = first;
            do {
                env.logManagers().get(cur.nodeId()).renounce(cur.epoch());
                MetaLogLeader next = env.waitForLeader();
                while (next.epoch() == cur.epoch()) {
                    // A new leader has not been elected yet; poll until one is.
                    Thread.sleep(1);
                    next = env.waitForLeader();
                }
                // Renouncing bumps the epoch once for the (-1, -1) interim leader and
                // once more for the newly elected leader, hence +2.
                long expectedNextEpoch = cur.epoch() + 2;
                assertEquals(expectedNextEpoch, next.epoch(), "Expected next epoch to be " + expectedNextEpoch +
                    ", but found " + next);
                cur = next;
            } while (cur.nodeId() == first.nodeId());
            env.close();
            assertNull(env.firstError.get());
        }
    }

    /**
     * Wait until the given log manager's first listener has reported a committed
     * offset of at least targetOffset, verifying the offsets are monotonic.
     */
    private static void waitForLastCommittedOffset(long targetOffset,
                                                   LocalLogManager logManager) throws InterruptedException {
        TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> {
            MockMetaLogManagerListener listener =
                (MockMetaLogManagerListener) logManager.listeners().get(0);
            long highestOffset = -1;
            for (String event : listener.serializedEvents()) {
                if (event.startsWith(LAST_COMMITTED_OFFSET)) {
                    long offset = Long.parseLong(
                        event.substring(LAST_COMMITTED_OFFSET.length() + 1));
                    if (offset < highestOffset) {
                        throw new RuntimeException("Invalid offset: " + offset +
                            " is less than the previous offset of " + highestOffset);
                    }
                    highestOffset = offset;
                }
            }
            if (highestOffset < targetOffset) {
                throw new RuntimeException("Offset for log manager " +
                    logManager.nodeId() + " only reached " + highestOffset);
            }
        });
    }

    /**
     * Test that all the log managers see all the commits.
     */
    @Test
    public void testCommits() throws Exception {
        try (LocalLogManagerTestEnv env =
                 LocalLogManagerTestEnv.createWithMockListeners(3)) {
            MetaLogLeader leaderInfo = env.waitForLeader();
            LocalLogManager activeLogManager = env.logManagers().get(leaderInfo.nodeId());
            long epoch = activeLogManager.leader().epoch();
            List<ApiMessageAndVersion> messages = Arrays.asList(
                new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(0), (short) 0),
                new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(1), (short) 0),
                new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(2), (short) 0));
            // Offset 0 holds the leader-change batch, so the three records land at 1-3.
            assertEquals(3, activeLogManager.scheduleWrite(epoch, messages));
            for (LocalLogManager logManager : env.logManagers()) {
                waitForLastCommittedOffset(3, logManager);
            }
            List<MockMetaLogManagerListener> listeners = env.logManagers().stream().
                map(m -> (MockMetaLogManagerListener) m.listeners().get(0)).
                collect(Collectors.toList());
            env.close();
            for (MockMetaLogManagerListener listener : listeners) {
                List<String> events = listener.serializedEvents();
                assertEquals(SHUTDOWN, events.get(events.size() - 1));
                int foundIndex = 0;
                for (String event : events) {
                    if (event.startsWith(COMMIT)) {
                        assertEquals(messages.get(foundIndex).message().toString(),
                            event.substring(COMMIT.length() + 1));
                        foundIndex++;
                    }
                }
                assertEquals(messages.size(), foundIndex);
            }
        }
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.metalog;

import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metalog.LocalLogManager.SharedLogData;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class LocalLogManagerTestEnv implements AutoCloseable {
    private static final Logger log =
        LoggerFactory.getLogger(LocalLogManagerTestEnv.class);

    /**
     * The first error we encountered during this test, or null if we have
     * not encountered any.
     */
    final AtomicReference<String> firstError = new AtomicReference<>(null);

    /**
     * The test directory, which we will delete once the test is over.
     */
    private final File dir;

    /**
     * The shared data for our LocalLogManager instances.
     */
    private final SharedLogData shared;

    /**
     * A list of log managers.
     */
    private final List<LocalLogManager> logManagers;

    /**
     * Create a test environment with the given number of log managers, each with a
     * MockMetaLogManagerListener registered.  Closes the environment on failure.
     */
    public static LocalLogManagerTestEnv createWithMockListeners(int numManagers) throws Exception {
        LocalLogManagerTestEnv testEnv = new LocalLogManagerTestEnv(numManagers);
        try {
            for (LocalLogManager logManager : testEnv.logManagers) {
                logManager.register(new MockMetaLogManagerListener());
            }
        } catch (Exception e) {
            testEnv.close();
            throw e;
        }
        return testEnv;
    }

    public LocalLogManagerTestEnv(int numManagers) throws Exception {
        dir = TestUtils.tempDirectory();
        shared = new SharedLogData();
        List<LocalLogManager> newLogManagers = new ArrayList<>(numManagers);
        try {
            for (int nodeId = 0; nodeId < numManagers; nodeId++) {
                newLogManagers.add(new LocalLogManager(
                    new LogContext(String.format("[LocalLogManager %d] ", nodeId)),
                    nodeId,
                    shared,
                    String.format("LocalLogManager-%d_", nodeId)));
            }
            for (LocalLogManager logManager : newLogManagers) {
                logManager.initialize();
            }
        } catch (Throwable t) {
            // Clean up any managers we managed to create before rethrowing.
            for (LocalLogManager logManager : newLogManagers) {
                logManager.close();
            }
            throw t;
        }
        this.logManagers = newLogManagers;
    }

    AtomicReference<String> firstError() {
        return firstError;
    }

    File dir() {
        return dir;
    }

    /**
     * Wait until exactly one log manager believes it is the leader, and return that
     * leader.  Fails if two managers simultaneously claim leadership.
     */
    MetaLogLeader waitForLeader() throws InterruptedException {
        AtomicReference<MetaLogLeader> value = new AtomicReference<>(null);
        TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> {
            MetaLogLeader result = null;
            for (LocalLogManager logManager : logManagers) {
                MetaLogLeader leader = logManager.leader();
                if (leader.nodeId() == logManager.nodeId()) {
                    if (result != null) {
                        throw new RuntimeException("node " + leader.nodeId() +
                            " thinks it's the leader, but so does " + result.nodeId());
                    }
                    result = leader;
                }
            }
            if (result == null) {
                throw new RuntimeException("No leader found.");
            }
            value.set(result);
        });
        return value.get();
    }

    public List<LocalLogManager> logManagers() {
        return logManagers;
    }

    @Override
    public void close() throws InterruptedException {
        try {
            // Begin all shutdowns first so the managers wind down concurrently.
            for (LocalLogManager logManager : logManagers) {
                logManager.beginShutdown();
            }
            for (LocalLogManager logManager : logManagers) {
                logManager.close();
            }
            Utils.delete(dir);
        } catch (IOException e) {
            log.error("Error deleting {}", dir.getAbsolutePath(), e);
        }
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.metalog;

import org.apache.kafka.common.protocol.ApiMessage;

import java.util.ArrayList;
import java.util.List;

/**
 * A MetaLogListener that records every callback it receives as a serialized string,
 * so tests can assert on the exact sequence of events.  Thread-safe: all access to
 * serializedEvents is synchronized on this object.
 */
public class MockMetaLogManagerListener implements MetaLogListener {
    public static final String COMMIT = "COMMIT";
    public static final String LAST_COMMITTED_OFFSET = "LAST_COMMITTED_OFFSET";
    public static final String NEW_LEADER = "NEW_LEADER";
    public static final String RENOUNCE = "RENOUNCE";
    public static final String SHUTDOWN = "SHUTDOWN";

    // Guarded by this; read via the snapshot returned by serializedEvents().
    private final List<String> serializedEvents = new ArrayList<>();

    @Override
    public synchronized void handleCommits(long lastCommittedOffset, List<ApiMessage> messages) {
        // One COMMIT event per message, followed by a single LAST_COMMITTED_OFFSET event.
        for (ApiMessage message : messages) {
            serializedEvents.add(COMMIT + " " + message.toString());
        }
        serializedEvents.add(LAST_COMMITTED_OFFSET + " " + lastCommittedOffset);
    }

    @Override
    public void handleNewLeader(MetaLogLeader leader) {
        String event = NEW_LEADER + " " + leader.nodeId() + " " + leader.epoch();
        synchronized (this) {
            serializedEvents.add(event);
        }
    }

    @Override
    public void handleRenounce(long epoch) {
        String event = RENOUNCE + " " + epoch;
        synchronized (this) {
            serializedEvents.add(event);
        }
    }

    @Override
    public void beginShutdown() {
        synchronized (this) {
            serializedEvents.add(SHUTDOWN);
        }
    }

    /** Return a snapshot of the events recorded so far. */
    public synchronized List<String> serializedEvents() {
        return new ArrayList<>(serializedEvents);
    }
}