MINOR: add the MetaLogListener, LocalLogManager, and Controller interface. (#10106)

Add MetaLogListener, LocalLogManager, and related classes. These
classes are used by the KIP-500 controller and broker to interface with the
Raft log.

Also add the Controller interface. The implementation will be added in a separate PR.

Reviewers: Ron Dagostino <rdagostino@confluent.io>, David Arthur <mumrah@gmail.com>
This commit is contained in:
Colin Patrick McCabe 2021-02-11 08:42:59 -08:00 committed by GitHub
parent 6b3a4553f9
commit bf5e1f1cc0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 1729 additions and 0 deletions

View File

@ -192,6 +192,27 @@
</subpackage> </subpackage>
</subpackage> </subpackage>
<subpackage name="controller">
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.clients.admin" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.feature" />
<allow pkg="org.apache.kafka.common.internals" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.quota" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.controller" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.timeline" />
</subpackage>
<subpackage name="metadata"> <subpackage name="metadata">
<allow pkg="org.apache.kafka.clients" /> <allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common.annotation" /> <allow pkg="org.apache.kafka.common.annotation" />
@ -201,6 +222,15 @@
<allow pkg="org.apache.kafka.test" /> <allow pkg="org.apache.kafka.test" />
</subpackage> </subpackage>
<subpackage name="metalog">
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.test" />
</subpackage>
<subpackage name="clients"> <subpackage name="clients">
<allow pkg="org.slf4j" /> <allow pkg="org.slf4j" />
<allow pkg="org.apache.kafka.common" /> <allow pkg="org.apache.kafka.common" />

View File

@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.message.AlterIsrRequestData;
import org.apache.kafka.common.message.AlterIsrResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FeatureMapAndEpoch;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
 * The interface to the KIP-500 controller.
 *
 * Request-handling methods are asynchronous: each returns a CompletableFuture
 * which is completed, normally or exceptionally, when the controller has
 * finished processing the request.
 *
 * NOTE(review): thread-safety guarantees are not stated here — confirm with the
 * implementing class (the implementation is added in a separate PR).
 */
public interface Controller extends AutoCloseable {
    /**
     * Change the in-sync replica sets for some partitions.
     *
     * @param request       The AlterIsrRequest data.
     *
     * @return              A future yielding the response.
     */
    CompletableFuture<AlterIsrResponseData> alterIsr(AlterIsrRequestData request);

    /**
     * Create a batch of topics.
     *
     * @param request       The CreateTopicsRequest data.
     *
     * @return              A future yielding the response.
     */
    CompletableFuture<CreateTopicsResponseData>
        createTopics(CreateTopicsRequestData request);

    /**
     * Unregister a broker.
     *
     * @param brokerId      The broker id to unregister.
     *
     * @return              A future that is completed successfully when the broker is
     *                      unregistered.
     */
    CompletableFuture<Void> unregisterBroker(int brokerId);

    /**
     * Describe the current configuration of various resources.
     *
     * @param resources     A map from resources to the collection of config keys that we
     *                      want to describe for each.  If the collection is empty, then
     *                      all configuration keys will be described.
     *
     * @return              A future yielding a map from each requested resource to
     *                      either its key-to-value configuration map, or an error.
     */
    CompletableFuture<Map<ConfigResource, ResultOrError<Map<String, String>>>>
        describeConfigs(Map<ConfigResource, Collection<String>> resources);

    /**
     * Elect new partition leaders.
     *
     * @param request       The request.
     *
     * @return              A future yielding the elect leaders response.
     */
    CompletableFuture<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData request);

    /**
     * Get the current finalized feature ranges for each feature.
     *
     * @return              A future yielding the feature ranges.
     */
    CompletableFuture<FeatureMapAndEpoch> finalizedFeatures();

    /**
     * Perform some incremental configuration changes.
     *
     * @param configChanges The changes, keyed by resource, then by config key; the
     *                      map entry value pairs an AlterConfigOp.OpType with the
     *                      new value.
     * @param validateOnly  True if we should validate the changes but not apply them.
     *
     * @return              A future yielding a map from partitions to error results.
     */
    CompletableFuture<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
        Map<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType, String>>> configChanges,
        boolean validateOnly);

    /**
     * Perform some configuration changes using the legacy API.
     *
     * @param newConfigs    The new configuration maps to apply.
     * @param validateOnly  True if we should validate the changes but not apply them.
     *
     * @return              A future yielding a map from partitions to error results.
     */
    CompletableFuture<Map<ConfigResource, ApiError>> legacyAlterConfigs(
        Map<ConfigResource, Map<String, String>> newConfigs, boolean validateOnly);

    /**
     * Process a heartbeat from a broker.
     *
     * @param request       The broker heartbeat request.
     *
     * @return              A future yielding a heartbeat reply.
     */
    CompletableFuture<BrokerHeartbeatReply> processBrokerHeartbeat(
        BrokerHeartbeatRequestData request);

    /**
     * Attempt to register the given broker.
     *
     * @param request       The registration request.
     *
     * @return              A future yielding a registration reply.
     */
    CompletableFuture<BrokerRegistrationReply> registerBroker(
        BrokerRegistrationRequestData request);

    /**
     * Wait for the given number of brokers to be registered and unfenced.
     * This is for testing.
     *
     * @param minBrokers    The minimum number of brokers to wait for.
     * @return              A future which is completed when the given number of brokers
     *                      is reached.
     */
    CompletableFuture<Void> waitForReadyBrokers(int minBrokers);

    /**
     * Perform some client quota changes.
     *
     * @param quotaAlterations The list of quotas to alter.
     * @param validateOnly     True if we should validate the changes but not apply them.
     * @return                 A future yielding a map of quota entities to error results.
     */
    CompletableFuture<Map<ClientQuotaEntity, ApiError>> alterClientQuotas(
        Collection<ClientQuotaAlteration> quotaAlterations, boolean validateOnly
    );

    /**
     * Begin shutting down, but don't block.  You must still call close to clean up all
     * resources.
     */
    void beginShutdown();

    /**
     * If this controller is active, this is the non-negative controller epoch.
     * Otherwise, this is -1.
     */
    long curClaimEpoch();

    /**
     * Blocks until we have shut down and freed all resources.
     */
    void close() throws InterruptedException;
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import java.util.Objects;
/**
 * Holds either a successful result of type T or an ApiError, but never both.
 *
 * An instance built from an ApiError (or from Errors + message) has a null
 * result; an instance built from a result has a null error.  Use isError() /
 * isResult() to distinguish the two cases before calling error() or result().
 */
