KAFKA-3265; Add a public AdminClient API in Java (KIP-117)

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Dan Norwood <norwood@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #2472 from cmccabe/KAFKA-3265
This commit is contained in:
Colin P. Mccabe 2017-05-02 00:16:01 +01:00 committed by Ismael Juma
parent c96656efb3
commit 4aed28d189
38 changed files with 3681 additions and 122 deletions

View File

@ -47,6 +47,7 @@
<subpackage name="common"> <subpackage name="common">
<disallow pkg="org.apache.kafka.clients" /> <disallow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common" exact-match="true" /> <allow pkg="org.apache.kafka.common" exact-match="true" />
<allow pkg="org.apache.kafka.common.internals" exact-match="true" />
<allow pkg="org.apache.kafka.test" /> <allow pkg="org.apache.kafka.test" />
<subpackage name="config"> <subpackage name="config">
@ -134,6 +135,10 @@
<allow pkg="org.apache.kafka.clients.consumer" /> <allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="org.apache.kafka.clients.producer" /> <allow pkg="org.apache.kafka.clients.producer" />
</subpackage> </subpackage>
<subpackage name="admin">
<allow pkg="org.apache.kafka.clients.admin" />
</subpackage>
</subpackage> </subpackage>
<subpackage name="server"> <subpackage name="server">

View File

@ -8,7 +8,7 @@
<!-- Clients --> <!-- Clients -->
<suppress checks="ClassFanOutComplexity" <suppress checks="ClassFanOutComplexity"
files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest).java"/> files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest|KafkaAdminClient).java"/>
<suppress checks="ClassFanOutComplexity" <suppress checks="ClassFanOutComplexity"
files=".*/protocol/Errors.java"/> files=".*/protocol/Errors.java"/>
<suppress checks="ClassFanOutComplexity" <suppress checks="ClassFanOutComplexity"
@ -35,7 +35,7 @@
files="DefaultRecordBatch.java"/> files="DefaultRecordBatch.java"/>
<suppress checks="ClassDataAbstractionCoupling" <suppress checks="ClassDataAbstractionCoupling"
files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager).java"/> files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|KafkaAdminClient).java"/>
<suppress checks="ClassDataAbstractionCoupling" <suppress checks="ClassDataAbstractionCoupling"
files=".*/protocol/Errors.java"/> files=".*/protocol/Errors.java"/>

View File

@ -332,6 +332,7 @@ public final class Metadata {
Collection<PartitionInfo> partitionInfos = new ArrayList<>(); Collection<PartitionInfo> partitionInfos = new ArrayList<>();
List<Node> nodes = Collections.emptyList(); List<Node> nodes = Collections.emptyList();
Set<String> internalTopics = Collections.emptySet(); Set<String> internalTopics = Collections.emptySet();
Node controller = null;
String clusterId = null; String clusterId = null;
if (cluster != null) { if (cluster != null) {
clusterId = cluster.clusterResource().clusterId(); clusterId = cluster.clusterResource().clusterId();
@ -346,7 +347,8 @@ public final class Metadata {
} }
} }
nodes = cluster.nodes(); nodes = cluster.nodes();
controller = cluster.controller();
} }
return new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics, internalTopics); return new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics, internalTopics, controller);
} }
} }

View File

@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
import java.util.Map;
/**
 * The public interface for the {@link KafkaAdminClient}, which supports managing and inspecting topics,
 * brokers, and configurations.
 *
 * @see KafkaAdminClient
 */
@InterfaceStability.Unstable
public abstract class AdminClient implements AutoCloseable {

    /**
     * Create a new AdminClient with the given configuration.
     *
     * @param conf The configuration.
     * @return The new KafkaAdminClient.
     */
    public static AdminClient create(Map<String, Object> conf) {
        return KafkaAdminClient.create(new AdminClientConfig(conf));
    }

    /**
     * Close the AdminClient and release all associated resources.
     */
    public abstract void close();

    /**
     * Create a batch of new topics with the default options.
     *
     * See {@link AdminClient#createTopics(Collection, CreateTopicsOptions)} for details.
     *
     * @param newTopics The new topics to create.
     * @return The CreateTopicResults.
     */
    public CreateTopicResults createTopics(Collection<NewTopic> newTopics) {
        return createTopics(newTopics, new CreateTopicsOptions());
    }

    /**
     * Create a batch of new topics.
     *
     * It may take several seconds after this call returns success for all the brokers
     * to become aware that the topics have been created.  During this time,
     * {@code listTopics} and {@code describeTopics} may not return information
     * about the new topics.
     *
     * @param newTopics The new topics to create.
     * @param options   The options to use when creating the new topics.
     * @return The CreateTopicResults.
     */
    public abstract CreateTopicResults createTopics(Collection<NewTopic> newTopics,
                                                    CreateTopicsOptions options);

    /**
     * Similar to {@link AdminClient#deleteTopics(Collection, DeleteTopicsOptions)},
     * but uses the default options.
     *
     * @param topics The topic names to delete.
     * @return The DeleteTopicResults.
     */
    public DeleteTopicResults deleteTopics(Collection<String> topics) {
        return deleteTopics(topics, new DeleteTopicsOptions());
    }

    /**
     * Delete a batch of topics.
     *
     * It may take several seconds after this call returns success for all the brokers
     * to become aware that the topics are gone.  During this time,
     * {@code listTopics} and {@code describeTopics} may continue to return
     * information about the deleted topics.
     *
     * If delete.topic.enable is false on the brokers, deleteTopics will mark
     * the topics for deletion, but not actually delete them.  The futures will
     * return successfully in this case.
     *
     * @param topics  The topic names to delete.
     * @param options The options to use when deleting the topics.
     * @return The DeleteTopicResults.
     */
    public abstract DeleteTopicResults deleteTopics(Collection<String> topics, DeleteTopicsOptions options);

    /**
     * List the topics available in the cluster with the default options.
     *
     * @return The ListTopicsResults.
     */
    public ListTopicsResults listTopics() {
        return listTopics(new ListTopicsOptions());
    }

    /**
     * List the topics available in the cluster.
     *
     * @param options The options to use when listing the topics.
     * @return The ListTopicsResults.
     */
    public abstract ListTopicsResults listTopics(ListTopicsOptions options);

    /**
     * Describe some topics in the cluster, with the default options.
     *
     * See {@link AdminClient#describeTopics(Collection, DescribeTopicsOptions)} for details.
     *
     * @param topicNames The names of the topics to describe.
     *
     * @return The DescribeTopicsResults.
     */
    public DescribeTopicsResults describeTopics(Collection<String> topicNames) {
        return describeTopics(topicNames, new DescribeTopicsOptions());
    }

    /**
     * Describe some topics in the cluster.
     *
     * Note that if auto.create.topics.enable is true on the brokers,
     * describing a topic may create a topic with that name.
     * There are two workarounds: either use {@code listTopics} and ensure
     * that the topic is present before describing, or disable
     * auto.create.topics.enable.
     *
     * @param topicNames The names of the topics to describe.
     * @param options    The options to use when describing the topic.
     *
     * @return The DescribeTopicsResults.
     */
    public abstract DescribeTopicsResults describeTopics(Collection<String> topicNames,
                                                         DescribeTopicsOptions options);

    /**
     * Get information about the nodes in the cluster, using the default options.
     *
     * @return The DescribeClusterResults.
     */
    public DescribeClusterResults describeCluster() {
        return describeCluster(new DescribeClusterOptions());
    }

    /**
     * Get information about the nodes in the cluster.
     *
     * @param options The options to use when getting information about the cluster.
     * @return The DescribeClusterResults.
     */
    public abstract DescribeClusterResults describeCluster(DescribeClusterOptions options);

    /**
     * Get information about the api versions of nodes in the cluster with the default options.
     *
     * See {@link AdminClient#apiVersions(Collection, ApiVersionsOptions)} for details.
     *
     * @param nodes The nodes to get information about, or null to get information about all nodes.
     * @return The ApiVersionsResults.
     */
    public ApiVersionsResults apiVersions(Collection<Node> nodes) {
        return apiVersions(nodes, new ApiVersionsOptions());
    }

    /**
     * Get information about the api versions of nodes in the cluster.
     *
     * @param nodes   The nodes to get information about, or null to get information about all nodes.
     * @param options The options to use when getting api versions of the nodes.
     * @return The ApiVersionsResults.
     */
    public abstract ApiVersionsResults apiVersions(Collection<Node> nodes, ApiVersionsOptions options);
}

View File

@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.metrics.Sensor;
import java.util.Map;
import java.util.Set;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
/**
 * The AdminClient configuration keys.
 *
 * Most keys and their documentation are shared with the other clients via
 * {@code CommonClientConfigs}.
 */
public class AdminClientConfig extends AbstractConfig {
    // The full set of configuration definitions; populated once in the static initializer below.
    private static final ConfigDef CONFIG;

    /**
     * <code>bootstrap.servers</code>
     */
    public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
    private static final String BOOTSTRAP_SERVERS_DOC = CommonClientConfigs.BOOTSTRAP_SERVERS_DOC;

    /**
     * <code>reconnect.backoff.ms</code>
     */
    public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
    private static final String RECONNECT_BACKOFF_MS_DOC = CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC;

    /**
     * <code>retry.backoff.ms</code>
     */
    public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
    private static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to " +
        "retry a failed request. This avoids repeatedly sending requests in a tight loop under " +
        "some failure scenarios.";

    /** <code>connections.max.idle.ms</code> */
    public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
    private static final String CONNECTIONS_MAX_IDLE_MS_DOC = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC;

    /** <code>request.timeout.ms</code> */
    public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
    private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;

    /** <code>client.id</code> */
    public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
    private static final String CLIENT_ID_DOC = CommonClientConfigs.CLIENT_ID_DOC;

    /** <code>metadata.max.age.ms</code> */
    public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
    private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC;

    /** <code>send.buffer.bytes</code> */
    public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG;
    private static final String SEND_BUFFER_DOC = CommonClientConfigs.SEND_BUFFER_DOC;

    /** <code>receive.buffer.bytes</code> */
    public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG;
    private static final String RECEIVE_BUFFER_DOC = CommonClientConfigs.RECEIVE_BUFFER_DOC;

    /** <code>metric.reporters</code> */
    public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
    private static final String METRIC_REPORTER_CLASSES_DOC = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC;

    /** <code>metrics.num.samples</code> */
    public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
    private static final String METRICS_NUM_SAMPLES_DOC = CommonClientConfigs.METRICS_NUM_SAMPLES_DOC;

    /** <code>metrics.sample.window.ms</code> */
    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
    private static final String METRICS_SAMPLE_WINDOW_MS_DOC = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC;

    /** <code>metrics.recording.level</code> */
    public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;