class ResultOrError<T> {
    private final ApiError error;
    private final T result;

    public ResultOrError(Errors error, String message) {
        this(new ApiError(error, message));
    }

    public ResultOrError(ApiError error) {
        Objects.requireNonNull(error);
        this.error = error;
        this.result = null;
    }

    public ResultOrError(T result) {
        this.error = null;
        this.result = result;
    }

    public boolean isError() {
        return error != null;
    }

    public boolean isResult() {
        return error == null;
    }

    public ApiError error() {
        return error;
    }

    public T result() {
        return result;
    }

    @Override
    public boolean equals(Object o) {
        if (o == null || (!o.getClass().equals(getClass()))) {
            return false;
        }
        ResultOrError<?> other = (ResultOrError<?>) o;
        // Objects.equals is required here: error is null for the success case,
        // so calling error.equals(...) directly would throw NullPointerException.
        return Objects.equals(error, other.error) &&
            Objects.equals(result, other.result);
    }

    @Override
    public int hashCode() {
        return Objects.hash(error, result);
    }

    @Override
    public String toString() {
        // error is null when this object holds a result; the previous code called
        // error.isSuccess() unconditionally, which threw NullPointerException for
        // every successful ResultOrError.
        if (error == null || error.isSuccess()) {
            return "ResultOrError(" + result + ")";
        } else {
            return "ResultOrError(" + error + ")";
        }
    }
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata;
import java.util.Objects;
/**
 * The reply sent back for a broker heartbeat request
 * (see Controller#processBrokerHeartbeat).  Immutable value class.
 */
public class BrokerHeartbeatReply {
    // True if the heartbeat reply should tell the broker that it has caught up.
    private final boolean isCaughtUp;

    // True if the heartbeat reply should tell the broker that it is fenced.
    private final boolean isFenced;

    // True if the heartbeat reply should tell the broker that it should shut down.
    private final boolean shouldShutDown;

    public BrokerHeartbeatReply(boolean isCaughtUp,
                                boolean isFenced,
                                boolean shouldShutDown) {
        this.isCaughtUp = isCaughtUp;
        this.isFenced = isFenced;
        this.shouldShutDown = shouldShutDown;
    }

    public boolean isCaughtUp() {
        return isCaughtUp;
    }

    public boolean isFenced() {
        return isFenced;
    }

    public boolean shouldShutDown() {
        return shouldShutDown;
    }

    @Override
    public int hashCode() {
        return Objects.hash(isCaughtUp, isFenced, shouldShutDown);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof BrokerHeartbeatReply)) {
            return false;
        }
        BrokerHeartbeatReply that = (BrokerHeartbeatReply) o;
        return isCaughtUp == that.isCaughtUp
            && isFenced == that.isFenced
            && shouldShutDown == that.shouldShutDown;
    }

    @Override
    public String toString() {
        // The exact output format (including the spaces around '=' after
        // shouldShutDown) is preserved from the original implementation.
        return String.format("BrokerHeartbeatReply(isCaughtUp=%b, isFenced=%b, shouldShutDown = %b)",
            isCaughtUp, isFenced, shouldShutDown);
    }
}

View File

@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Uuid;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
/**
 * An immutable class which represents broker registrations.
 */
public class BrokerRegistration {
    private final int id;
    private final long epoch;
    private final Uuid incarnationId;
    private final Map<String, Endpoint> listeners;
    private final Map<String, VersionRange> supportedFeatures;
    private final Optional<String> rack;
    private final boolean fenced;

    /**
     * Create a registration from a list of endpoints; the listeners map is keyed
     * by each endpoint's listener name.
     *
     * @param id                The broker id.
     * @param epoch             The broker epoch.
     * @param incarnationId     The incarnation id of this broker process.
     * @param listeners         The broker's endpoints; each must have a listener name.
     * @param supportedFeatures The feature version ranges the broker supports.
     * @param rack              The broker rack, if any.
     * @param fenced            True if the broker is fenced.
     */
    public BrokerRegistration(int id,
                              long epoch,
                              Uuid incarnationId,
                              List<Endpoint> listeners,
                              Map<String, VersionRange> supportedFeatures,
                              Optional<String> rack,
                              boolean fenced) {
        this.id = id;
        this.epoch = epoch;
        this.incarnationId = incarnationId;
        Map<String, Endpoint> listenersMap = new HashMap<>();
        for (Endpoint endpoint : listeners) {
            listenersMap.put(endpoint.listenerName().get(), endpoint);
        }
        this.listeners = Collections.unmodifiableMap(listenersMap);
        Objects.requireNonNull(supportedFeatures);
        // Wrap in unmodifiableMap so that the map returned by supportedFeatures()
        // cannot be mutated by callers; this class is documented as immutable.
        this.supportedFeatures = Collections.unmodifiableMap(new HashMap<>(supportedFeatures));
        Objects.requireNonNull(rack);
        this.rack = rack;
        this.fenced = fenced;
    }

    /**
     * Create a registration from a pre-built listener-name-to-endpoint map.
     */
    public BrokerRegistration(int id,
                              long epoch,
                              Uuid incarnationId,
                              Map<String, Endpoint> listeners,
                              Map<String, VersionRange> supportedFeatures,
                              Optional<String> rack,
                              boolean fenced) {
        this.id = id;
        this.epoch = epoch;
        this.incarnationId = incarnationId;
        // Defensive copies, wrapped unmodifiable for the same immutability reason
        // as in the other constructor.
        this.listeners = Collections.unmodifiableMap(new HashMap<>(listeners));
        this.supportedFeatures = Collections.unmodifiableMap(new HashMap<>(supportedFeatures));
        this.rack = rack;
        this.fenced = fenced;
    }

    public int id() {
        return id;
    }

    public long epoch() {
        return epoch;
    }

    public Uuid incarnationId() {
        return incarnationId;
    }

    /** @return an unmodifiable map from listener name to endpoint. */
    public Map<String, Endpoint> listeners() {
        return listeners;
    }

    /** @return an unmodifiable map from feature name to supported version range. */
    public Map<String, VersionRange> supportedFeatures() {
        return supportedFeatures;
    }

    public Optional<String> rack() {
        return rack;
    }

    public boolean fenced() {
        return fenced;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, epoch, incarnationId, listeners, supportedFeatures,
            rack, fenced);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof BrokerRegistration)) return false;
        BrokerRegistration other = (BrokerRegistration) o;
        return other.id == id &&
            other.epoch == epoch &&
            other.incarnationId.equals(incarnationId) &&
            other.listeners.equals(listeners) &&
            other.supportedFeatures.equals(supportedFeatures) &&
            other.rack.equals(rack) &&
            other.fenced == fenced;
    }

    @Override
    public String toString() {
        StringBuilder bld = new StringBuilder();
        bld.append("BrokerRegistration(id=").append(id);
        bld.append(", epoch=").append(epoch);
        bld.append(", incarnationId=").append(incarnationId);
        bld.append(", listeners=[").append(
            listeners.keySet().stream().sorted().
                map(n -> listeners.get(n).toString()).
                collect(Collectors.joining(", ")));
        // Sort by key rather than calling sorted() on the entry stream: Map.Entry
        // does not implement Comparable, so entrySet().stream().sorted() threw
        // ClassCastException whenever the map had more than one entry.  This also
        // matches the sorting style used for the listeners above and in
        // FeatureMap#toString.
        bld.append("], supportedFeatures={").append(
            supportedFeatures.keySet().stream().sorted().
                map(k -> k + ": " + supportedFeatures.get(k)).
                collect(Collectors.joining(", ")));
        bld.append("}");
        bld.append(", rack=").append(rack);
        bld.append(", fenced=").append(fenced);
        bld.append(")");
        return bld.toString();
    }

    /** @return a copy of this registration with the fenced flag set to the given value. */
    public BrokerRegistration cloneWithFencing(boolean fencing) {
        return new BrokerRegistration(id, epoch, incarnationId, listeners,
            supportedFeatures, rack, fencing);
    }
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata;
import java.util.Objects;
/**
 * The reply for a successful broker registration
 * (see Controller#registerBroker); carries the broker epoch.
 */
public class BrokerRegistrationReply {
    // The broker epoch associated with this registration.
    private final long epoch;

    public BrokerRegistrationReply(long epoch) {
        this.epoch = epoch;
    }

    public long epoch() {
        return epoch;
    }

    @Override
    public int hashCode() {
        return Objects.hash(epoch);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof BrokerRegistrationReply)) {
            return false;
        }
        return epoch == ((BrokerRegistrationReply) o).epoch;
    }

    @Override
    public String toString() {
        return String.format("BrokerRegistrationReply(epoch=%d)", epoch);
    }
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/**
 * An immutable map of feature names to their supported version ranges.
 */
public class FeatureMap {
    private final Map<String, VersionRange> features;

    public FeatureMap(Map<String, VersionRange> features) {
        // Defensive copy, then wrap unmodifiable, so later changes to the
        // caller's map cannot affect this instance.
        Map<String, VersionRange> copy = new HashMap<>(features);
        this.features = Collections.unmodifiableMap(copy);
    }

    /**
     * Look up the version range for a feature.
     *
     * @param name  The feature name.
     * @return      The range, or Optional.empty if the feature is not present.
     */
    public Optional<VersionRange> get(String name) {
        VersionRange range = features.get(name);
        return Optional.ofNullable(range);
    }

    /** @return the unmodifiable underlying map. */
    public Map<String, VersionRange> features() {
        return features;
    }

    @Override
    public int hashCode() {
        return features.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof FeatureMap)) {
            return false;
        }
        return features.equals(((FeatureMap) o).features);
    }

    @Override
    public String toString() {
        // Render as "{name: range, ...}" with names in sorted order.
        String body = features.keySet().stream().sorted().
            map(k -> k + ": " + features.get(k)).
            collect(Collectors.joining(", "));
        return "{" + body + "}";
    }
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata;
import java.util.Objects;
/**
 * A feature map paired with the epoch at which it was read.
 */
public class FeatureMapAndEpoch {
    // The feature-name-to-version-range map.
    private final FeatureMap map;

    // The epoch associated with the map.
    private final long epoch;

    public FeatureMapAndEpoch(FeatureMap map, long epoch) {
        this.map = map;
        this.epoch = epoch;
    }

    public FeatureMap map() {
        return map;
    }

    public long epoch() {
        return epoch;
    }

    @Override
    public int hashCode() {
        return Objects.hash(map, epoch);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof FeatureMapAndEpoch)) {
            return false;
        }
        FeatureMapAndEpoch that = (FeatureMapAndEpoch) o;
        return epoch == that.epoch && map.equals(that.map);
    }

    @Override
    public String toString() {
        return "{map=" + map + ", epoch=" + epoch + "}";
    }
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metalog;
import java.util.Objects;
/**
 * The current leader of the MetaLog: a node id plus the epoch in which
 * that node holds leadership.
 */
public class MetaLogLeader {
    // The id of the leader node.
    private final int nodeId;

    // The leadership epoch.
    private final long epoch;

    public MetaLogLeader(int nodeId, long epoch) {
        this.nodeId = nodeId;
        this.epoch = epoch;
    }

    public int nodeId() {
        return nodeId;
    }