    /** <code>security.protocol</code> */
    public static final String SECURITY_PROTOCOL_CONFIG = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
    public static final String DEFAULT_SECURITY_PROTOCOL = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
    private static final String SECURITY_PROTOCOL_DOC = CommonClientConfigs.SECURITY_PROTOCOL_DOC;
    private static final String METRICS_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC;

    static {
        CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
                                        Type.LIST,
                                        Importance.HIGH,
                                        BOOTSTRAP_SERVERS_DOC)
                                .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CLIENT_ID_DOC)
                                .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC)
                                // -1 for the buffer sizes means "use the OS default".
                                .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(-1), Importance.MEDIUM, SEND_BUFFER_DOC)
                                .define(RECEIVE_BUFFER_CONFIG, Type.INT, 64 * 1024, atLeast(-1), Importance.MEDIUM, RECEIVE_BUFFER_DOC)
                                .define(RECONNECT_BACKOFF_MS_CONFIG,
                                        Type.LONG,
                                        50L,
                                        atLeast(0L),
                                        Importance.LOW,
                                        RECONNECT_BACKOFF_MS_DOC)
                                .define(RETRY_BACKOFF_MS_CONFIG,
                                        Type.LONG,
                                        100L,
                                        atLeast(0L),
                                        Importance.LOW,
                                        RETRY_BACKOFF_MS_DOC)
                                .define(REQUEST_TIMEOUT_MS_CONFIG,
                                        Type.INT,
                                        120000,
                                        atLeast(0),
                                        Importance.MEDIUM,
                                        REQUEST_TIMEOUT_MS_DOC)
                                .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
                                        Type.LONG,
                                        5 * 60 * 1000,
                                        Importance.MEDIUM,
                                        CONNECTIONS_MAX_IDLE_MS_DOC)
                                .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
                                        Type.LONG,
                                        30000,
                                        atLeast(0),
                                        Importance.LOW,
                                        METRICS_SAMPLE_WINDOW_MS_DOC)
                                .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
                                .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC)
                                .define(METRICS_RECORDING_LEVEL_CONFIG,
                                        Type.STRING,
                                        Sensor.RecordingLevel.INFO.toString(),
                                        in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()),
                                        Importance.LOW,
                                        METRICS_RECORDING_LEVEL_DOC)
                                // security support
                                .define(SECURITY_PROTOCOL_CONFIG,
                                        Type.STRING,
                                        DEFAULT_SECURITY_PROTOCOL,
                                        Importance.MEDIUM,
                                        SECURITY_PROTOCOL_DOC)
                                .withClientSslSupport()
                                .withClientSaslSupport();
    }

    AdminClientConfig(Map<?, ?> props) {
        super(CONFIG, props);
    }

    /**
     * Return the names of all the configuration keys defined above.
     */
    public static Set<String> configNames() {
        return CONFIG.names();
    }

    /**
     * Print the configuration documentation as an HTML table, for the generated docs.
     */
    public static void main(String[] args) {
        System.out.println(CONFIG.toHtmlTable());
    }
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
 * Options for the apiVersions call.
 */
@InterfaceStability.Unstable
public class ApiVersionsOptions {
    // How long the call may take, in milliseconds; null means "use the client default".
    private Integer timeout = null;

    /**
     * Set how long the apiVersions call is allowed to take, in milliseconds.
     *
     * @param timeoutMs The timeout in milliseconds, or null to use the default.
     * @return This ApiVersionsOptions object.
     */
    public ApiVersionsOptions timeoutMs(Integer timeoutMs) {
        this.timeout = timeoutMs;
        return this;
    }

    /**
     * The timeout in milliseconds, or null if none was set.
     */
    public Integer timeoutMs() {
        return this.timeout;
    }
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
/**
 * Results of the apiVersions call.
 */
@InterfaceStability.Unstable
public class ApiVersionsResults {
    // One future per node that was queried.
    private final Map<Node, KafkaFuture<NodeApiVersions>> futures;

    ApiVersionsResults(Map<Node, KafkaFuture<NodeApiVersions>> futures) {
        this.futures = futures;
    }

    /**
     * Return a map from nodes to futures which can be used to check the
     * api versions of each individual node.
     */
    public Map<Node, KafkaFuture<NodeApiVersions>> results() {
        return futures;
    }

    /**
     * Return a future which succeeds only if all the per-node futures succeed,
     * yielding a map from each node to its api versions.
     */
    public KafkaFuture<Map<Node, NodeApiVersions>> all() {
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
            thenApply(new KafkaFuture.Function<Void, Map<Node, NodeApiVersions>>() {
                @Override
                public Map<Node, NodeApiVersions> apply(Void v) {
                    Map<Node, NodeApiVersions> versions = new HashMap<>(futures.size());
                    for (Map.Entry<Node, KafkaFuture<NodeApiVersions>> entry : futures.entrySet()) {
                        try {
                            versions.put(entry.getKey(), entry.getValue().get());
                        } catch (InterruptedException | ExecutionException e) {
                            // This should be unreachable, because allOf ensured that all the futures
                            // completed successfully.
                            throw new RuntimeException(e);
                        }
                    }
                    return versions;
                }
            });
    }
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Map;
/**
 * The result of the createTopics call.
 */
@InterfaceStability.Unstable
public class CreateTopicResults {
    // One future per requested topic, keyed by topic name.
    private final Map<String, KafkaFuture<Void>> futures;

    CreateTopicResults(Map<String, KafkaFuture<Void>> futures) {
        this.futures = futures;
    }

    /**
     * Return a map from topic names to futures, which can be used to check the status of individual
     * topic creations.
     */
    public Map<String, KafkaFuture<Void>> results() {
        return futures;
    }

    /**
     * Return a future which succeeds if all the topic creations succeed.
     */
    public KafkaFuture<Void> all() {
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
    }
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
 * Options for the createTopics call.
 */
@InterfaceStability.Unstable
public class CreateTopicsOptions {
    // How long the call may take, in milliseconds; null means "use the client default".
    private Integer timeout = null;
    // When true, the request only validates the creation without performing it.
    private boolean checkOnly = false;

    /**
     * Set how long the createTopics call is allowed to take, in milliseconds.
     *
     * @param timeoutMs The timeout in milliseconds, or null to use the default.
     * @return This CreateTopicsOptions object.
     */
    public CreateTopicsOptions timeoutMs(Integer timeoutMs) {
        this.timeout = timeoutMs;
        return this;
    }

    /**
     * The timeout in milliseconds, or null if none was set.
     */
    public Integer timeoutMs() {
        return this.timeout;
    }

    /**
     * Set whether the request should only validate the topic creation
     * rather than actually performing it.
     *
     * @param validateOnly True to only validate.  The default is false.
     * @return This CreateTopicsOptions object.
     */
    public CreateTopicsOptions validateOnly(boolean validateOnly) {
        this.checkOnly = validateOnly;
        return this;
    }

    /**
     * Whether the request will only validate the topic creation.
     */
    public boolean validateOnly() {
        return this.checkOnly;
    }
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Map;
/**
 * The result of the deleteTopics call.
 */
@InterfaceStability.Unstable
public class DeleteTopicResults {
    // One future per requested topic, keyed by topic name.
    // Made private for consistency with CreateTopicResults; callers use results().
    private final Map<String, KafkaFuture<Void>> futures;

    DeleteTopicResults(Map<String, KafkaFuture<Void>> futures) {
        this.futures = futures;
    }

    /**
     * Return a map from topic names to futures which can be used to check the status of
     * individual deletions.
     */
    public Map<String, KafkaFuture<Void>> results() {
        return futures;
    }

    /**
     * Return a future which succeeds only if all the topic deletions succeed.
     */
    public KafkaFuture<Void> all() {
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
    }
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
 * Options for deleteTopics.
 */
@InterfaceStability.Unstable
public class DeleteTopicsOptions {
    // How long the call may take, in milliseconds; null means "use the client default".
    private Integer timeout = null;

    /**
     * Set how long the deleteTopics call is allowed to take, in milliseconds.
     *
     * @param timeoutMs The timeout in milliseconds, or null to use the default.
     * @return This DeleteTopicsOptions object.
     */
    public DeleteTopicsOptions timeoutMs(Integer timeoutMs) {
        this.timeout = timeoutMs;
        return this;
    }

    /**
     * The timeout in milliseconds, or null if none was set.
     */
    public Integer timeoutMs() {
        return this.timeout;
    }
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
 * Options for the describeCluster call.
 */
@InterfaceStability.Unstable
public class DescribeClusterOptions {
    // How long the call may take, in milliseconds; null means "use the client default".
    private Integer timeout = null;

    /**
     * Set how long the describeCluster call is allowed to take, in milliseconds.
     *
     * @param timeoutMs The timeout in milliseconds, or null to use the default.
     * @return This DescribeClusterOptions object.
     */
    public DescribeClusterOptions timeoutMs(Integer timeoutMs) {
        this.timeout = timeoutMs;
        return this;
    }

    /**
     * The timeout in milliseconds, or null if none was set.
     */
    public Integer timeoutMs() {
        return this.timeout;
    }
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
/**
 * The results of the describeCluster call.
 */
@InterfaceStability.Unstable
public class DescribeClusterResults {
    // The single future holding the cluster's node collection.
    private final KafkaFuture<Collection<Node>> future;

    DescribeClusterResults(KafkaFuture<Collection<Node>> future) {
        this.future = future;
    }

    /**
     * Returns a future which yields a collection of nodes.
     */
    public KafkaFuture<Collection<Node>> nodes() {
        return future;
    }
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
 * Options for describeTopics.
 */
@InterfaceStability.Unstable
public class DescribeTopicsOptions {
    // How long the call may take, in milliseconds; null means "use the client default".
    private Integer timeout = null;

    /**
     * Set how long the describeTopics call is allowed to take, in milliseconds.
     *
     * @param timeoutMs The timeout in milliseconds, or null to use the default.
     * @return This DescribeTopicsOptions object.
     */
    public DescribeTopicsOptions timeoutMs(Integer timeoutMs) {
        this.timeout = timeoutMs;
        return this;
    }

    /**
     * The timeout in milliseconds, or null if none was set.
     */
    public Integer timeoutMs() {
        return this.timeout;
    }
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
/**
 * The results of the describeTopics call.
 */
@InterfaceStability.Unstable
public class DescribeTopicsResults {
    // One future per requested topic, keyed by topic name.
    private final Map<String, KafkaFuture<TopicDescription>> futures;

    DescribeTopicsResults(Map<String, KafkaFuture<TopicDescription>> futures) {
        this.futures = futures;
    }

    /**
     * Return a map from topic names to futures which can be used to check the status of
     * individual topic descriptions.
     */
    public Map<String, KafkaFuture<TopicDescription>> results() {
        return futures;
    }