    public long epoch() {
        return epoch;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof MetaLogLeader)) {
            return false;
        }
        MetaLogLeader that = (MetaLogLeader) o;
        return nodeId == that.nodeId && epoch == that.epoch;
    }

    @Override
    public int hashCode() {
        return Objects.hash(nodeId, epoch);
    }

    @Override
    public String toString() {
        return String.format("MetaLogLeader(nodeId=%d, epoch=%d)", nodeId, epoch);
    }
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metalog;
import org.apache.kafka.common.protocol.ApiMessage;
import java.util.List;
/**
 * Listeners receive notifications from the MetaLogManager.
 *
 * All callbacks except handleCommits have no-op default implementations, so a
 * listener only needs to override the events it cares about.
 *
 * NOTE(review): the thread on which these callbacks are invoked is not
 * specified here — confirm with the MetaLogManager implementation in use.
 */
public interface MetaLogListener {
    /**
     * Called when the MetaLogManager commits some messages.
     *
     * @param lastOffset    The last offset found in all the given messages.
     * @param messages      The messages.
     */
    void handleCommits(long lastOffset, List<ApiMessage> messages);

    /**
     * Called when a new leader is elected.
     *
     * @param leader        The new leader id and epoch.
     */
    default void handleNewLeader(MetaLogLeader leader) {}

    /**
     * Called when the MetaLogManager has renounced the leadership.
     *
     * @param epoch         The controller epoch that has ended.
     */
    default void handleRenounce(long epoch) {}

    /**
     * Called when the MetaLogManager has finished shutting down, and wants to tell its
     * listener that it is safe to shut down as well.
     */
    default void beginShutdown() {}
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metalog;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import java.util.List;
/**
 * The MetaLogManager handles storing metadata and electing leaders.
 *
 * Callers observe commits and leadership changes through a registered
 * MetaLogListener rather than through return values; see scheduleWrite.
 */
public interface MetaLogManager {
    /**
     * Start this meta log manager.
     * The manager must be ready to accept incoming calls after this function returns.
     * It is an error to initialize a MetaLogManager more than once.
     */
    void initialize() throws Exception;

    /**
     * Register the listener.  The manager must be initialized already.
     * The listener must be ready to accept incoming calls immediately.
     *
     * @param listener      The listener to register.
     */
    void register(MetaLogListener listener) throws Exception;

    /**
     * Schedule a write to the log.
     *
     * The write will be scheduled to happen at some time in the future.  There is no
     * error return or exception thrown if the write fails.  Instead, the listener may
     * regard the write as successful if and only if the MetaLogManager reaches the given
     * offset before renouncing its leadership.  The listener should determine this by
     * monitoring the committed offsets.
     *
     * @param epoch         The controller epoch.
     * @param batch         The batch of messages to write.
     *
     * @return              The offset of the message.
     */
    long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch);

    /**
     * Renounce the leadership.
     *
     * @param epoch         The epoch.  If this does not match the current epoch, this
     *                      call will be ignored.
     */
    void renounce(long epoch);

    /**
     * Returns the current leader.  The active node may change immediately after this
     * function is called, of course.
     */
    MetaLogLeader leader();

    /**
     * Returns the node id.
     */
    int nodeId();
}

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
public class BrokerRegistrationTest {
    /** Three registrations differing in id, incarnation id, port, and (for id 2) feature range. */
    private static final List<BrokerRegistration> REGISTRATIONS = Arrays.asList(
        new BrokerRegistration(0, 0, Uuid.fromString("pc1GhUlBS92cGGaKXl6ipw"),
            Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9090)),
            Collections.singletonMap("foo", new VersionRange((short) 1, (short) 2)),
            Optional.empty(), false),
        new BrokerRegistration(1, 0, Uuid.fromString("3MfdxWlNSn2UDYsmDP1pYg"),
            Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9091)),
            Collections.singletonMap("foo", new VersionRange((short) 1, (short) 2)),
            Optional.empty(), false),
        new BrokerRegistration(2, 0, Uuid.fromString("eY7oaG1RREie5Kk9uy1l6g"),
            Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9092)),
            Collections.singletonMap("foo", new VersionRange((short) 2, (short) 3)),
            Optional.empty(), false));

    @Test
    public void testValues() {
        // Each registration reports the broker id it was constructed with.
        for (int i = 0; i < REGISTRATIONS.size(); i++) {
            assertEquals(i, REGISTRATIONS.get(i).id());
        }
    }

    @Test
    public void testEquals() {
        // Every registration is equal to itself and unequal to every other one.
        for (int i = 0; i < REGISTRATIONS.size(); i++) {
            for (int j = 0; j < REGISTRATIONS.size(); j++) {
                if (i == j) {
                    assertTrue(REGISTRATIONS.get(i).equals(REGISTRATIONS.get(j)));
                } else {
                    assertFalse(REGISTRATIONS.get(i).equals(REGISTRATIONS.get(j)));
                }
            }
        }
    }

    @Test
    public void testToString() {
        String expected = "BrokerRegistration(id=1, epoch=0, " +
            "incarnationId=3MfdxWlNSn2UDYsmDP1pYg, listeners=[Endpoint(" +
            "listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
            "host='localhost', port=9091)], supportedFeatures={foo: 1-2}, " +
            "rack=Optional.empty, fenced=false)";
        assertEquals(expected, REGISTRATIONS.get(1).toString());
    }
}

View File

@ -0,0 +1,378 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metalog;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
/**
 * The LocalLogManager is a test implementation of MetaLogManager that relies on the
 * contents of memory rather than a real Raft log.  Several LocalLogManager instances
 * can share a single SharedLogData object, which plays the role of the log itself.
 */
public final class LocalLogManager implements MetaLogManager, AutoCloseable {
    /**
     * An entry in the in-memory log.
     */
    interface LocalBatch {
        /**
         * @return      The number of log offsets which this batch occupies.
         */
        int size();
    }

    /**
     * A batch which records a change of leadership.
     */
    static class LeaderChangeBatch implements LocalBatch {
        private final MetaLogLeader newLeader;

        LeaderChangeBatch(MetaLogLeader newLeader) {
            this.newLeader = newLeader;
        }

        @Override
        public int size() {
            // A leadership change occupies exactly one offset in the log.
            return 1;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof LeaderChangeBatch)) return false;
            LeaderChangeBatch other = (LeaderChangeBatch) o;
            if (!other.newLeader.equals(newLeader)) return false;
            return true;
        }

        @Override
        public int hashCode() {
            return Objects.hash(newLeader);
        }

        @Override
        public String toString() {
            return "LeaderChangeBatch(newLeader=" + newLeader + ")";
        }
    }

    /**
     * A batch of committed metadata records.
     */
    static class LocalRecordBatch implements LocalBatch {
        private final List<ApiMessage> records;

        LocalRecordBatch(List<ApiMessage> records) {
            this.records = records;
        }

        @Override
        public int size() {
            return records.size();
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof LocalRecordBatch)) return false;
            LocalRecordBatch other = (LocalRecordBatch) o;
            if (!other.records.equals(records)) return false;
            return true;
        }

        @Override
        public int hashCode() {
            return Objects.hash(records);
        }

        @Override
        public String toString() {
            return "LocalRecordBatch(records=" + records + ")";
        }
    }

    /**
     * The log data which is shared between the LocalLogManager instances that make up
     * the simulated quorum.  All access is guarded by synchronizing on this object.
     */
    public static class SharedLogData {
        private final Logger log = LoggerFactory.getLogger(SharedLogData.class);

        /**
         * The log managers which are currently registered, keyed by node id.
         */
        private final HashMap<Integer, LocalLogManager> logManagers = new HashMap<>();

        /**
         * The batches in the log, keyed by the offset of the LAST record in each batch.
         */
        private final TreeMap<Long, LocalBatch> batches = new TreeMap<>();

        /**
         * The current leader.  A node id of -1 means that there is no leader.
         */
        private MetaLogLeader leader = new MetaLogLeader(-1, -1);

        /**
         * The offset of the last record which was appended, or -1 if the log is empty.
         */
        private long prevOffset = -1;

        /**
         * Registers a log manager, and elects a leader if there is none yet.
         *
         * @throws RuntimeException if a manager with the same node id is already
         *                          registered.
         */
        synchronized void registerLogManager(LocalLogManager logManager) {
            if (logManagers.put(logManager.nodeId(), logManager) != null) {
                throw new RuntimeException("Can't have multiple LocalLogManagers " +
                    "with id " + logManager.nodeId());
            }
            electLeaderIfNeeded();
        }

        /**
         * Removes a previously registered log manager.
         */
        synchronized void unregisterLogManager(LocalLogManager logManager) {
            if (!logManagers.remove(logManager.nodeId(), logManager)) {
                throw new RuntimeException("Log manager " + logManager.nodeId() +
                    " was not found.");
            }
        }

        /**
         * Appends a batch on behalf of the given node, but only if that node is still
         * the leader at the expected epoch.
         *
         * @return  The offset of the last record in the batch, or Long.MAX_VALUE if the
         *          append was rejected.
         */
        synchronized long tryAppend(int nodeId, long epoch, LocalBatch batch) {
            if (epoch != leader.epoch()) {
                log.trace("tryAppend(nodeId={}, epoch={}): the provided epoch does not " +
                    "match the current leader epoch of {}.", nodeId, epoch, leader.epoch());
                return Long.MAX_VALUE;
            }
            if (nodeId != leader.nodeId()) {
                // Fix: the message has three placeholders (nodeId, epoch, leader id),
                // so all three arguments must be supplied.
                log.trace("tryAppend(nodeId={}, epoch={}): the given node id does not " +
                    "match the current leader id of {}.", nodeId, epoch, leader.nodeId());
                return Long.MAX_VALUE;
            }
            log.trace("tryAppend(nodeId={}): appending {}.", nodeId, batch);
            long offset = append(batch);
            electLeaderIfNeeded();
            return offset;
        }

        /**
         * Unconditionally appends a batch, applying any leadership change it contains
         * and scheduling a log check on every registered manager.
         *
         * @return  The offset of the last record in the batch.
         */
        synchronized long append(LocalBatch batch) {
            prevOffset += batch.size();
            log.debug("append(batch={}, prevOffset={})", batch, prevOffset);
            batches.put(prevOffset, batch);
            if (batch instanceof LeaderChangeBatch) {
                LeaderChangeBatch leaderChangeBatch = (LeaderChangeBatch) batch;
                leader = leaderChangeBatch.newLeader;
            }
            for (LocalLogManager logManager : logManagers.values()) {
                logManager.scheduleLogCheck();
            }
            return prevOffset;
        }

        /**
         * If there is no current leader and at least one manager is registered, picks a
         * registered manager at random and appends a leadership change naming it the
         * new leader at the next epoch.
         */
        synchronized void electLeaderIfNeeded() {
            if (leader.nodeId() != -1 || logManagers.isEmpty()) {
                return;
            }
            int nextLeaderIndex = ThreadLocalRandom.current().nextInt(logManagers.size());
            Iterator<Integer> iter = logManagers.keySet().iterator();
            Integer nextLeaderNode = null;
            for (int i = 0; i <= nextLeaderIndex; i++) {
                nextLeaderNode = iter.next();
            }
            MetaLogLeader newLeader = new MetaLogLeader(nextLeaderNode, leader.epoch() + 1);
            log.info("Elected new leader: {}.", newLeader);
            append(new LeaderChangeBatch(newLeader));
        }

        /**
         * Returns the first batch whose last offset is strictly greater than the given
         * offset, or null if there is none.
         */
        synchronized Entry<Long, LocalBatch> nextBatch(long offset) {
            Entry<Long, LocalBatch> entry = batches.higherEntry(offset);
            if (entry == null) {
                return null;
            }
            return new SimpleImmutableEntry<>(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Tracks how far a registered listener has read in the shared log.
     */
    private static class MetaLogListenerData {
        // The offset of the last batch this listener consumed, or -1 if none yet.
        private long offset = -1;
        private final MetaLogListener listener;

        MetaLogListenerData(MetaLogListener listener) {
            this.listener = listener;
        }
    }

    private final Logger log;

    /**
     * The node ID of this local log manager.
     */
    private final int nodeId;

    /**
     * The log data shared with the other managers in the simulated quorum.
     */
    private final SharedLogData shared;

    /**
     * The single-threaded event queue on which all callbacks and state changes run.
     */
    private final EventQueue eventQueue;

    /**
     * True once initialize() has run on the event queue thread.
     */
    private boolean initialized = false;

    /**
     * True once the beginShutdown() event has run on the event queue thread.
     */
    private boolean shutdown = false;

    /**
     * Listeners are not given batches whose offset exceeds this; tests use
     * setMaxReadOffset to pause log consumption at a chosen point.
     */
    private long maxReadOffset = Long.MAX_VALUE;

    /**
     * The registered listeners, along with their read positions.
     */
    private final List<MetaLogListenerData> listeners = new ArrayList<>();

    /**
     * The highest-epoch leader this manager has observed.  Volatile because leader()
     * may be called from threads other than the event queue thread.
     */
    private volatile MetaLogLeader leader = new MetaLogLeader(-1, -1);

    public LocalLogManager(LogContext logContext,
                           int nodeId,
                           SharedLogData shared,
                           String threadNamePrefix) {
        this.log = logContext.logger(LocalLogManager.class);
        this.nodeId = nodeId;
        this.shared = shared;
        this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, threadNamePrefix);
        shared.registerLogManager(this);
    }

    /**
     * Schedules an event which feeds any new batches to every registered listener, up
     * to but not past maxReadOffset.
     */
    private void scheduleLogCheck() {
        eventQueue.append(() -> {
            try {
                log.debug("Node {}: running log check.", nodeId);
                int numEntriesFound = 0;
                for (MetaLogListenerData listenerData : listeners) {
                    while (true) {
                        Entry<Long, LocalBatch> entry = shared.nextBatch(listenerData.offset);
                        if (entry == null) {
                            log.trace("Node {}: reached the end of the log after finding " +
                                "{} entries.", nodeId, numEntriesFound);
                            break;
                        }
                        long entryOffset = entry.getKey();
                        if (entryOffset > maxReadOffset) {
                            log.trace("Node {}: after {} entries, not reading the next " +
                                "entry because its offset is {}, and maxReadOffset is {}.",
                                nodeId, numEntriesFound, entryOffset, maxReadOffset);
                            break;
                        }
                        if (entry.getValue() instanceof LeaderChangeBatch) {
                            LeaderChangeBatch batch = (LeaderChangeBatch) entry.getValue();
                            log.trace("Node {}: handling LeaderChange to {}.",
                                nodeId, batch.newLeader);
                            listenerData.listener.handleNewLeader(batch.newLeader);
                            // Only advance the cached leader; never regress to an older
                            // epoch when replaying the log from the beginning.
                            if (batch.newLeader.epoch() > leader.epoch()) {
                                leader = batch.newLeader;
                            }
                        } else if (entry.getValue() instanceof LocalRecordBatch) {
                            LocalRecordBatch batch = (LocalRecordBatch) entry.getValue();
                            log.trace("Node {}: handling LocalRecordBatch with offset {}.",
                                nodeId, entryOffset);
                            listenerData.listener.handleCommits(entryOffset, batch.records);
                        }
                        numEntriesFound++;
                        listenerData.offset = entryOffset;
                    }
                }
                log.trace("Completed log check for node " + nodeId);
            } catch (Exception e) {
                log.error("Exception while handling log check", e);
            }
        });
    }

    /**
     * Begins shutting down: renounces leadership, notifies the listeners, and
     * unregisters this manager from the shared log data.  Idempotent.
     */
    public void beginShutdown() {
        eventQueue.beginShutdown("beginShutdown", () -> {
            try {
                if (initialized && !shutdown) {
                    log.debug("Node {}: beginning shutdown.", nodeId);
                    renounce(leader.epoch());
                    for (MetaLogListenerData listenerData : listeners) {
                        listenerData.listener.beginShutdown();
                    }
                    shared.unregisterLogManager(this);
                }
            } catch (Exception e) {
                log.error("Unexpected exception while sending beginShutdown callbacks", e);
            }
            shutdown = true;
        });
    }

    @Override
    public void close() throws InterruptedException {
        log.debug("Node {}: closing.", nodeId);
        beginShutdown();
        eventQueue.close();
    }

    @Override
    public void initialize() throws Exception {
        eventQueue.append(() -> {
            log.debug("initialized local log manager for node " + nodeId);
            initialized = true;
        });
    }

    /**
     * Registers a listener and waits for the registration to complete on the event
     * queue thread.
     *
     * @throws Exception if the manager has not been initialized yet.
     */
    @Override
    public void register(MetaLogListener listener) throws Exception {
        CompletableFuture<Void> future = new CompletableFuture<>();
        eventQueue.append(() -> {
            if (shutdown) {
                log.info("Node {}: can't register because local log manager has " +
                    "already been shut down.", nodeId);
                future.complete(null);
            } else if (initialized) {
                log.info("Node {}: registered MetaLogListener.", nodeId);
                listeners.add(new MetaLogListenerData(listener));
                shared.electLeaderIfNeeded();
                scheduleLogCheck();
                future.complete(null);
            } else {
                log.info("Node {}: can't register because local log manager has not " +
                    "been initialized.", nodeId);
                future.completeExceptionally(new RuntimeException(
                    "LocalLogManager was not initialized."));
            }
        });
        future.get();
    }

    @Override
    public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
        // NOTE(review): the epoch parameter is ignored in favor of the current leader
        // epoch, so tryAppend's epoch check can never reject this write -- confirm that
        // this is intended for the test implementation.
        return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch(
            batch.stream().map(r -> r.message()).collect(Collectors.toList())));
    }

    @Override
    public void renounce(long epoch) {
        // NOTE(review): the epoch parameter is unused; the cached leader's epoch is
        // used for the append instead -- confirm intent.
        MetaLogLeader curLeader = leader;
        MetaLogLeader nextLeader = new MetaLogLeader(-1, curLeader.epoch() + 1);
        shared.tryAppend(nodeId, curLeader.epoch(), new LeaderChangeBatch(nextLeader));
    }

    @Override
    public MetaLogLeader leader() {
        return leader;
    }

    @Override
    public int nodeId() {
        return nodeId;
    }

    /**
     * Returns the currently registered listeners, fetched on the event queue thread.
     */
    public List<MetaLogListener> listeners() {
        final CompletableFuture<List<MetaLogListener>> future = new CompletableFuture<>();
        eventQueue.append(() -> {
            future.complete(listeners.stream().map(l -> l.listener).collect(Collectors.toList()));
        });
        try {
            return future.get();
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Sets the offset past which listeners will not be given batches, and waits for
     * the change to take effect on the event queue thread.
     */
    public void setMaxReadOffset(long maxReadOffset) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        eventQueue.append(() -> {
            log.trace("Node {}: set maxReadOffset to {}.", nodeId, maxReadOffset);
            this.maxReadOffset = maxReadOffset;
            scheduleLogCheck();
            future.complete(null);
        });
        try {
            future.get();
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

View File

@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metalog;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.kafka.metalog.MockMetaLogManagerListener.COMMIT;
import static org.apache.kafka.metalog.MockMetaLogManagerListener.LAST_COMMITTED_OFFSET;
import static org.apache.kafka.metalog.MockMetaLogManagerListener.SHUTDOWN;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Timeout(value = 40)
public class LocalLogManagerTest {
    private static final Logger log = LoggerFactory.getLogger(LocalLogManagerTest.class);

    /**
     * Test creating a LocalLogManager and closing it.
     */
    @Test
    public void testCreateAndClose() throws Exception {
        try (LocalLogManagerTestEnv env =
                 LocalLogManagerTestEnv.createWithMockListeners(1)) {
            env.close();
            // The environment must not have recorded any errors.
            assertEquals(null, env.firstError.get());
        }
    }

    /**
     * Test that the local log manager will claim leadership.
     */
    @Test
    public void testClaimsLeadership() throws Exception {
        try (LocalLogManagerTestEnv env =
                 LocalLogManagerTestEnv.createWithMockListeners(1)) {
            // With a single manager, node 0 must become the leader of epoch 0.
            assertEquals(new MetaLogLeader(0, 0), env.waitForLeader());
            env.close();
            assertEquals(null, env.firstError.get());
        }
    }

    /**
     * Test that we can pass leadership back and forth between log managers.
     */
    @Test
    public void testPassLeadership() throws Exception {
        try (LocalLogManagerTestEnv env =
                 LocalLogManagerTestEnv.createWithMockListeners(3)) {
            MetaLogLeader first = env.waitForLeader();
            MetaLogLeader cur = first;
            do {
                // Ask the current leader to renounce, then poll until a manager from a
                // newer epoch is elected.
                env.logManagers().get(cur.nodeId()).renounce(cur.epoch());
                MetaLogLeader next = env.waitForLeader();
                while (next.epoch() == cur.epoch()) {
                    Thread.sleep(1);
                    next = env.waitForLeader();
                }
                // Each handoff advances the epoch by exactly two: once for the
                // renunciation (leader id -1) and once for the subsequent election.
                long expectedNextEpoch = cur.epoch() + 2;
                assertEquals(expectedNextEpoch, next.epoch(), "Expected next epoch to be " + expectedNextEpoch +
                    ", but found " + next);
                cur = next;
            // Keep handing off until leadership lands on a different node than the
            // original leader.
            } while (cur.nodeId() == first.nodeId());
            env.close();
            assertEquals(null, env.firstError.get());
        }
    }

    /**
     * Waits until the given log manager's first listener has observed a
     * LAST_COMMITTED_OFFSET event at or past targetOffset, verifying along the way
     * that the committed offsets never decrease.
     */
    private static void waitForLastCommittedOffset(long targetOffset,
                LocalLogManager logManager) throws InterruptedException {
        TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> {
            MockMetaLogManagerListener listener =
                (MockMetaLogManagerListener) logManager.listeners().get(0);
            long highestOffset = -1;
            for (String event : listener.serializedEvents()) {
                if (event.startsWith(LAST_COMMITTED_OFFSET)) {
                    // Events have the form "LAST_COMMITTED_OFFSET <offset>"; parse the
                    // part after the separating space.
                    long offset = Long.valueOf(
                        event.substring(LAST_COMMITTED_OFFSET.length() + 1));
                    if (offset < highestOffset) {
                        throw new RuntimeException("Invalid offset: " + offset +
                            " is less than the previous offset of " + highestOffset);
                    }
                    highestOffset = offset;
                }
            }
            if (highestOffset < targetOffset) {
                throw new RuntimeException("Offset for log manager " +
                    logManager.nodeId() + " only reached " + highestOffset);
            }
        });
    }

    /**
     * Test that all the log managers see all the commits.
     */
    @Test
    public void testCommits() throws Exception {
        try (LocalLogManagerTestEnv env =
                 LocalLogManagerTestEnv.createWithMockListeners(3)) {
            MetaLogLeader leaderInfo = env.waitForLeader();
            LocalLogManager activeLogManager = env.logManagers().get(leaderInfo.nodeId());
            long epoch = activeLogManager.leader().epoch();
            List<ApiMessageAndVersion> messages = Arrays.asList(
                new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(0), (short) 0),
                new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(1), (short) 0),
                new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(2), (short) 0));
            // The leader-change batch occupies offset 0, so these three records span
            // offsets 1-3 and the write returns the offset of the last record.
            assertEquals(3, activeLogManager.scheduleWrite(epoch, messages));
            for (LocalLogManager logManager : env.logManagers()) {
                waitForLastCommittedOffset(3, logManager);
            }
            List<MockMetaLogManagerListener> listeners = env.logManagers().stream().
                map(m -> (MockMetaLogManagerListener) m.listeners().get(0)).
                collect(Collectors.toList());
            env.close();
            // Every listener must end with a SHUTDOWN event and must have seen each of
            // the three commits, in order.
            for (MockMetaLogManagerListener listener : listeners) {
                List<String> events = listener.serializedEvents();
                assertEquals(SHUTDOWN, events.get(events.size() - 1));
                int foundIndex = 0;
                for (String event : events) {
                    if (event.startsWith(COMMIT)) {
                        assertEquals(messages.get(foundIndex).message().toString(),
                            event.substring(COMMIT.length() + 1));
                        foundIndex++;
                    }
                }
                assertEquals(messages.size(), foundIndex);
            }
        }
    }
}