    /**
     * Return a future which succeeds only if all the topic descriptions succeed,
     * yielding a map from topic name to description.
     */
    public KafkaFuture<Map<String, TopicDescription>> all() {
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
            thenApply(new KafkaFuture.Function<Void, Map<String, TopicDescription>>() {
                @Override
                public Map<String, TopicDescription> apply(Void v) {
                    Map<String, TopicDescription> descriptions = new HashMap<>(futures.size());
                    for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) {
                        try {
                            descriptions.put(entry.getKey(), entry.getValue().get());
                        } catch (InterruptedException | ExecutionException e) {
                            // This should be unreachable, because allOf ensured that all the futures
                            // completed successfully.
                            throw new RuntimeException(e);
                        }
                    }
                    return descriptions;
                }
            });
    }
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
 * Options for listTopics.
 */
@InterfaceStability.Unstable
public class ListTopicsOptions {
    // Request timeout in milliseconds; null means use the client's default.
    private Integer timeoutMs = null;
    // Whether internal topics should be included in the listing; defaults to false.
    private boolean listInternal = false;

    /**
     * Set the request timeout in milliseconds.
     *
     * @param timeoutMs The timeout in milliseconds, or null to use the default.
     * @return This ListTopicsOptions object.
     */
    public ListTopicsOptions timeoutMs(Integer timeoutMs) {
        this.timeoutMs = timeoutMs;
        return this;
    }

    public Integer timeoutMs() {
        return timeoutMs;
    }

    /**
     * Set whether we should list internal topics.
     *
     * @param listInternal Whether we should list internal topics. Defaults to false.
     * @return This ListTopicsOptions object.
     */
    public ListTopicsOptions listInternal(boolean listInternal) {
        this.listInternal = listInternal;
        return this;
    }

    public boolean listInternal() {
        return listInternal;
    }
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
import java.util.Map;
/**
 * The result of the listTopics call.
 */
@InterfaceStability.Unstable
public class ListTopicsResults {
    // Extracts the TopicListing values from a name-to-listing map. Stateless, so it
    // can be shared across all instances.
    private static final KafkaFuture.Function<Map<String, TopicListing>, Collection<TopicListing>> TO_LISTINGS =
        new KafkaFuture.Function<Map<String, TopicListing>, Collection<TopicListing>>() {
            @Override
            public Collection<TopicListing> apply(Map<String, TopicListing> listings) {
                return listings.values();
            }
        };

    // Extracts the topic names from a name-to-listing map. Stateless and shared.
    private static final KafkaFuture.Function<Map<String, TopicListing>, Collection<String>> TO_NAMES =
        new KafkaFuture.Function<Map<String, TopicListing>, Collection<String>>() {
            @Override
            public Collection<String> apply(Map<String, TopicListing> listings) {
                return listings.keySet();
            }
        };

    // The underlying future yielding the complete listing, keyed by topic name.
    final KafkaFuture<Map<String, TopicListing>> future;

    ListTopicsResults(KafkaFuture<Map<String, TopicListing>> future) {
        this.future = future;
    }

    /**
     * Return a future which yields a map of topic names to TopicListing objects.
     */
    public KafkaFuture<Map<String, TopicListing>> namesToDescriptions() {
        return future;
    }

    /**
     * Return a future which yields a collection of TopicListing objects.
     */
    public KafkaFuture<Collection<TopicListing>> descriptions() {
        return future.thenApply(TO_LISTINGS);
    }

    /**
     * Return a future which yields a collection of topic names.
     */
    public KafkaFuture<Collection<String>> names() {
        return future.thenApply(TO_NAMES);
    }
}

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails;
import java.util.List;
import java.util.Map;
/**
 * A request to create a new topic through the AdminClient API.
 */
public class NewTopic {
    private final String name;
    // -1 when an explicit replica assignment is supplied instead.
    private final int numPartitions;
    // -1 when an explicit replica assignment is supplied instead.
    private final short replicationFactor;
    // Assignment map; presumably partition id -> broker ids — see CreateTopicsRequest.
    private final Map<Integer, List<Integer>> replicasAssignments;
    // Optional topic configs; null when none were supplied.
    private Map<String, String> configs = null;

    /**
     * Create a new topic with a fixed replication factor and number of partitions.
     */
    public NewTopic(String name, int numPartitions, short replicationFactor) {
        this.name = name;
        this.numPartitions = numPartitions;
        this.replicationFactor = replicationFactor;
        this.replicasAssignments = null;
    }

    /**
     * A request to create a new topic with a specific replica assignment configuration.
     */
    public NewTopic(String name, Map<Integer, List<Integer>> replicasAssignments) {
        this.name = name;
        this.numPartitions = -1;
        this.replicationFactor = -1;
        this.replicasAssignments = replicasAssignments;
    }

    public String name() {
        return name;
    }

    /**
     * Set the configuration to use on the new topic.
     *
     * @param configs The configuration map.
     * @return This NewTopic object.
     */
    public NewTopic configs(Map<String, String> configs) {
        this.configs = configs;
        return this;
    }

    // Translate this request into the wire-level TopicDetails, choosing the
    // constructor that matches which optional pieces were supplied.
    TopicDetails convertToTopicDetails() {
        if (replicasAssignments == null) {
            return (configs == null)
                ? new TopicDetails(numPartitions, replicationFactor)
                : new TopicDetails(numPartitions, replicationFactor, configs);
        }
        return (configs == null)
            ? new TopicDetails(replicasAssignments)
            : new TopicDetails(replicasAssignments, configs);
    }
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.utils.Utils;
import java.util.NavigableMap;
/**
 * A detailed description of a single topic in the cluster.
 */
public class TopicDescription {
    private final String name;
    private final boolean internal;
    // Partition id -> per-partition details, in sorted partition order.
    private final NavigableMap<Integer, TopicPartitionInfo> partitions;

    TopicDescription(String name, boolean internal,
                     NavigableMap<Integer, TopicPartitionInfo> partitions) {
        this.name = name;
        this.internal = internal;
        this.partitions = partitions;
    }

    /** The topic name. */
    public String name() {
        return name;
    }

    /** Whether the topic is internal to Kafka. */
    public boolean internal() {
        return internal;
    }

    /** A map of partition id to partition information. */
    public NavigableMap<Integer, TopicPartitionInfo> partitions() {
        return partitions;
    }

    @Override
    public String toString() {
        StringBuilder bld = new StringBuilder();
        bld.append("(name=").append(name);
        bld.append(", internal=").append(internal);
        bld.append(", partitions=").append(Utils.mkString(partitions, "[", "]", "=", ","));
        bld.append(")");
        return bld.toString();
    }
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
/**
 * A listing of a topic in the cluster: the topic's name and whether it is
 * internal to Kafka.
 */
public class TopicListing {
    private final String name;
    private final boolean internal;

    TopicListing(String name, boolean internal) {
        this.name = name;
        this.internal = internal;
    }

    /** The name of the topic. */
    public String name() {
        return name;
    }

    /** Whether the topic is internal to Kafka. */
    public boolean internal() {
        return internal;
    }

    @Override
    public String toString() {
        StringBuilder bld = new StringBuilder("(name=");
        bld.append(name).append(", internal=").append(internal).append(")");
        return bld.toString();
    }
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
/**
 * Information about a single partition of a topic: its id, current leader, the
 * full replica set, and the in-sync replica set.
 */
public class TopicPartitionInfo {
    private final int partition;
    private final Node leader;
    private final List<Node> replicas;
    private final List<Node> isr;

    TopicPartitionInfo(int partition, Node leader, List<Node> replicas, List<Node> isr) {
        this.partition = partition;
        this.leader = leader;
        this.replicas = replicas;
        this.isr = isr;
    }

    /** The partition id. */
    public int partition() {
        return partition;
    }

    /** The node currently acting as leader for this partition. */
    public Node leader() {
        return leader;
    }

    /** The full set of replicas for this partition. */
    public List<Node> replicas() {
        return replicas;
    }

    /** The in-sync replicas for this partition. */
    public List<Node> isr() {
        return isr;
    }