View File

@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metalog;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metalog.LocalLogManager.SharedLogData;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
public class LocalLogManagerTestEnv implements AutoCloseable {
    private static final Logger log =
        LoggerFactory.getLogger(LocalLogManagerTestEnv.class);

    /**
     * The first error we encountered during this test, or null if we have not
     * encountered any.
     */
    final AtomicReference<String> firstError = new AtomicReference<>(null);

    /**
     * The test directory, which we will delete once the test is over.
     */
    private final File dir;

    /**
     * The shared data for our LocalLogManager instances.
     */
    private final SharedLogData shared;

    /**
     * A list of log managers, one per simulated node.
     */
    private final List<LocalLogManager> logManagers;

    /**
     * Creates a test environment with the given number of log managers, registering a
     * MockMetaLogManagerListener with each.  On failure, the environment is closed
     * before the exception is rethrown.
     */
    public static LocalLogManagerTestEnv createWithMockListeners(int numManagers) throws Exception {
        LocalLogManagerTestEnv testEnv = new LocalLogManagerTestEnv(numManagers);
        try {
            for (LocalLogManager logManager : testEnv.logManagers) {
                logManager.register(new MockMetaLogManagerListener());
            }
        } catch (Exception e) {
            testEnv.close();
            throw e;
        }
        return testEnv;
    }

    /**
     * Creates and initializes numManagers log managers which all share one log.  If
     * construction or initialization fails, the managers created so far are closed.
     */
    public LocalLogManagerTestEnv(int numManagers) throws Exception {
        dir = TestUtils.tempDirectory();
        shared = new SharedLogData();
        List<LocalLogManager> newLogManagers = new ArrayList<>(numManagers);
        try {
            for (int nodeId = 0; nodeId < numManagers; nodeId++) {
                newLogManagers.add(new LocalLogManager(
                    new LogContext(String.format("[LocalLogManager %d] ", nodeId)),
                    nodeId,
                    shared,
                    String.format("LocalLogManager-%d_", nodeId)));
            }
            for (LocalLogManager logManager : newLogManagers) {
                logManager.initialize();
            }
        } catch (Throwable t) {
            for (LocalLogManager logManager : newLogManagers) {
                logManager.close();
            }
            throw t;
        }
        this.logManagers = newLogManagers;
    }

    AtomicReference<String> firstError() {
        return firstError;
    }

    File dir() {
        return dir;
    }

    /**
     * Waits until exactly one log manager considers itself the leader, and returns
     * that leader.  Fails if two managers claim leadership at the same time.
     */
    MetaLogLeader waitForLeader() throws InterruptedException {
        AtomicReference<MetaLogLeader> value = new AtomicReference<>(null);
        TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> {
            MetaLogLeader result = null;
            for (LocalLogManager logManager : logManagers) {
                MetaLogLeader leader = logManager.leader();
                // A manager is the leader only if its own node id matches the
                // leader's node id.
                if (leader.nodeId() == logManager.nodeId()) {
                    if (result != null) {
                        throw new RuntimeException("node " + leader.nodeId() +
                            " thinks it's the leader, but so does " + result.nodeId());
                    }
                    result = leader;
                }
            }
            if (result == null) {
                throw new RuntimeException("No leader found.");
            }
            value.set(result);
        });
        return value.get();
    }

    public List<LocalLogManager> logManagers() {
        return logManagers;
    }

    @Override
    public void close() throws InterruptedException {
        // Begin shutdown on every manager first so they can wind down concurrently,
        // then wait for each to close, and finally remove the test directory.
        try {
            for (LocalLogManager logManager : logManagers) {
                logManager.beginShutdown();
            }
            for (LocalLogManager logManager : logManagers) {
                logManager.close();
            }
            Utils.delete(dir);
        } catch (IOException e) {
            log.error("Error deleting {}", dir.getAbsolutePath(), e);
        }
    }
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metalog;
import org.apache.kafka.common.protocol.ApiMessage;
import java.util.ArrayList;
import java.util.List;
/**
 * A MetaLogListener which records each callback it receives as a serialized string,
 * so that tests can later inspect the sequence of events.
 */