    // Fixed: was missing @Override, so a signature typo would have silently
    // failed to override Object.toString().
    @Override
    public String toString() {
        return "(partition=" + partition + ", leader=" + leader + ", replicas=" +
            Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + ")";
    }
}

View File

@ -37,6 +37,7 @@ public final class Cluster {
private final List<Node> nodes; private final List<Node> nodes;
private final Set<String> unauthorizedTopics; private final Set<String> unauthorizedTopics;
private final Set<String> internalTopics; private final Set<String> internalTopics;
private final Node controller;
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition; private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
private final Map<String, List<PartitionInfo>> partitionsByTopic; private final Map<String, List<PartitionInfo>> partitionsByTopic;
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic; private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
@ -54,7 +55,7 @@ public final class Cluster {
public Cluster(Collection<Node> nodes, public Cluster(Collection<Node> nodes,
Collection<PartitionInfo> partitions, Collection<PartitionInfo> partitions,
Set<String> unauthorizedTopics) { Set<String> unauthorizedTopics) {
this(null, false, nodes, partitions, unauthorizedTopics, Collections.<String>emptySet()); this(null, false, nodes, partitions, unauthorizedTopics, Collections.<String>emptySet(), null);
} }
@ -68,7 +69,21 @@ public final class Cluster {
Collection<PartitionInfo> partitions, Collection<PartitionInfo> partitions,
Set<String> unauthorizedTopics, Set<String> unauthorizedTopics,
Set<String> internalTopics) { Set<String> internalTopics) {
this(clusterId, false, nodes, partitions, unauthorizedTopics, internalTopics); this(clusterId, false, nodes, partitions, unauthorizedTopics, internalTopics, null);
}
/**
* Create a new cluster with the given id, nodes and partitions
* @param nodes The nodes in the cluster
* @param partitions Information about a subset of the topic-partitions this cluster hosts
*/
public Cluster(String clusterId,
Collection<Node> nodes,
Collection<PartitionInfo> partitions,
Set<String> unauthorizedTopics,
Set<String> internalTopics,
Node controller) {
this(clusterId, false, nodes, partitions, unauthorizedTopics, internalTopics, controller);
} }
private Cluster(String clusterId, private Cluster(String clusterId,
@ -76,7 +91,8 @@ public final class Cluster {
Collection<Node> nodes, Collection<Node> nodes,
Collection<PartitionInfo> partitions, Collection<PartitionInfo> partitions,
Set<String> unauthorizedTopics, Set<String> unauthorizedTopics,
Set<String> internalTopics) { Set<String> internalTopics,
Node controller) {
this.isBootstrapConfigured = isBootstrapConfigured; this.isBootstrapConfigured = isBootstrapConfigured;
this.clusterResource = new ClusterResource(clusterId); this.clusterResource = new ClusterResource(clusterId);
// make a randomized, unmodifiable copy of the nodes // make a randomized, unmodifiable copy of the nodes
@ -130,6 +146,7 @@ public final class Cluster {
this.unauthorizedTopics = Collections.unmodifiableSet(unauthorizedTopics); this.unauthorizedTopics = Collections.unmodifiableSet(unauthorizedTopics);
this.internalTopics = Collections.unmodifiableSet(internalTopics); this.internalTopics = Collections.unmodifiableSet(internalTopics);
this.controller = controller;
} }
/** /**
@ -137,7 +154,7 @@ public final class Cluster {
*/ */
public static Cluster empty() { public static Cluster empty() {
return new Cluster(null, new ArrayList<Node>(0), new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(), return new Cluster(null, new ArrayList<Node>(0), new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(),
Collections.<String>emptySet()); Collections.<String>emptySet(), null);
} }
/** /**
@ -150,7 +167,7 @@ public final class Cluster {
int nodeId = -1; int nodeId = -1;
for (InetSocketAddress address : addresses) for (InetSocketAddress address : addresses)
nodes.add(new Node(nodeId--, address.getHostString(), address.getPort())); nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));
return new Cluster(null, true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(), Collections.<String>emptySet()); return new Cluster(null, true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(), Collections.<String>emptySet(), null);
} }
/** /**
@ -160,7 +177,7 @@ public final class Cluster {
Map<TopicPartition, PartitionInfo> combinedPartitions = new HashMap<>(this.partitionsByTopicPartition); Map<TopicPartition, PartitionInfo> combinedPartitions = new HashMap<>(this.partitionsByTopicPartition);
combinedPartitions.putAll(partitions); combinedPartitions.putAll(partitions);
return new Cluster(clusterResource.clusterId(), this.nodes, combinedPartitions.values(), return new Cluster(clusterResource.clusterId(), this.nodes, combinedPartitions.values(),
new HashSet<>(this.unauthorizedTopics), new HashSet<>(this.internalTopics)); new HashSet<>(this.unauthorizedTopics), new HashSet<>(this.internalTopics), this.controller);
} }
/** /**
@ -265,6 +282,10 @@ public final class Cluster {
return clusterResource; return clusterResource;
} }
public Node controller() {
return controller;
}
@Override @Override
public String toString() { public String toString() {
return "Cluster(id = " + clusterResource.clusterId() + ", nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")"; return "Cluster(id = " + clusterResource.clusterId() + ", nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * A flexible future which supports call chaining and other asynchronous programming patterns.
 */
public abstract class KafkaFuture<T> implements Future<T> {
    /**
     * A function which takes objects of type A and returns objects of type B.
     */
    public static abstract class Function<A, B> {
        public abstract B apply(A a);
    }

    /**
     * A consumer of two different types of object.
     */
    public static abstract class BiConsumer<A, B> {
        public abstract void accept(A a, B b);
    }

    /**
     * Waiter that completes a target future once all the watched futures have
     * completed, or fails it with the first exception seen.
     */
    private static class AllOfAdapter<R> extends BiConsumer<R, Throwable> {
        // Number of futures still outstanding; guarded by this adapter's monitor.
        private int remainingResponses;
        // Fixed: was a raw KafkaFuture; parameterized to eliminate unchecked calls.
        private final KafkaFuture<Void> future;

        public AllOfAdapter(int remainingResponses, KafkaFuture<Void> future) {
            this.remainingResponses = remainingResponses;
            this.future = future;
        }

        @Override
        public synchronized void accept(R newValue, Throwable exception) {
            if (remainingResponses <= 0)
                return;
            if (exception != null) {
                // Fail fast: propagate the first failure and stop counting.
                remainingResponses = 0;
                future.completeExceptionally(exception);
            } else {
                remainingResponses--;
                if (remainingResponses <= 0)
                    future.complete(null);
            }
        }
    }

    /**
     * Returns a new KafkaFuture that is already completed with the given value.
     */
    public static <U> KafkaFuture<U> completedFuture(U value) {
        KafkaFuture<U> future = new KafkaFutureImpl<U>();
        future.complete(value);
        return future;
    }

    /**
     * Returns a new KafkaFuture that is completed when all the given futures have completed. If
     * any future throws an exception, the returned future returns it. If multiple futures throw
     * an exception, which one gets returned is arbitrarily chosen.
     */
    public static KafkaFuture<Void> allOf(KafkaFuture<?>... futures) {
        KafkaFuture<Void> allOfFuture = new KafkaFutureImpl<Void>();
        // Fixed: with zero input futures the adapter's accept() was never invoked,
        // so the returned future would never complete. Complete it right away.
        if (futures.length == 0) {
            allOfFuture.complete(null);
            return allOfFuture;
        }
        // Fixed: was a raw AllOfAdapter; BiConsumer<Object, Throwable> is a valid
        // waiter for any KafkaFuture<?>.
        BiConsumer<Object, Throwable> allOfWaiter = new AllOfAdapter<>(futures.length, allOfFuture);
        for (KafkaFuture<?> future : futures) {
            future.addWaiter(allOfWaiter);
        }
        return allOfFuture;
    }

    /**
     * Returns a new KafkaFuture that, when this future completes normally, is executed with this
     * futures's result as the argument to the supplied function.
     */
    public abstract <R> KafkaFuture<R> thenApply(Function<T, R> function);

    /**
     * Register an action to be invoked (with the value, or the exception) once this
     * future completes. Invoked immediately if the future is already complete.
     */
    protected abstract void addWaiter(BiConsumer<? super T, ? super Throwable> action);

    /**
     * If not already completed, sets the value returned by get() and related methods to the given
     * value.
     */
    protected abstract boolean complete(T newValue);

    /**
     * If not already completed, causes invocations of get() and related methods to throw the given
     * exception.
     */
    protected abstract boolean completeExceptionally(Throwable newException);

    /**
     * If not already completed, completes this future with a CancellationException. Dependent
     * futures that have not already completed will also complete exceptionally, with a
     * CompletionException caused by this CancellationException.
     */
    @Override
    public abstract boolean cancel(boolean mayInterruptIfRunning);

    /**
     * Waits if necessary for this future to complete, and then returns its result.
     */
    @Override
    public abstract T get() throws InterruptedException, ExecutionException;

    /**
     * Waits if necessary for at most the given time for this future to complete, and then returns
     * its result, if available.
     */
    @Override
    public abstract T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
        TimeoutException;

    /**
     * Returns the result value (or throws any encountered exception) if completed, else returns
     * the given valueIfAbsent.
     */
    public abstract T getNow(T valueIfAbsent) throws InterruptedException, ExecutionException;

    /**
     * Returns true if this future was cancelled before it completed normally.
     */
    @Override
    public abstract boolean isCancelled();

    /**
     * Returns true if this future completed exceptionally, in any way.
     */
    public abstract boolean isCompletedExceptionally();

    /**
     * Returns true if completed in any fashion: normally, exceptionally, or via cancellation.
     */
    @Override
    public abstract boolean isDone();
}

View File

@ -0,0 +1,264 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.internals;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.KafkaFuture;
/**
 * A flexible future which supports call chaining and other asynchronous programming patterns.
 * This will eventually become a thin shim on top of Java 8's CompletableFuture.
 */
public class KafkaFutureImpl<T> extends KafkaFuture<T> {
    /**
     * A convenience method that throws the current exception, wrapping it if needed.
     *
     * In general, KafkaFuture throws CancellationException and InterruptedException directly, and
     * wraps all other exceptions in an ExecutionException.
     */
    private static void wrapAndThrow(Throwable t) throws InterruptedException, ExecutionException {
        if (t instanceof CancellationException) {
            throw (CancellationException) t;
        } else if (t instanceof InterruptedException) {
            throw (InterruptedException) t;
        } else {
            throw new ExecutionException(t);
        }
    }

    /**
     * Waiter which applies a function to the completed value and uses the result to
     * complete a dependent future (exceptionally, if the function throws).
     */
    private static class Applicant<A, B> extends BiConsumer<A, Throwable> {
        private final Function<A, B> function;
        private final KafkaFutureImpl<B> future;

        Applicant(Function<A, B> function, KafkaFutureImpl<B> future) {
            this.function = function;
            this.future = future;
        }

        @Override
        public void accept(A a, Throwable exception) {
            if (exception != null) {
                future.completeExceptionally(exception);
            } else {
                try {
                    B b = function.apply(a);
                    future.complete(b);
                } catch (Throwable t) {
                    future.completeExceptionally(t);
                }
            }
        }
    }

    /**
     * Waiter used to implement the blocking get() calls.
     */
    private static class SingleWaiter<R> extends BiConsumer<R, Throwable> {
        private R value = null;
        private Throwable exception = null;
        private boolean done = false;

        @Override
        public synchronized void accept(R newValue, Throwable newException) {
            this.value = newValue;
            this.exception = newException;
            this.done = true;
            this.notifyAll();
        }

        synchronized R await() throws InterruptedException, ExecutionException {
            while (true) {
                if (exception != null)
                    wrapAndThrow(exception);
                if (done)
                    return value;
                this.wait();
            }
        }

        R await(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            long startMs = System.currentTimeMillis();
            long waitTimeMs = (unit.toMillis(timeout) > 0) ? unit.toMillis(timeout) : 1;
            long delta = 0;
            synchronized (this) {
                while (true) {
                    if (exception != null)
                        wrapAndThrow(exception);
                    if (done)
                        return value;
                    // Fixed: was "delta > waitTimeMs", which let delta == waitTimeMs
                    // fall through to this.wait(0) -- an unbounded wait.
                    if (delta >= waitTimeMs) {
                        throw new TimeoutException();
                    }
                    this.wait(waitTimeMs - delta);
                    delta = System.currentTimeMillis() - startMs;
                }
            }
        }
    }

    /**
     * True if this future is done. Protected by the object monitor.
     */
    private boolean done = false;

    /**
     * The value of this future, or null. Protected by the object monitor.
     */
    private T value = null;

    /**
     * The exception associated with this future, or null. Protected by the object monitor.
     */
    private Throwable exception = null;

    /**
     * A list of objects waiting for this future to complete (either successfully or
     * exceptionally). Protected by the object monitor. Set to null once the future
     * completes; after that, addWaiter invokes new waiters immediately.
     */
    private List<BiConsumer<? super T, ? super Throwable>> waiters = new ArrayList<>();

    /**
     * Returns a new KafkaFuture that, when this future completes normally, is executed with this
     * futures's result as the argument to the supplied function.
     */
    @Override
    public <R> KafkaFuture<R> thenApply(Function<T, R> function) {
        KafkaFutureImpl<R> future = new KafkaFutureImpl<R>();
        // Fixed: was a raw "new Applicant(...)", which compiled with unchecked warnings.
        addWaiter(new Applicant<>(function, future));
        return future;
    }

    @Override
    protected synchronized void addWaiter(BiConsumer<? super T, ? super Throwable> action) {
        if (exception != null) {
            action.accept(null, exception);
        } else if (done) {
            action.accept(value, null);
        } else {
            waiters.add(action);
        }
    }

    @Override
    public boolean complete(T newValue) {
        // Fixed: the method itself was also declared synchronized, so the waiter
        // callbacks below ran while holding this future's monitor (deadlock risk,
        // and inconsistent with completeExceptionally). State changes stay inside
        // the synchronized block; callbacks run outside it.
        List<BiConsumer<? super T, ? super Throwable>> oldWaiters;
        synchronized (this) {
            if (done)
                return false;
            value = newValue;
            done = true;
            oldWaiters = waiters;
            waiters = null;
        }
        for (BiConsumer<? super T, ? super Throwable> waiter : oldWaiters) {
            waiter.accept(newValue, null);
        }
        return true;
    }

    @Override
    public boolean completeExceptionally(Throwable newException) {
        List<BiConsumer<? super T, ? super Throwable>> oldWaiters;
        synchronized (this) {
            if (done)
                return false;
            exception = newException;
            done = true;
            oldWaiters = waiters;
            waiters = null;
        }
        // Waiters are invoked outside the monitor; see complete().
        for (BiConsumer<? super T, ? super Throwable> waiter : oldWaiters) {
            waiter.accept(null, newException);
        }
        return true;
    }

    /**
     * If not already completed, completes this future with a CancellationException. Dependent
     * futures that have not already completed will also complete exceptionally, with a
     * CompletionException caused by this CancellationException.
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        // Fixed: was a synchronized method, which held the monitor across the waiter
        // callbacks triggered by completeExceptionally. Only the final read of
        // "exception" needs the lock.
        if (completeExceptionally(new CancellationException()))
            return true;
        synchronized (this) {
            return exception instanceof CancellationException;
        }
    }

    /**
     * Waits if necessary for this future to complete, and then returns its result.
     */
    @Override
    public T get() throws InterruptedException, ExecutionException {
        SingleWaiter<T> waiter = new SingleWaiter<T>();
        addWaiter(waiter);
        return waiter.await();
    }

    /**
     * Waits if necessary for at most the given time for this future to complete, and then returns
     * its result, if available.
     */
    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
            TimeoutException {
        SingleWaiter<T> waiter = new SingleWaiter<T>();
        addWaiter(waiter);
        return waiter.await(timeout, unit);
    }

    /**
     * Returns the result value (or throws any encountered exception) if completed, else returns
     * the given valueIfAbsent.
     */
    @Override
    public synchronized T getNow(T valueIfAbsent) throws InterruptedException, ExecutionException {
        if (exception != null)
            wrapAndThrow(exception);
        if (done)
            return value;
        return valueIfAbsent;
    }

    /**
     * Returns true if this future was cancelled before it completed normally.
     */
    @Override
    public synchronized boolean isCancelled() {
        return (exception != null) && (exception instanceof CancellationException);
    }

    /**
     * Returns true if this future completed exceptionally, in any way.
     */
    @Override
    public synchronized boolean isCompletedExceptionally() {
        return exception != null;
    }

    /**
     * Returns true if completed in any fashion: normally, exceptionally, or via cancellation.
     */
    @Override
    public synchronized boolean isDone() {
        return done;
    }
}

View File

@ -24,7 +24,7 @@ import org.apache.kafka.common.KafkaException;
/** /**
* A ChannelBuilder interface to build Channel based on configs * A ChannelBuilder interface to build Channel based on configs
*/ */
public interface ChannelBuilder { public interface ChannelBuilder extends AutoCloseable {
/** /**
* Configure this class with the given key-value pairs * Configure this class with the given key-value pairs

View File

@ -79,7 +79,7 @@ import org.slf4j.LoggerFactory;
* *
* This class is not thread safe! * This class is not thread safe!
*/ */
public class Selector implements Selectable { public class Selector implements Selectable, AutoCloseable {
public static final long NO_IDLE_TIMEOUT_MS = -1; public static final long NO_IDLE_TIMEOUT_MS = -1;
private static final Logger log = LoggerFactory.getLogger(Selector.class); private static final Logger log = LoggerFactory.getLogger(Selector.class);

View File

@ -83,114 +83,388 @@ import java.util.Map;
* Do not add exceptions that occur only on the client or only on the server here. * Do not add exceptions that occur only on the client or only on the server here.
*/ */
public enum Errors { public enum Errors {
UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")), UNKNOWN(-1, "The server experienced an unexpected error when processing the request",
NONE(0, null), new ApiExceptionBuilder() {
OFFSET_OUT_OF_RANGE(1, @Override
new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")), public ApiException build(String message) {
CORRUPT_MESSAGE(2, return new UnknownServerException(message);
new CorruptRecordException("This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.")), }
UNKNOWN_TOPIC_OR_PARTITION(3, }),
new UnknownTopicOrPartitionException("This server does not host this topic-partition.")), NONE(0, null,
INVALID_FETCH_SIZE(4, new ApiExceptionBuilder() {
new InvalidFetchSizeException("The requested fetch size is invalid.")), @Override
LEADER_NOT_AVAILABLE(5, public ApiException build(String message) {
new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")), return null;
NOT_LEADER_FOR_PARTITION(6, }
new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")), }),
REQUEST_TIMED_OUT(7, OFFSET_OUT_OF_RANGE(1, "The requested offset is not within the range of offsets maintained by the server.",
new TimeoutException("The request timed out.")), new ApiExceptionBuilder() {
BROKER_NOT_AVAILABLE(8, @Override
new BrokerNotAvailableException("The broker is not available.")), public ApiException build(String message) {
REPLICA_NOT_AVAILABLE(9, return new OffsetOutOfRangeException(message);
new ReplicaNotAvailableException("The replica is not available for the requested topic-partition")), }
MESSAGE_TOO_LARGE(10, }),
new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")), CORRUPT_MESSAGE(2, "This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.",
STALE_CONTROLLER_EPOCH(11, new ApiExceptionBuilder() {
new ControllerMovedException("The controller moved to another broker.")), @Override
OFFSET_METADATA_TOO_LARGE(12, public ApiException build(String message) {
new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")), return new CorruptRecordException(message);
NETWORK_EXCEPTION(13, }
new NetworkException("The server disconnected before a response was received.")), }),
COORDINATOR_LOAD_IN_PROGRESS(14, UNKNOWN_TOPIC_OR_PARTITION(3, "This server does not host this topic-partition.",
new CoordinatorLoadInProgressException("The coordinator is loading and hence can't process requests.")), new ApiExceptionBuilder() {
COORDINATOR_NOT_AVAILABLE(15, @Override
new CoordinatorNotAvailableException("The coordinator is not available.")), public ApiException build(String message) {
NOT_COORDINATOR(16, return new UnknownTopicOrPartitionException(message);
new NotCoordinatorException("This is not the correct coordinator.")), }
INVALID_TOPIC_EXCEPTION(17, }),
new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")), INVALID_FETCH_SIZE(4, "The requested fetch size is invalid.",
RECORD_LIST_TOO_LARGE(18, new ApiExceptionBuilder() {
new RecordBatchTooLargeException("The request included message batch larger than the configured segment size on the server.")), @Override
NOT_ENOUGH_REPLICAS(19, public ApiException build(String message) {
new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")), return new InvalidFetchSizeException(message);
NOT_ENOUGH_REPLICAS_AFTER_APPEND(20, }
new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required.")), }),
INVALID_REQUIRED_ACKS(21, LEADER_NOT_AVAILABLE(5, "There is no leader for this topic-partition as we are in the middle of a leadership election.",
new InvalidRequiredAcksException("Produce request specified an invalid value for required acks.")), new ApiExceptionBuilder() {
ILLEGAL_GENERATION(22, @Override
new IllegalGenerationException("Specified group generation id is not valid.")), public ApiException build(String message) {
return new LeaderNotAvailableException(message);
}
}),
NOT_LEADER_FOR_PARTITION(6, "This server is not the leader for that topic-partition.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new NotLeaderForPartitionException(message);
}
}),
REQUEST_TIMED_OUT(7, "The request timed out.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new TimeoutException(message);
}
}),
BROKER_NOT_AVAILABLE(8, "The broker is not available.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new BrokerNotAvailableException(message);
}
}),
REPLICA_NOT_AVAILABLE(9, "The replica is not available for the requested topic-partition",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new ReplicaNotAvailableException(message);
}
}),
MESSAGE_TOO_LARGE(10, "The request included a message larger than the max message size the server will accept.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new RecordTooLargeException(message);
}
}),
STALE_CONTROLLER_EPOCH(11, "The controller moved to another broker.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new ControllerMovedException(message);
}
}),
OFFSET_METADATA_TOO_LARGE(12, "The metadata field of the offset request was too large.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new OffsetMetadataTooLarge(message);
}
}),
NETWORK_EXCEPTION(13, "The server disconnected before a response was received.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new NetworkException(message);
}
}),
COORDINATOR_LOAD_IN_PROGRESS(14, "The coordinator is loading and hence can't process requests.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new CoordinatorLoadInProgressException(message);
}
}),
COORDINATOR_NOT_AVAILABLE(15, "The coordinator is not available.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new CoordinatorNotAvailableException(message);
}
}),
NOT_COORDINATOR(16, "This is not the correct coordinator.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new NotCoordinatorException(message);
}
}),
INVALID_TOPIC_EXCEPTION(17, "The request attempted to perform an operation on an invalid topic.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new InvalidTopicException(message);
}
}),
RECORD_LIST_TOO_LARGE(18, "The request included message batch larger than the configured segment size on the server.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new RecordBatchTooLargeException(message);
}
}),
NOT_ENOUGH_REPLICAS(19, "Messages are rejected since there are fewer in-sync replicas than required.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new NotEnoughReplicasException(message);
}
}),
NOT_ENOUGH_REPLICAS_AFTER_APPEND(20, "Messages are written to the log, but to fewer in-sync replicas than required.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new NotEnoughReplicasAfterAppendException(message);
}
}),
INVALID_REQUIRED_ACKS(21, "Produce request specified an invalid value for required acks.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new InvalidRequiredAcksException(message);
}
}),
ILLEGAL_GENERATION(22, "Specified group generation id is not valid.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new IllegalGenerationException(message);
}
}),
INCONSISTENT_GROUP_PROTOCOL(23, INCONSISTENT_GROUP_PROTOCOL(23,
new InconsistentGroupProtocolException("The group member's supported protocols are incompatible with those of existing members.")), "The group member's supported protocols are incompatible with those of existing members.",
INVALID_GROUP_ID(24, new ApiExceptionBuilder() {
new InvalidGroupIdException("The configured groupId is invalid")), @Override
UNKNOWN_MEMBER_ID(25, public ApiException build(String message) {
new UnknownMemberIdException("The coordinator is not aware of this member.")), return new InconsistentGroupProtocolException(message);
}
}),
INVALID_GROUP_ID(24, "The configured groupId is invalid",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new InvalidGroupIdException(message);
}
}),
UNKNOWN_MEMBER_ID(25, "The coordinator is not aware of this member.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new UnknownMemberIdException(message);
}
}),
INVALID_SESSION_TIMEOUT(26, INVALID_SESSION_TIMEOUT(26,
new InvalidSessionTimeoutException("The session timeout is not within the range allowed by the broker " + "The session timeout is not within the range allowed by the broker " +
"(as configured by group.min.session.timeout.ms and group.max.session.timeout.ms).")), "(as configured by group.min.session.timeout.ms and group.max.session.timeout.ms).",
REBALANCE_IN_PROGRESS(27, new ApiExceptionBuilder() {
new RebalanceInProgressException("The group is rebalancing, so a rejoin is needed.")), @Override
INVALID_COMMIT_OFFSET_SIZE(28, public ApiException build(String message) {
new InvalidCommitOffsetSizeException("The committing offset data size is not valid")), return new InvalidSessionTimeoutException(message);
TOPIC_AUTHORIZATION_FAILED(29, }
new TopicAuthorizationException("Topic authorization failed.")), }),
GROUP_AUTHORIZATION_FAILED(30, REBALANCE_IN_PROGRESS(27, "The group is rebalancing, so a rejoin is needed.",
new GroupAuthorizationException("Group authorization failed.")), new ApiExceptionBuilder() {
CLUSTER_AUTHORIZATION_FAILED(31, @Override
new ClusterAuthorizationException("Cluster authorization failed.")), public ApiException build(String message) {
INVALID_TIMESTAMP(32, return new RebalanceInProgressException(message);
new InvalidTimestampException("The timestamp of the message is out of acceptable range.")), }
UNSUPPORTED_SASL_MECHANISM(33, }),
new UnsupportedSaslMechanismException("The broker does not support the requested SASL mechanism.")), INVALID_COMMIT_OFFSET_SIZE(28, "The committing offset data size is not valid",
ILLEGAL_SASL_STATE(34, new ApiExceptionBuilder() {
new IllegalSaslStateException("Request is not valid given the current SASL state.")), @Override
UNSUPPORTED_VERSION(35, public ApiException build(String message) {
new UnsupportedVersionException("The version of API is not supported.")), return new InvalidCommitOffsetSizeException(message);
TOPIC_ALREADY_EXISTS(36, }
new TopicExistsException("Topic with this name already exists.")), }),
INVALID_PARTITIONS(37, TOPIC_AUTHORIZATION_FAILED(29, "Topic authorization failed.",
new InvalidPartitionsException("Number of partitions is invalid.")), new ApiExceptionBuilder() {
INVALID_REPLICATION_FACTOR(38, @Override
new InvalidReplicationFactorException("Replication-factor is invalid.")), public ApiException build(String message) {
INVALID_REPLICA_ASSIGNMENT(39, return new TopicAuthorizationException(message);
new InvalidReplicaAssignmentException("Replica assignment is invalid.")), }
INVALID_CONFIG(40, }),
new InvalidConfigurationException("Configuration is invalid.")), GROUP_AUTHORIZATION_FAILED(30, "Group authorization failed.",
NOT_CONTROLLER(41, new ApiExceptionBuilder() {
new NotControllerException("This is not the correct controller for this cluster.")), @Override
INVALID_REQUEST(42, public ApiException build(String message) {
new InvalidRequestException("This most likely occurs because of a request being malformed by the client library or" + return new GroupAuthorizationException(message);
" the message was sent to an incompatible broker. See the broker logs for more details.")), }
UNSUPPORTED_FOR_MESSAGE_FORMAT(43, }),
new UnsupportedForMessageFormatException("The message format version on the broker does not support the request.")), CLUSTER_AUTHORIZATION_FAILED(31, "Cluster authorization failed.",
POLICY_VIOLATION(44, new ApiExceptionBuilder() {
new PolicyViolationException("Request parameters do not satisfy the configured policy.")), @Override
OUT_OF_ORDER_SEQUENCE_NUMBER(45, public ApiException build(String message) {
new OutOfOrderSequenceException("The broker received an out of order sequence number")), return new ClusterAuthorizationException(message);
DUPLICATE_SEQUENCE_NUMBER(46, }
new DuplicateSequenceNumberException("The broker received a duplicate sequence number")), }),
INVALID_PRODUCER_EPOCH(47, INVALID_TIMESTAMP(32, "The timestamp of the message is out of acceptable range.",
new ProducerFencedException("Producer attempted an operation with an old epoch")), new ApiExceptionBuilder() {
INVALID_TXN_STATE(48, @Override
new InvalidTxnStateException("The producer attempted a transactional operation in an invalid state")), public ApiException build(String message) {
INVALID_PID_MAPPING(49, return new InvalidTimestampException(message);
new InvalidPidMappingException("The PID mapping is invalid")), }
INVALID_TRANSACTION_TIMEOUT(50, }),
new InvalidTxnTimeoutException("The transaction timeout is larger than the maximum value allowed by the broker " + UNSUPPORTED_SASL_MECHANISM(33, "The broker does not support the requested SASL mechanism.",
"(as configured by max.transaction.timeout.ms).")), new ApiExceptionBuilder() {
CONCURRENT_TRANSACTIONS(51, @Override
new ConcurrentTransactionsException("The producer attempted to update a transaction " + public ApiException build(String message) {
"while another concurrent operation on the same transaction was ongoing")); return new UnsupportedSaslMechanismException(message);
}
}),
ILLEGAL_SASL_STATE(34, "Request is not valid given the current SASL state.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new IllegalSaslStateException(message);
}
}),
UNSUPPORTED_VERSION(35, "The version of API is not supported.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new UnsupportedVersionException(message);
}
}),
TOPIC_ALREADY_EXISTS(36, "Topic with this name already exists.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new TopicExistsException(message);
}
}),
INVALID_PARTITIONS(37, "Number of partitions is invalid.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new InvalidPartitionsException(message);
}
}),
INVALID_REPLICATION_FACTOR(38, "Replication-factor is invalid.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new InvalidReplicationFactorException(message);
}
}),
INVALID_REPLICA_ASSIGNMENT(39, "Replica assignment is invalid.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new InvalidReplicaAssignmentException(message);
}
}),
INVALID_CONFIG(40, "Configuration is invalid.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new InvalidConfigurationException(message);
}
}),
NOT_CONTROLLER(41, "This is not the correct controller for this cluster.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new NotControllerException(message);
}
}),
INVALID_REQUEST(42, "This most likely occurs because of a request being malformed by the " +
"client library or the message was sent to an incompatible broker. See the broker logs " +
"for more details.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new InvalidRequestException(message);
}
}),
UNSUPPORTED_FOR_MESSAGE_FORMAT(43, "The message format version on the broker does not support the request.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new UnsupportedForMessageFormatException(message);
}
}),
POLICY_VIOLATION(44, "Request parameters do not satisfy the configured policy.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new PolicyViolationException(message);
}
}),
OUT_OF_ORDER_SEQUENCE_NUMBER(45, "The broker received an out of order sequence number",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new OutOfOrderSequenceException(message);
}
}),
DUPLICATE_SEQUENCE_NUMBER(46, "The broker received a duplicate sequence number",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new DuplicateSequenceNumberException(message);
}
}),
INVALID_PRODUCER_EPOCH(47, "Producer attempted an operation with an old epoch",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new ProducerFencedException(message);
}
}),
INVALID_TXN_STATE(48, "The producer attempted a transactional operation in an invalid state",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new InvalidTxnStateException(message);
}
}),
INVALID_PID_MAPPING(49, "The PID mapping is invalid",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new InvalidPidMappingException(message);
}
}),
INVALID_TRANSACTION_TIMEOUT(50, "The transaction timeout is larger than the maximum value allowed by " +
"the broker (as configured by max.transaction.timeout.ms).",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new InvalidTxnTimeoutException(message);
}
}),
CONCURRENT_TRANSACTIONS(51, "The producer attempted to update a transaction " +
"while another concurrent operation on the same transaction was ongoing",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new ConcurrentTransactionsException(message);
}
});
/**
 * Factory for building an {@link ApiException} that carries a caller-supplied
 * message. Each error code supplies its own implementation (as an anonymous
 * class on the enum constant) so that a message-specific exception of the
 * right concrete type can be created on demand.
 */