public class MockMetaLogManagerListener implements MetaLogListener {
    public static final String COMMIT = "COMMIT";
    public static final String LAST_COMMITTED_OFFSET = "LAST_COMMITTED_OFFSET";
    public static final String NEW_LEADER = "NEW_LEADER";
    public static final String RENOUNCE = "RENOUNCE";
    public static final String SHUTDOWN = "SHUTDOWN";

    // The recorded events, in arrival order.  Guarded by synchronizing on this.
    private final List<String> serializedEvents = new ArrayList<>();

    @Override
    public synchronized void handleCommits(long lastCommittedOffset, List<ApiMessage> messages) {
        // Record one COMMIT event per message, then the offset they were committed at.
        for (ApiMessage message : messages) {
            serializedEvents.add(COMMIT + " " + message.toString());
        }
        serializedEvents.add(LAST_COMMITTED_OFFSET + " " + lastCommittedOffset);
    }

    @Override
    public void handleNewLeader(MetaLogLeader leader) {
        // Build the event string outside the lock, then record it.
        String event = NEW_LEADER + " " + leader.nodeId() + " " + leader.epoch();
        synchronized (this) {
            serializedEvents.add(event);
        }
    }

    @Override
    public void handleRenounce(long epoch) {
        String event = RENOUNCE + " " + epoch;
        synchronized (this) {
            serializedEvents.add(event);
        }
    }

    @Override
    public void beginShutdown() {
        synchronized (this) {
            serializedEvents.add(SHUTDOWN);
        }
    }

    /**
     * Returns a snapshot of the events recorded so far.
     */
    public synchronized List<String> serializedEvents() {
        return new ArrayList<>(serializedEvents);
    }
}