private interface ApiExceptionBuilder {
    ApiException build(String message);
}
private static final Logger log = LoggerFactory.getLogger(Errors.class); private static final Logger log = LoggerFactory.getLogger(Errors.class);
@ -206,11 +480,13 @@ public enum Errors {
} }
private final short code; private final short code;
private final ApiExceptionBuilder builder;
private final ApiException exception; private final ApiException exception;
Errors(int code, ApiException exception) { Errors(int code, String defaultExceptionString, ApiExceptionBuilder builder) {
this.code = (short) code; this.code = (short) code;
this.exception = exception; this.builder = builder;
this.exception = builder.build(defaultExceptionString);
} }
/** /**
@ -220,6 +496,21 @@ public enum Errors {
return this.exception; return this.exception;
} }
/**
 * Create an instance of the ApiException that contains the given error message.
 *
 * @param message The message string to set; may be null.
 * @return An exception carrying {@code message}, or the pre-built exception
 *         with the default error message when {@code message} is null.
 */
public ApiException exception(String message) {
    // With no custom message, reuse the cached default exception; otherwise
    // have the per-error builder create a fresh, message-specific instance.
    return (message == null) ? exception : builder.build(message);
}
/** /**
* Returns the class name of the exception or null if this is {@code Errors.NONE}. * Returns the class name of the exception or null if this is {@code Errors.NONE}.
*/ */

View File

@ -17,6 +17,7 @@
package org.apache.kafka.common.requests; package org.apache.kafka.common.requests;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Struct;
@ -61,6 +62,10 @@ public class CreateTopicsResponse extends AbstractResponse {
return message; return message;
} }
public ApiException exception() {
return error.exception(message);
}
@Override @Override
public String toString() { public String toString() {
return "Error(error=" + error + ", message=" + message + ")"; return "Error(error=" + error + ", message=" + message + ")";

View File

@ -260,7 +260,8 @@ public class MetadataResponse extends AbstractResponse {
} }
} }
return new Cluster(this.clusterId, this.brokers, partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED), internalTopics); return new Cluster(this.clusterId, this.brokers, partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
internalTopics, this.controller);
} }
/** /**

View File

@ -659,7 +659,7 @@ public class Utils {
/** /**
* Closes {@code closeable} and if an exception is thrown, it is logged at the WARN level. * Closes {@code closeable} and if an exception is thrown, it is logged at the WARN level.
*/ */
public static void closeQuietly(Closeable closeable, String name) { public static void closeQuietly(AutoCloseable closeable, String name) {
if (closeable != null) { if (closeable != null) {
try { try {
closeable.close(); closeable.close();

View File

@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.CreateTopicsResponse.Error;
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.utils.Time;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
/**
 * A unit test for KafkaAdminClient.
 *
 * See KafkaAdminClientIntegrationTest for an integration test of the admin client.
 */
public class KafkaAdminClientTest {
    // Fail any individual test that runs longer than two minutes.
    @Rule
    final public Timeout globalTimeout = Timeout.millis(120000);

    @Test
    public void testGetOrCreateListValue() {
        // A second lookup of the same key must return the same (already mutated)
        // list, while a new key must yield a fresh empty list.
        Map<String, List<String>> map = new HashMap<>();
        List<String> fooList = KafkaAdminClient.getOrCreateListValue(map, "foo");
        assertNotNull(fooList);
        fooList.add("a");
        fooList.add("b");
        List<String> fooList2 = KafkaAdminClient.getOrCreateListValue(map, "foo");
        assertEquals(fooList, fooList2);
        assertTrue(fooList2.contains("a"));
        assertTrue(fooList2.contains("b"));
        List<String> barList = KafkaAdminClient.getOrCreateListValue(map, "bar");
        assertNotNull(barList);
        assertTrue(barList.isEmpty());
    }

    @Test
    public void testCalcTimeoutMsRemainingAsInt() {
        // Per these expectations the result is (second arg - first arg), clamped
        // to the int range when the difference overflows or underflows.
        assertEquals(0, KafkaAdminClient.calcTimeoutMsRemainingAsInt(1000, 1000));
        assertEquals(100, KafkaAdminClient.calcTimeoutMsRemainingAsInt(1000, 1100));
        assertEquals(Integer.MAX_VALUE, KafkaAdminClient.calcTimeoutMsRemainingAsInt(0, Long.MAX_VALUE));
        assertEquals(Integer.MIN_VALUE, KafkaAdminClient.calcTimeoutMsRemainingAsInt(Long.MAX_VALUE, 0));
    }

    @Test
    public void testPrettyPrintException() {
        // null, message-less, and message-carrying exceptions each render differently.
        assertEquals("Null exception.", KafkaAdminClient.prettyPrintException(null));
        assertEquals("TimeoutException", KafkaAdminClient.prettyPrintException(new TimeoutException()));
        assertEquals("TimeoutException: The foobar timed out.",
            KafkaAdminClient.prettyPrintException(new TimeoutException("The foobar timed out.")));
    }

    /**
     * Build a config map with test defaults (bootstrap servers, 1s request
     * timeout) plus the given alternating key/value overrides.
     */
    private static Map<String, Object> newStrMap(String... vals) {
        Map<String, Object> map = new HashMap<>();
        map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121");
        map.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
        if (vals.length % 2 != 0) {
            throw new IllegalStateException();
        }
        for (int i = 0; i < vals.length; i += 2) {
            map.put(vals[i], vals[i + 1]);
        }
        return map;
    }

    /** Wrap {@link #newStrMap} in an AdminClientConfig. */
    private static AdminClientConfig newConfMap(String... vals) {
        return new AdminClientConfig(newStrMap(vals));
    }

    @Test
    public void testGenerateClientId() {
        // With an empty client.id, generated ids must be unique; an explicit
        // client.id must be used verbatim.
        Set<String> ids = new HashSet<>();
        for (int i = 0; i < 10; i++) {
            String id = KafkaAdminClient.generateClientId(newConfMap(AdminClientConfig.CLIENT_ID_CONFIG, ""));
            assertTrue("Got duplicate id " + id, !ids.contains(id));
            ids.add(id);
        }
        assertEquals("myCustomId",
            KafkaAdminClient.generateClientId(newConfMap(AdminClientConfig.CLIENT_ID_CONFIG, "myCustomId")));
    }

    /**
     * Test harness bundling a KafkaAdminClient wired to a MockClient and a
     * three-node mock cluster. AutoCloseable so tests can use try-with-resources
     * to guarantee the client is closed.
     */
    private static class MockKafkaAdminClientContext implements AutoCloseable {
        final static String CLUSTER_ID = "mockClusterId";
        final AdminClientConfig adminClientConfig;
        final Metadata metadata;
        final HashMap<Integer, Node> nodes;
        final MockClient mockClient;
        final AdminClient client;
        Cluster cluster;

        MockKafkaAdminClientContext(Map<String, Object> config) {
            this.adminClientConfig = new AdminClientConfig(config);
            this.metadata = new Metadata(adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
                adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
            this.nodes = new HashMap<Integer, Node>();
            this.nodes.put(0, new Node(0, "localhost", 8121));
            this.nodes.put(1, new Node(1, "localhost", 8122));
            this.nodes.put(2, new Node(2, "localhost", 8123));
            this.mockClient = new MockClient(Time.SYSTEM, this.metadata);
            this.client = KafkaAdminClient.create(adminClientConfig, mockClient, metadata);
            this.cluster = new Cluster(CLUSTER_ID, nodes.values(),
                Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(),
                Collections.<String>emptySet(), nodes.get(0));
        }

        @Override
        public void close() {
            this.client.close();
        }
    }

    @Test
    public void testCloseAdminClient() throws Exception {
        // Creating and immediately closing the client must not hang or throw.
        try (MockKafkaAdminClientContext ctx = new MockKafkaAdminClientContext(newStrMap())) {
        }
    }

    /**
     * Assert that the future fails with an ExecutionException whose cause is
     * exactly the given exception class.
     */
    private static void assertFutureError(Future<?> future, Class<? extends Throwable> exceptionClass)
            throws InterruptedException {
        try {
            future.get();
            fail("Expected a " + exceptionClass.getSimpleName() + " exception, but got success.");
        } catch (ExecutionException ee) {
            Throwable cause = ee.getCause();
            assertEquals("Expected a " + exceptionClass.getSimpleName() + " exception, but got " +
                cause.getClass().getSimpleName(),
                exceptionClass, cause.getClass());
        }
    }

    /**
     * Test that the client properly times out when we don't receive any metadata.
     */
    @Test
    public void testTimeoutWithoutMetadata() throws Exception {
        try (MockKafkaAdminClientContext ctx = new MockKafkaAdminClientContext(newStrMap(
            AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
            ctx.mockClient.setNodeApiVersions(NodeApiVersions.create());
            ctx.mockClient.setNode(new Node(0, "localhost", 8121));
            // A successful response is queued, but no metadata update is
            // prepared, so the call should time out before reaching it.
            ctx.mockClient.prepareResponse(new CreateTopicsResponse(new HashMap<String, Error>() {{
                    put("myTopic", new Error(Errors.NONE, ""));
                }}));
            KafkaFuture<Void> future = ctx.client.
                createTopics(Collections.singleton(new NewTopic("myTopic", new HashMap<Integer, List<Integer>>() {{
                        put(Integer.valueOf(0), Arrays.asList(new Integer[]{0, 1, 2}));
                    }})), new CreateTopicsOptions().timeoutMs(1000)).all();
            assertFutureError(future, TimeoutException.class);
        }
    }

    @Test
    public void testCreateTopics() throws Exception {
        try (MockKafkaAdminClientContext ctx = new MockKafkaAdminClientContext(newStrMap())) {
            ctx.mockClient.setNodeApiVersions(NodeApiVersions.create());
            // Unlike testTimeoutWithoutMetadata, a metadata update IS prepared,
            // so createTopics can proceed and consume the queued success response.
            ctx.mockClient.prepareMetadataUpdate(ctx.cluster, Collections.<String>emptySet());
            ctx.mockClient.setNode(ctx.nodes.get(0));
            ctx.mockClient.prepareResponse(new CreateTopicsResponse(new HashMap<String, Error>() {{
                    put("myTopic", new Error(Errors.NONE, ""));
                }}));
            KafkaFuture<Void> future = ctx.client.
                createTopics(Collections.singleton(new NewTopic("myTopic", new HashMap<Integer, List<Integer>>() {{
                        put(Integer.valueOf(0), Arrays.asList(new Integer[]{0, 1, 2}));
                    }})), new CreateTopicsOptions().timeoutMs(10000)).all();
            future.get();
        }
    }
}

View File

@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* A unit test for KafkaFuture.
*/
public class KafkaFutureTest {
    // Fail any individual test that runs longer than two minutes.
    @Rule
    final public Timeout globalTimeout = Timeout.millis(120000);

    /**
     * Verify synchronous completion: a future completes exactly once, rejects a
     * second value, and surfaces an exceptional completion via ExecutionException.
     */
    @Test
    public void testCompleteFutures() throws Exception {
        KafkaFutureImpl<Integer> future123 = new KafkaFutureImpl<>();
        assertTrue(future123.complete(123));
        assertEquals(Integer.valueOf(123), future123.get());
        // An already-completed future must ignore further completion attempts.
        assertFalse(future123.complete(456));
        assertTrue(future123.isDone());
        assertFalse(future123.isCancelled());
        assertFalse(future123.isCompletedExceptionally());
        KafkaFuture<Integer> future456 = KafkaFuture.completedFuture(456);
        assertEquals(Integer.valueOf(456), future456.get());
        KafkaFutureImpl<Integer> futureFail = new KafkaFutureImpl<>();
        futureFail.completeExceptionally(new RuntimeException("We require more vespene gas"));
        try {
            futureFail.get();
            Assert.fail("Expected an exception");
        } catch (ExecutionException e) {
            // The original cause must be preserved as-is, not wrapped further.
            assertEquals(RuntimeException.class, e.getCause().getClass());
            Assert.assertEquals("We require more vespene gas", e.getCause().getMessage());
        }
    }

    /**
     * Verify that a future completed from another thread unblocks a waiting get().
     */
    @Test
    public void testCompletingFutures() throws Exception {
        final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
        CompleterThread<String> myThread =
            new CompleterThread<>(future, "You must construct additional pylons.");
        assertFalse(future.isDone());
        assertFalse(future.isCompletedExceptionally());
        assertFalse(future.isCancelled());
        // getNow returns the fallback while the future is still incomplete.
        assertEquals("I am ready", future.getNow("I am ready"));
        myThread.start();
        String str = future.get(5, TimeUnit.MINUTES);
        assertEquals("You must construct additional pylons.", str);
        assertEquals("You must construct additional pylons.", future.getNow("I am ready"));
        assertTrue(future.isDone());
        assertFalse(future.isCompletedExceptionally());
        assertFalse(future.isCancelled());
        myThread.join();
        assertNull(myThread.testException);
    }

    /** Completes the given future with a value after a brief pause. */
    private static class CompleterThread<T> extends Thread {

        private final KafkaFutureImpl<T> future;
        private final T value;
        // Any throwable raised on this thread, for the test to inspect after join().
        Throwable testException = null;

        CompleterThread(KafkaFutureImpl<T> future, T value) {
            this.future = future;
            this.value = value;
        }

        @Override
        public void run() {
            try {
                try {
                    Thread.sleep(0, 200);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of silently swallowing it.
                    Thread.currentThread().interrupt();
                }
                future.complete(value);
            } catch (Throwable testException) {
                this.testException = testException;
            }
        }
    }

    /** Blocks on the given future and checks the value it yields. */
    private static class WaiterThread<T> extends Thread {

        private final KafkaFutureImpl<T> future;
        private final T expected;
        // Any throwable raised on this thread, for the test to inspect after join().
        Throwable testException = null;

        WaiterThread(KafkaFutureImpl<T> future, T expected) {
            this.future = future;
            this.expected = expected;
        }

        @Override
        public void run() {
            try {
                T value = future.get();
                assertEquals(expected, value);
            } catch (Throwable testException) {
                this.testException = testException;
            }
        }
    }

    /**
     * Verify that allOf() completes only after every constituent future has
     * completed, while concurrent waiters observe the individual values.
     */
    @Test
    public void testAllOfFutures() throws Exception {
        final int numThreads = 5;
        final List<KafkaFutureImpl<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < numThreads; i++) {
            futures.add(new KafkaFutureImpl<Integer>());
        }
        KafkaFuture<Void> allFuture = KafkaFuture.allOf(futures.toArray(new KafkaFuture[0]));
        final List<CompleterThread<Integer>> completerThreads = new ArrayList<>();
        final List<WaiterThread<Integer>> waiterThreads = new ArrayList<>();
        for (int i = 0; i < numThreads; i++) {
            completerThreads.add(new CompleterThread<>(futures.get(i), i));
            waiterThreads.add(new WaiterThread<>(futures.get(i), i));
        }
        assertFalse(allFuture.isDone());
        for (int i = 0; i < numThreads; i++) {
            waiterThreads.get(i).start();
        }
        // Complete all but the last future; the aggregate must remain incomplete.
        for (int i = 0; i < numThreads - 1; i++) {
            completerThreads.get(i).start();
        }
        assertFalse(allFuture.isDone());
        completerThreads.get(numThreads - 1).start();
        allFuture.get();
        assertTrue(allFuture.isDone());
        for (int i = 0; i < numThreads; i++) {
            assertEquals(Integer.valueOf(i), futures.get(i).get());
        }
        for (int i = 0; i < numThreads; i++) {
            completerThreads.get(i).join();
            waiterThreads.get(i).join();
            assertNull(completerThreads.get(i).testException);
            assertNull(waiterThreads.get(i).testException);
        }
    }
}

View File

@ -40,6 +40,11 @@ import org.apache.kafka.common.{Cluster, Node, TopicPartition}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.util.Try import scala.util.Try
/**
* A Scala administrative client for Kafka which supports managing and inspecting topics, brokers,
* and configurations. This client is deprecated, and will be replaced by KafkaAdminClient.
* @see KafkaAdminClient
*/
class AdminClient(val time: Time, class AdminClient(val time: Time,
val requestTimeoutMs: Int, val requestTimeoutMs: Int,
val retryBackoffMs: Long, val retryBackoffMs: Long,

View File

@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import java.util
import java.util.Properties
import java.util.concurrent.ExecutionException
import org.apache.kafka.common.utils.Utils
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import org.apache.kafka.clients.admin._
import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.KafkaFuture
import org.apache.kafka.common.errors.TopicExistsException
import org.apache.kafka.common.protocol.ApiKeys
import org.junit.{After, Rule, Test}
import org.junit.rules.Timeout
import org.junit.Assert._
import scala.collection.JavaConverters._
/**
* An integration test of the KafkaAdminClient.
*
* Also see {@link org.apache.kafka.clients.admin.KafkaAdminClientTest} for a unit test of the admin client.
*/
class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Logging {

  /** Fail any individual test that runs longer than two minutes. */
  @Rule
  def globalTimeout = Timeout.millis(120000)

  // The client under test.  Tests assign this field so that closeClient()
  // can clean up even when a test fails partway through.
  var client: AdminClient = null

  @After
  def closeClient(): Unit = {
    if (client != null)
      Utils.closeQuietly(client, "AdminClient")
  }

  val brokerCount = 3
  lazy val serverConfig = new Properties

  /**
   * Build the configuration for the admin client, including the bootstrap
   * servers and any security settings required by the current test harness.
   */
  def createConfig(): util.Map[String, Object] = {
    val config = new util.HashMap[String, Object]
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
    val securityProps: util.Map[Object, Object] =
      TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
    securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
    config
  }

  /**
   * Block until listTopics() shows all of expectedPresent and none of
   * expectedMissing, or fail when the TestUtils wait times out.
   */
  def waitForTopics(client: AdminClient, expectedPresent: Seq[String], expectedMissing: Seq[String]): Unit = {
    TestUtils.waitUntilTrue(() => {
      val topics = client.listTopics().names().get()
      expectedPresent.forall(topicName => topics.contains(topicName)) &&
        expectedMissing.forall(topicName => !topics.contains(topicName))
    }, "timed out waiting for topics")
  }

  /**
   * Assert that the future completes exceptionally with an ExecutionException
   * whose cause is an instance of the given class.
   */
  def assertFutureExceptionTypeEquals(future: KafkaFuture[_], clazz: Class[_ <: Throwable]): Unit = {
    try {
      future.get()
      fail("Expected CompletableFuture.get to return an exception")
    } catch {
      case e: ExecutionException =>
        val cause = e.getCause()
        assertTrue("Expected an exception of type " + clazz.getName + "; got type " +
          cause.getClass().getName, clazz.isInstance(cause))
    }
  }

  @Test
  def testClose(): Unit = {
    // Assign the field (rather than a shadowing local) so that closeClient()
    // still releases the client if an assertion fails before the explicit closes.
    client = AdminClient.create(createConfig())
    client.close()
    client.close() // double close has no effect
  }

  @Test
  def testListNodes(): Unit = {
    client = AdminClient.create(createConfig())
    val brokerStrs = brokerList.split(",").toList.sorted
    var nodeStrs: List[String] = null
    do {
      // describeCluster() may return a partial node list while brokers are
      // still registering, so poll until all brokers appear.  The @Rule
      // timeout bounds this loop.
      val nodes = client.describeCluster().nodes().get().asScala
      nodeStrs = nodes.map(node => s"${node.host}:${node.port}").toList.sorted
    } while (nodeStrs.size < brokerStrs.size)
    assertEquals(brokerStrs.mkString(","), nodeStrs.mkString(","))
    client.close()
  }

  @Test
  def testCreateDeleteTopics(): Unit = {
    client = AdminClient.create(createConfig())
    val newTopics: List[NewTopic] = List(
      new NewTopic("mytopic", 1, 1),
      new NewTopic("mytopic2", 1, 1))
    // validateOnly should succeed without actually creating the topics.
    client.createTopics(newTopics.asJava,
      new CreateTopicsOptions().validateOnly(true)).all().get()
    waitForTopics(client, List(), List("mytopic", "mytopic2"))
    client.createTopics(newTopics.asJava).all().get()
    waitForTopics(client, List("mytopic", "mytopic2"), List())
    // Re-creating existing topics must fail each per-topic future with
    // TopicExistsException.
    val results = client.createTopics(newTopics.asJava).results()
    assertTrue(results.containsKey("mytopic"))
    assertFutureExceptionTypeEquals(results.get("mytopic"), classOf[TopicExistsException])
    assertTrue(results.containsKey("mytopic2"))
    assertFutureExceptionTypeEquals(results.get("mytopic2"), classOf[TopicExistsException])
    val deleteTopics: Set[String] = Set("mytopic", "mytopic2")
    client.deleteTopics(deleteTopics.asJava).all().get()
    waitForTopics(client, List(), List("mytopic", "mytopic2"))
    client.close()
  }

  @Test
  def testGetAllBrokerVersions(): Unit = {
    client = AdminClient.create(createConfig())
    val nodes = client.describeCluster().nodes().get()
    val nodesToVersions = client.apiVersions(nodes).all().get()
    val brokers = brokerList.split(",")
    assertEquals(brokers.size, nodesToVersions.size())
    for ((node, brokerVersionInfo) <- nodesToVersions.asScala) {
      val hostStr = s"${node.host}:${node.port}"
      assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
      assertEquals(1, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
    }
    client.close()
  }

  /**
   * Generate broker configurations for the test cluster, applying the test
   * security protocol and any extra properties supplied via serverConfig.
   */
  override def generateConfigs() = {
    val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
    cfgs.foreach { config =>
      config.setProperty(KafkaConfig.ListenersProp, s"${listenerName.value}://localhost:${TestUtils.RandomPort}")
      // Route inter-broker traffic through the test listener rather than a
      // separately-configured security protocol.
      config.remove(KafkaConfig.InterBrokerSecurityProtocolProp)
      config.setProperty(KafkaConfig.InterBrokerListenerNameProp, listenerName.value)
      config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${listenerName.value}:${securityProtocol.name}")
      config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true")
    }
    cfgs.foreach(_.putAll(serverConfig))
    cfgs.map(KafkaConfig.fromProps)
  }
}

View File

@ -33,7 +33,10 @@ import org.junit.{After, Before, Test}
import org.junit.Assert._ import org.junit.Assert._
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
class AdminClientTest extends IntegrationTestHarness with Logging { /**
* Tests for the deprecated Scala AdminClient.
*/
class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
val producerCount = 1 val producerCount = 1
val consumerCount = 2 val consumerCount = 2

View File

@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package kafka.api
import java.io.File
import org.apache.kafka.common.protocol.SecurityProtocol
import kafka.server.KafkaConfig
/**
 * Runs the KafkaAdminClient integration tests over the SASL_SSL security
 * protocol, with secure ZooKeeper ACLs enabled.  All test cases are inherited
 * from KafkaAdminClientIntegrationTest; this class only overrides the
 * security-related configuration.
 */
class SaslSslAdminClientIntegrationTest extends KafkaAdminClientIntegrationTest with SaslTestHarness {
  // Also authenticate to ZooKeeper via SASL, not just to the brokers.
  override protected val zkSaslEnabled = true
  // Require secure ACLs on ZooKeeper nodes created by the brokers.
  this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
  override protected def securityProtocol = SecurityProtocol.SASL_SSL
  // SSL needs a trust store; create a throwaway one for the test run.
  override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
}

View File

@ -561,6 +561,9 @@ object TestUtils extends Logging {
def consumerSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File], saslProperties: Option[Properties]): Properties = def consumerSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File], saslProperties: Option[Properties]): Properties =
securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "consumer", saslProperties) securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "consumer", saslProperties)
/**
 * Returns client-mode security configuration properties for an admin client,
 * using the given security protocol, optional trust store, and optional SASL
 * properties.  Mirrors producerSecurityConfigs/consumerSecurityConfigs with a
 * distinct "admin-client" certificate alias.
 */
def adminClientSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File], saslProperties: Option[Properties]): Properties =
  securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "admin-client", saslProperties)
/** /**
* Create a new consumer with a few pre-configured properties. * Create a new consumer with a few pre-configured properties.
*/ */